Files
my-pal-mcp-server/simulator_tests/test_redis_validation.py
Fahad 7462599ddb Simplified thread continuations
Fixed and improved tests
2025-06-12 12:47:02 +04:00

140 lines
5.2 KiB
Python

#!/usr/bin/env python3
"""
Redis Conversation Memory Validation Test
Validates that conversation memory is working via Redis by checking
for stored conversation threads and their content.
"""
import json
from .base_test import BaseSimulatorTest
class RedisValidationTest(BaseSimulatorTest):
"""Validate that conversation memory is working via Redis"""
@property
def test_name(self) -> str:
return "redis_validation"
@property
def test_description(self) -> str:
return "Redis conversation memory validation"
def run_test(self) -> bool:
"""Validate that conversation memory is working via Redis"""
try:
self.logger.info("💾 Test: Validating conversation memory via Redis...")
# First, test Redis connectivity
ping_result = self.run_command(
["docker", "exec", self.redis_container, "redis-cli", "ping"], capture_output=True
)
if ping_result.returncode != 0:
self.logger.error("Failed to connect to Redis")
return False
if "PONG" not in ping_result.stdout.decode():
self.logger.error("Redis ping failed")
return False
self.logger.info("✅ Redis connectivity confirmed")
# Check Redis for stored conversations
result = self.run_command(
["docker", "exec", self.redis_container, "redis-cli", "KEYS", "thread:*"], capture_output=True
)
if result.returncode != 0:
self.logger.error("Failed to query Redis")
return False
keys = result.stdout.decode().strip().split("\n")
thread_keys = [k for k in keys if k.startswith("thread:") and k != "thread:*"]
if thread_keys:
self.logger.info(f"✅ Found {len(thread_keys)} conversation threads in Redis")
# Get details of first thread
thread_key = thread_keys[0]
result = self.run_command(
["docker", "exec", self.redis_container, "redis-cli", "GET", thread_key], capture_output=True
)
if result.returncode == 0:
thread_data = result.stdout.decode()
try:
parsed = json.loads(thread_data)
turns = parsed.get("turns", [])
self.logger.info(f"✅ Thread has {len(turns)} turns")
return True
except json.JSONDecodeError:
self.logger.warning("Could not parse thread data")
return True
else:
# If no existing threads, create a test thread to validate Redis functionality
self.logger.info(" No existing threads found, creating test thread to validate Redis...")
test_thread_id = "test_thread_validation"
test_data = {
"thread_id": test_thread_id,
"turns": [
{"tool": "chat", "timestamp": "2025-06-11T16:30:00Z", "prompt": "Test validation prompt"}
],
}
# Store test data
store_result = self.run_command(
[
"docker",
"exec",
self.redis_container,
"redis-cli",
"SET",
f"thread:{test_thread_id}",
json.dumps(test_data),
],
capture_output=True,
)
if store_result.returncode != 0:
self.logger.error("Failed to store test data in Redis")
return False
# Retrieve test data
retrieve_result = self.run_command(
["docker", "exec", self.redis_container, "redis-cli", "GET", f"thread:{test_thread_id}"],
capture_output=True,
)
if retrieve_result.returncode != 0:
self.logger.error("Failed to retrieve test data from Redis")
return False
retrieved_data = retrieve_result.stdout.decode()
try:
parsed = json.loads(retrieved_data)
if parsed.get("thread_id") == test_thread_id:
self.logger.info("✅ Redis read/write validation successful")
# Clean up test data
self.run_command(
["docker", "exec", self.redis_container, "redis-cli", "DEL", f"thread:{test_thread_id}"],
capture_output=True,
)
return True
else:
self.logger.error("Retrieved data doesn't match stored data")
return False
except json.JSONDecodeError:
self.logger.error("Could not parse retrieved test data")
return False
except Exception as e:
self.logger.error(f"Conversation memory validation failed: {e}")
return False