WIP - communication memory

This commit is contained in:
Fahad
2025-06-10 19:16:51 +04:00
parent bb8a101dbf
commit f5060367a0
23 changed files with 2111 additions and 716 deletions

View File

@@ -0,0 +1,413 @@
"""
Test suite for Claude continuation opportunities
Tests the system that offers Claude the opportunity to continue conversations
when Gemini doesn't explicitly ask a follow-up question.
"""
import json
from unittest.mock import Mock, patch
import pytest
from pydantic import Field
from tools.base import BaseTool, ToolRequest
from tools.models import ContinuationOffer, ToolOutput
from utils.conversation_memory import MAX_CONVERSATION_TURNS
class ContinuationRequest(ToolRequest):
"""Test request model with prompt field"""
prompt: str = Field(..., description="The prompt to analyze")
files: list[str] = Field(default_factory=list, description="Optional files to analyze")
class ClaudeContinuationTool(BaseTool):
"""Test tool for continuation functionality"""
def get_name(self) -> str:
return "test_continuation"
def get_description(self) -> str:
return "Test tool for Claude continuation"
def get_input_schema(self) -> dict:
return {
"type": "object",
"properties": {
"prompt": {"type": "string"},
"continuation_id": {"type": "string", "required": False},
},
}
def get_system_prompt(self) -> str:
return "Test system prompt"
def get_request_model(self):
return ContinuationRequest
async def prepare_prompt(self, request) -> str:
return f"System: {self.get_system_prompt()}\nUser: {request.prompt}"
class TestClaudeContinuationOffers:
"""Test Claude continuation offer functionality"""
def setup_method(self):
self.tool = ClaudeContinuationTool()
@patch("utils.conversation_memory.get_redis_client")
def test_new_conversation_offers_continuation(self, mock_redis):
"""Test that new conversations offer Claude continuation opportunity"""
mock_client = Mock()
mock_redis.return_value = mock_client
# Test request without continuation_id (new conversation)
request = ContinuationRequest(prompt="Analyze this code")
# Check continuation opportunity
continuation_data = self.tool._check_continuation_opportunity(request)
assert continuation_data is not None
assert continuation_data["remaining_turns"] == MAX_CONVERSATION_TURNS - 1
assert continuation_data["tool_name"] == "test_continuation"
def test_existing_conversation_no_continuation_offer(self):
"""Test that existing threaded conversations don't offer continuation"""
# Test request with continuation_id (existing conversation)
request = ContinuationRequest(
prompt="Continue analysis", continuation_id="12345678-1234-1234-1234-123456789012"
)
# Check continuation opportunity
continuation_data = self.tool._check_continuation_opportunity(request)
assert continuation_data is None
@patch("utils.conversation_memory.get_redis_client")
def test_create_continuation_offer_response(self, mock_redis):
"""Test creating continuation offer response"""
mock_client = Mock()
mock_redis.return_value = mock_client
request = ContinuationRequest(prompt="Test prompt")
content = "This is the analysis result."
continuation_data = {"remaining_turns": 4, "tool_name": "test_continuation"}
# Create continuation offer response
response = self.tool._create_continuation_offer_response(content, continuation_data, request)
assert isinstance(response, ToolOutput)
assert response.status == "continuation_available"
assert response.content == content
assert response.continuation_offer is not None
offer = response.continuation_offer
assert isinstance(offer, ContinuationOffer)
assert offer.remaining_turns == 4
assert "continuation_id" in offer.suggested_tool_params
assert "You have 4 more exchange(s) available" in offer.message_to_user
@patch("utils.conversation_memory.get_redis_client")
async def test_full_response_flow_with_continuation_offer(self, mock_redis):
"""Test complete response flow that creates continuation offer"""
mock_client = Mock()
mock_redis.return_value = mock_client
# Mock the model to return a response without follow-up question
with patch.object(self.tool, "create_model") as mock_create_model:
mock_model = Mock()
mock_response = Mock()
mock_response.candidates = [
Mock(
content=Mock(parts=[Mock(text="Analysis complete. The code looks good.")]),
finish_reason="STOP",
)
]
mock_model.generate_content.return_value = mock_response
mock_create_model.return_value = mock_model
# Execute tool with new conversation
arguments = {"prompt": "Analyze this code"}
response = await self.tool.execute(arguments)
# Parse response
assert len(response) == 1
response_data = json.loads(response[0].text)
# Debug output
if response_data.get("status") == "error":
print(f"Error content: {response_data.get('content')}")
assert response_data["status"] == "continuation_available"
assert response_data["content"] == "Analysis complete. The code looks good."
assert "continuation_offer" in response_data
offer = response_data["continuation_offer"]
assert "continuation_id" in offer
assert offer["remaining_turns"] == MAX_CONVERSATION_TURNS - 1
assert "You have" in offer["message_to_user"]
assert "more exchange(s) available" in offer["message_to_user"]
@patch("utils.conversation_memory.get_redis_client")
async def test_gemini_follow_up_takes_precedence(self, mock_redis):
"""Test that Gemini follow-up questions take precedence over continuation offers"""
mock_client = Mock()
mock_redis.return_value = mock_client
# Mock the model to return a response WITH follow-up question
with patch.object(self.tool, "create_model") as mock_create_model:
mock_model = Mock()
mock_response = Mock()
mock_response.candidates = [
Mock(
content=Mock(
parts=[
Mock(
text="""Analysis complete. The code looks good.
```json
{
"follow_up_question": "Would you like me to examine the error handling patterns?",
"suggested_params": {"files": ["/src/error_handler.py"]},
"ui_hint": "Examining error handling would help ensure robustness"
}
```"""
)
]
),
finish_reason="STOP",
)
]
mock_model.generate_content.return_value = mock_response
mock_create_model.return_value = mock_model
# Execute tool
arguments = {"prompt": "Analyze this code"}
response = await self.tool.execute(arguments)
# Parse response
response_data = json.loads(response[0].text)
# Should be follow-up, not continuation offer
assert response_data["status"] == "requires_continuation"
assert "follow_up_request" in response_data
assert response_data.get("continuation_offer") is None
@patch("utils.conversation_memory.get_redis_client")
async def test_threaded_conversation_no_continuation_offer(self, mock_redis):
"""Test that threaded conversations don't get continuation offers"""
mock_client = Mock()
mock_redis.return_value = mock_client
# Mock existing thread context
from utils.conversation_memory import ThreadContext
thread_context = ThreadContext(
thread_id="12345678-1234-1234-1234-123456789012",
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="test_continuation",
turns=[],
initial_context={"prompt": "Previous analysis"},
)
mock_client.get.return_value = thread_context.model_dump_json()
# Mock the model
with patch.object(self.tool, "create_model") as mock_create_model:
mock_model = Mock()
mock_response = Mock()
mock_response.candidates = [
Mock(
content=Mock(parts=[Mock(text="Continued analysis complete.")]),
finish_reason="STOP",
)
]
mock_model.generate_content.return_value = mock_response
mock_create_model.return_value = mock_model
# Execute tool with continuation_id
arguments = {"prompt": "Continue the analysis", "continuation_id": "12345678-1234-1234-1234-123456789012"}
response = await self.tool.execute(arguments)
# Parse response
response_data = json.loads(response[0].text)
# Should be regular success, not continuation offer
assert response_data["status"] == "success"
assert response_data.get("continuation_offer") is None
def test_max_turns_reached_no_continuation_offer(self):
"""Test that no continuation is offered when max turns would be exceeded"""
# Mock MAX_CONVERSATION_TURNS to be 1 for this test
with patch("utils.conversation_memory.MAX_CONVERSATION_TURNS", 1):
request = ContinuationRequest(prompt="Test prompt")
# Check continuation opportunity
continuation_data = self.tool._check_continuation_opportunity(request)
# Should be None because remaining_turns would be 0
assert continuation_data is None
@patch("utils.conversation_memory.get_redis_client")
def test_continuation_offer_thread_creation_failure_fallback(self, mock_redis):
"""Test fallback to normal response when thread creation fails"""
# Mock Redis to fail
mock_client = Mock()
mock_client.setex.side_effect = Exception("Redis failure")
mock_redis.return_value = mock_client
request = ContinuationRequest(prompt="Test prompt")
content = "Analysis result"
continuation_data = {"remaining_turns": 4, "tool_name": "test_continuation"}
# Should fallback to normal response
response = self.tool._create_continuation_offer_response(content, continuation_data, request)
assert response.status == "success"
assert response.content == content
assert response.continuation_offer is None
@patch("utils.conversation_memory.get_redis_client")
def test_continuation_offer_message_format(self, mock_redis):
"""Test that continuation offer message is properly formatted for Claude"""
mock_client = Mock()
mock_redis.return_value = mock_client
request = ContinuationRequest(prompt="Analyze architecture")
content = "Architecture analysis complete."
continuation_data = {"remaining_turns": 3, "tool_name": "test_continuation"}
response = self.tool._create_continuation_offer_response(content, continuation_data, request)
offer = response.continuation_offer
message = offer.message_to_user
# Check message contains key information for Claude
assert "continue this analysis" in message
assert "continuation_id" in message
assert "test_continuation tool call" in message
assert "3 more exchange(s)" in message
# Check suggested params are properly formatted
suggested_params = offer.suggested_tool_params
assert "continuation_id" in suggested_params
assert "prompt" in suggested_params
assert isinstance(suggested_params["continuation_id"], str)
@patch("utils.conversation_memory.get_redis_client")
def test_continuation_offer_metadata(self, mock_redis):
"""Test that continuation offer includes proper metadata"""
mock_client = Mock()
mock_redis.return_value = mock_client
request = ContinuationRequest(prompt="Test")
content = "Test content"
continuation_data = {"remaining_turns": 2, "tool_name": "test_continuation"}
response = self.tool._create_continuation_offer_response(content, continuation_data, request)
metadata = response.metadata
assert metadata["tool_name"] == "test_continuation"
assert metadata["remaining_turns"] == 2
assert "thread_id" in metadata
assert len(metadata["thread_id"]) == 36 # UUID length
class TestContinuationIntegration:
"""Integration tests for continuation offers with conversation memory"""
def setup_method(self):
self.tool = ClaudeContinuationTool()
@patch("utils.conversation_memory.get_redis_client")
def test_continuation_offer_creates_proper_thread(self, mock_redis):
"""Test that continuation offers create properly formatted threads"""
mock_client = Mock()
mock_redis.return_value = mock_client
# Mock the get call that add_turn makes to retrieve the existing thread
# We'll set this up after the first setex call
def side_effect_get(key):
# Return the context from the first setex call
if mock_client.setex.call_count > 0:
first_call_data = mock_client.setex.call_args_list[0][0][2]
return first_call_data
return None
mock_client.get.side_effect = side_effect_get
request = ContinuationRequest(prompt="Initial analysis", files=["/test/file.py"])
content = "Analysis result"
continuation_data = {"remaining_turns": 4, "tool_name": "test_continuation"}
response = self.tool._create_continuation_offer_response(content, continuation_data, request)
# Verify thread creation was called (should be called twice: create_thread + add_turn)
assert mock_client.setex.call_count == 2
# Check the first call (create_thread)
first_call = mock_client.setex.call_args_list[0]
thread_key = first_call[0][0]
assert thread_key.startswith("thread:")
assert len(thread_key.split(":")[-1]) == 36 # UUID length
# Check the second call (add_turn) which should have the assistant response
second_call = mock_client.setex.call_args_list[1]
thread_data = second_call[0][2]
thread_context = json.loads(thread_data)
assert thread_context["tool_name"] == "test_continuation"
assert len(thread_context["turns"]) == 1 # Assistant's response added
assert thread_context["turns"][0]["role"] == "assistant"
assert thread_context["turns"][0]["content"] == content
assert thread_context["turns"][0]["files"] == ["/test/file.py"] # Files from request
assert thread_context["initial_context"]["prompt"] == "Initial analysis"
assert thread_context["initial_context"]["files"] == ["/test/file.py"]
@patch("utils.conversation_memory.get_redis_client")
def test_claude_can_use_continuation_id(self, mock_redis):
"""Test that Claude can use the provided continuation_id in subsequent calls"""
mock_client = Mock()
mock_redis.return_value = mock_client
# Step 1: Initial request creates continuation offer
request1 = ToolRequest(prompt="Analyze code structure")
continuation_data = {"remaining_turns": 4, "tool_name": "test_continuation"}
response1 = self.tool._create_continuation_offer_response(
"Structure analysis done.", continuation_data, request1
)
thread_id = response1.continuation_offer.continuation_id
# Step 2: Mock the thread context for Claude's follow-up
from utils.conversation_memory import ConversationTurn, ThreadContext
existing_context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="test_continuation",
turns=[
ConversationTurn(
role="assistant",
content="Structure analysis done.",
timestamp="2023-01-01T00:00:30Z",
tool_name="test_continuation",
)
],
initial_context={"prompt": "Analyze code structure"},
)
mock_client.get.return_value = existing_context.model_dump_json()
# Step 3: Claude uses continuation_id
request2 = ToolRequest(prompt="Now analyze the performance aspects", continuation_id=thread_id)
# This should NOT offer another continuation (already threaded)
continuation_data2 = self.tool._check_continuation_opportunity(request2)
assert continuation_data2 is None
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -0,0 +1,721 @@
"""
Test suite for conversation memory system
Tests the Redis-based conversation persistence needed for AI-to-AI multi-turn
discussions in stateless MCP environments.
"""
from unittest.mock import Mock, patch
import pytest
from server import get_follow_up_instructions
from utils.conversation_memory import (
MAX_CONVERSATION_TURNS,
ConversationTurn,
ThreadContext,
add_turn,
build_conversation_history,
create_thread,
get_thread,
)
class TestConversationMemory:
"""Test the conversation memory system for stateless MCP requests"""
@patch("utils.conversation_memory.get_redis_client")
def test_create_thread(self, mock_redis):
"""Test creating a new thread"""
mock_client = Mock()
mock_redis.return_value = mock_client
thread_id = create_thread("chat", {"prompt": "Hello", "files": ["/test.py"]})
assert thread_id is not None
assert len(thread_id) == 36 # UUID4 length
# Verify Redis was called
mock_client.setex.assert_called_once()
call_args = mock_client.setex.call_args
assert call_args[0][0] == f"thread:{thread_id}" # key
assert call_args[0][1] == 3600 # TTL
@patch("utils.conversation_memory.get_redis_client")
def test_get_thread_valid(self, mock_redis):
"""Test retrieving an existing thread"""
mock_client = Mock()
mock_redis.return_value = mock_client
test_uuid = "12345678-1234-1234-1234-123456789012"
# Create valid ThreadContext and serialize it
context_obj = ThreadContext(
thread_id=test_uuid,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="chat",
turns=[],
initial_context={"prompt": "test"},
)
mock_client.get.return_value = context_obj.model_dump_json()
context = get_thread(test_uuid)
assert context is not None
assert context.thread_id == test_uuid
assert context.tool_name == "chat"
mock_client.get.assert_called_once_with(f"thread:{test_uuid}")
@patch("utils.conversation_memory.get_redis_client")
def test_get_thread_invalid_uuid(self, mock_redis):
"""Test handling invalid UUID"""
context = get_thread("invalid-uuid")
assert context is None
@patch("utils.conversation_memory.get_redis_client")
def test_get_thread_not_found(self, mock_redis):
"""Test handling thread not found"""
mock_client = Mock()
mock_redis.return_value = mock_client
mock_client.get.return_value = None
context = get_thread("12345678-1234-1234-1234-123456789012")
assert context is None
@patch("utils.conversation_memory.get_redis_client")
def test_add_turn_success(self, mock_redis):
"""Test adding a turn to existing thread"""
mock_client = Mock()
mock_redis.return_value = mock_client
test_uuid = "12345678-1234-1234-1234-123456789012"
# Create valid ThreadContext
context_obj = ThreadContext(
thread_id=test_uuid,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="chat",
turns=[],
initial_context={"prompt": "test"},
)
mock_client.get.return_value = context_obj.model_dump_json()
success = add_turn(test_uuid, "user", "Hello there")
assert success is True
# Verify Redis get and setex were called
mock_client.get.assert_called_once()
mock_client.setex.assert_called_once()
@patch("utils.conversation_memory.get_redis_client")
def test_add_turn_max_limit(self, mock_redis):
"""Test turn limit enforcement"""
mock_client = Mock()
mock_redis.return_value = mock_client
test_uuid = "12345678-1234-1234-1234-123456789012"
# Create thread with MAX_CONVERSATION_TURNS turns (at limit)
turns = [
ConversationTurn(role="user", content=f"Turn {i}", timestamp="2023-01-01T00:00:00Z")
for i in range(MAX_CONVERSATION_TURNS)
]
context_obj = ThreadContext(
thread_id=test_uuid,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="chat",
turns=turns,
initial_context={"prompt": "test"},
)
mock_client.get.return_value = context_obj.model_dump_json()
success = add_turn(test_uuid, "user", "This should fail")
assert success is False
def test_build_conversation_history(self):
"""Test building conversation history format with files and speaker identification"""
test_uuid = "12345678-1234-1234-1234-123456789012"
turns = [
ConversationTurn(
role="user",
content="What is Python?",
timestamp="2023-01-01T00:00:00Z",
files=["/home/user/main.py", "/home/user/docs/readme.md"],
),
ConversationTurn(
role="assistant",
content="Python is a programming language",
timestamp="2023-01-01T00:01:00Z",
follow_up_question="Would you like examples?",
files=["/home/user/examples/"],
tool_name="chat",
),
]
context = ThreadContext(
thread_id=test_uuid,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="chat",
turns=turns,
initial_context={},
)
history = build_conversation_history(context)
# Test basic structure
assert "CONVERSATION HISTORY" in history
assert f"Thread: {test_uuid}" in history
assert "Tool: chat" in history
assert f"Turn 2/{MAX_CONVERSATION_TURNS}" in history
# Test speaker identification
assert "--- Turn 1 (Claude) ---" in history
assert "--- Turn 2 (Gemini using chat) ---" in history
# Test content
assert "What is Python?" in history
assert "Python is a programming language" in history
# Test file tracking
assert "📁 Files referenced: /home/user/main.py, /home/user/docs/readme.md" in history
assert "📁 Files referenced: /home/user/examples/" in history
# Test follow-up attribution
assert "[Gemini's Follow-up: Would you like examples?]" in history
def test_build_conversation_history_empty(self):
"""Test building history with no turns"""
test_uuid = "12345678-1234-1234-1234-123456789012"
context = ThreadContext(
thread_id=test_uuid,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="chat",
turns=[],
initial_context={},
)
history = build_conversation_history(context)
assert history == ""
class TestConversationFlow:
"""Test complete conversation flows simulating stateless MCP requests"""
@patch("utils.conversation_memory.get_redis_client")
def test_complete_conversation_cycle(self, mock_redis):
"""Test a complete 5-turn conversation until limit reached"""
mock_client = Mock()
mock_redis.return_value = mock_client
# Simulate independent MCP request cycles
# REQUEST 1: Initial request creates thread
thread_id = create_thread("chat", {"prompt": "Analyze this code"})
initial_context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="chat",
turns=[],
initial_context={"prompt": "Analyze this code"},
)
mock_client.get.return_value = initial_context.model_dump_json()
# Add assistant response with follow-up
success = add_turn(
thread_id,
"assistant",
"Code analysis complete",
follow_up_question="Would you like me to check error handling?",
)
assert success is True
# REQUEST 2: User responds to follow-up (independent request cycle)
# Simulate retrieving updated context from Redis
context_after_1 = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="chat",
turns=[
ConversationTurn(
role="assistant",
content="Code analysis complete",
timestamp="2023-01-01T00:00:30Z",
follow_up_question="Would you like me to check error handling?",
)
],
initial_context={"prompt": "Analyze this code"},
)
mock_client.get.return_value = context_after_1.model_dump_json()
success = add_turn(thread_id, "user", "Yes, check error handling")
assert success is True
success = add_turn(
thread_id, "assistant", "Error handling reviewed", follow_up_question="Should I examine the test coverage?"
)
assert success is True
# REQUEST 3-5: Continue conversation (simulating independent cycles)
# After turn 3
context_after_3 = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:03:00Z",
tool_name="chat",
turns=[
ConversationTurn(
role="assistant",
content="Code analysis complete",
timestamp="2023-01-01T00:00:30Z",
follow_up_question="Would you like me to check error handling?",
),
ConversationTurn(role="user", content="Yes, check error handling", timestamp="2023-01-01T00:01:30Z"),
ConversationTurn(
role="assistant",
content="Error handling reviewed",
timestamp="2023-01-01T00:02:30Z",
follow_up_question="Should I examine the test coverage?",
),
],
initial_context={"prompt": "Analyze this code"},
)
mock_client.get.return_value = context_after_3.model_dump_json()
success = add_turn(thread_id, "user", "Yes, check tests")
assert success is True
success = add_turn(thread_id, "assistant", "Test coverage analyzed")
assert success is True
# REQUEST 6: Try to exceed MAX_CONVERSATION_TURNS limit - should fail
turns_at_limit = [
ConversationTurn(
role="assistant" if i % 2 == 0 else "user", content=f"Turn {i+1}", timestamp="2023-01-01T00:00:30Z"
)
for i in range(MAX_CONVERSATION_TURNS)
]
context_at_limit = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:05:00Z",
tool_name="chat",
turns=turns_at_limit,
initial_context={"prompt": "Analyze this code"},
)
mock_client.get.return_value = context_at_limit.model_dump_json()
# This should fail - conversation has reached limit
success = add_turn(thread_id, "user", "This should be rejected")
assert success is False # CONVERSATION STOPS HERE
@patch("utils.conversation_memory.get_redis_client")
def test_invalid_continuation_id_error(self, mock_redis):
"""Test that invalid continuation IDs raise proper error for restart"""
from server import reconstruct_thread_context
mock_client = Mock()
mock_redis.return_value = mock_client
mock_client.get.return_value = None # Thread not found
arguments = {"continuation_id": "invalid-uuid-12345", "prompt": "Continue conversation"}
# Should raise ValueError asking to restart
with pytest.raises(ValueError) as exc_info:
import asyncio
asyncio.run(reconstruct_thread_context(arguments))
error_msg = str(exc_info.value)
assert "Conversation thread 'invalid-uuid-12345' was not found or has expired" in error_msg
assert (
"Please restart the conversation by providing your full question/prompt without the continuation_id"
in error_msg
)
def test_dynamic_max_turns_configuration(self):
"""Test that all functions respect MAX_CONVERSATION_TURNS configuration"""
# This test ensures if we change MAX_CONVERSATION_TURNS, everything updates
# Test with different max values by patching the constant
test_values = [3, 7, 10]
for test_max in test_values:
# Create turns up to the test limit
turns = [
ConversationTurn(role="user", content=f"Turn {i}", timestamp="2023-01-01T00:00:00Z")
for i in range(test_max)
]
# Test history building respects the limit
test_uuid = "12345678-1234-1234-1234-123456789012"
context = ThreadContext(
thread_id=test_uuid,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="chat",
turns=turns,
initial_context={},
)
history = build_conversation_history(context)
expected_turn_text = f"Turn {test_max}/{MAX_CONVERSATION_TURNS}"
assert expected_turn_text in history
def test_follow_up_instructions_dynamic_behavior(self):
"""Test that follow-up instructions change correctly based on turn count and max setting"""
# Test with default MAX_CONVERSATION_TURNS
max_turns = MAX_CONVERSATION_TURNS
# Test early conversation (should allow follow-ups)
early_instructions = get_follow_up_instructions(0, max_turns)
assert "FOLLOW-UP CONVERSATIONS" in early_instructions
assert f"{max_turns - 1} more exchange" in early_instructions
# Test mid conversation
mid_instructions = get_follow_up_instructions(2, max_turns)
assert "FOLLOW-UP CONVERSATIONS" in mid_instructions
assert f"{max_turns - 3} more exchange" in mid_instructions
# Test approaching limit (should stop follow-ups)
limit_instructions = get_follow_up_instructions(max_turns - 1, max_turns)
assert "Do NOT include any follow-up questions" in limit_instructions
assert "FOLLOW-UP CONVERSATIONS" not in limit_instructions
# Test at limit
at_limit_instructions = get_follow_up_instructions(max_turns, max_turns)
assert "Do NOT include any follow-up questions" in at_limit_instructions
# Test with custom max_turns to ensure dynamic behavior
custom_max = 3
custom_early = get_follow_up_instructions(0, custom_max)
assert f"{custom_max - 1} more exchange" in custom_early
custom_limit = get_follow_up_instructions(custom_max - 1, custom_max)
assert "Do NOT include any follow-up questions" in custom_limit
def test_follow_up_instructions_defaults_to_config(self):
"""Test that follow-up instructions use MAX_CONVERSATION_TURNS when max_turns not provided"""
instructions = get_follow_up_instructions(0) # No max_turns parameter
expected_remaining = MAX_CONVERSATION_TURNS - 1
assert f"{expected_remaining} more exchange" in instructions
@patch("utils.conversation_memory.get_redis_client")
def test_complete_conversation_with_dynamic_turns(self, mock_redis):
"""Test complete conversation respecting MAX_CONVERSATION_TURNS dynamically"""
mock_client = Mock()
mock_redis.return_value = mock_client
thread_id = create_thread("chat", {"prompt": "Start conversation"})
# Simulate conversation up to MAX_CONVERSATION_TURNS - 1
for turn_num in range(MAX_CONVERSATION_TURNS - 1):
# Mock context with current turns
turns = [
ConversationTurn(
role="user" if i % 2 == 0 else "assistant", content=f"Turn {i+1}", timestamp="2023-01-01T00:00:00Z"
)
for i in range(turn_num)
]
context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="chat",
turns=turns,
initial_context={"prompt": "Start conversation"},
)
mock_client.get.return_value = context.model_dump_json()
# Should succeed
success = add_turn(thread_id, "user", f"User turn {turn_num + 1}")
assert success is True, f"Turn {turn_num + 1} should succeed"
# Now we should be at the limit - create final context
final_turns = [
ConversationTurn(
role="user" if i % 2 == 0 else "assistant", content=f"Turn {i+1}", timestamp="2023-01-01T00:00:00Z"
)
for i in range(MAX_CONVERSATION_TURNS)
]
final_context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="chat",
turns=final_turns,
initial_context={"prompt": "Start conversation"},
)
mock_client.get.return_value = final_context.model_dump_json()
# This should fail - at the limit
success = add_turn(thread_id, "user", "This should fail")
assert success is False, f"Turn {MAX_CONVERSATION_TURNS + 1} should fail"
@patch("utils.conversation_memory.get_redis_client")
def test_conversation_with_files_and_context_preservation(self, mock_redis):
"""Test complete conversation flow with file tracking and context preservation"""
mock_client = Mock()
mock_redis.return_value = mock_client
# Start conversation with files
thread_id = create_thread("analyze", {"prompt": "Analyze this codebase", "files": ["/project/src/"]})
# Turn 1: Claude provides context with multiple files
initial_context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="analyze",
turns=[],
initial_context={"prompt": "Analyze this codebase", "files": ["/project/src/"]},
)
mock_client.get.return_value = initial_context.model_dump_json()
# Add Gemini's response with follow-up
success = add_turn(
thread_id,
"assistant",
"I've analyzed your codebase structure.",
follow_up_question="Would you like me to examine the test coverage?",
files=["/project/src/main.py", "/project/src/utils.py"],
tool_name="analyze",
)
assert success is True
# Turn 2: Claude responds with different files
context_turn_1 = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="analyze",
turns=[
ConversationTurn(
role="assistant",
content="I've analyzed your codebase structure.",
timestamp="2023-01-01T00:00:30Z",
follow_up_question="Would you like me to examine the test coverage?",
files=["/project/src/main.py", "/project/src/utils.py"],
tool_name="analyze",
)
],
initial_context={"prompt": "Analyze this codebase", "files": ["/project/src/"]},
)
mock_client.get.return_value = context_turn_1.model_dump_json()
# User responds with test files
success = add_turn(
thread_id, "user", "Yes, check the test coverage", files=["/project/tests/", "/project/test_main.py"]
)
assert success is True
# Turn 3: Gemini analyzes tests
context_turn_2 = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:02:00Z",
tool_name="analyze",
turns=[
ConversationTurn(
role="assistant",
content="I've analyzed your codebase structure.",
timestamp="2023-01-01T00:00:30Z",
follow_up_question="Would you like me to examine the test coverage?",
files=["/project/src/main.py", "/project/src/utils.py"],
tool_name="analyze",
),
ConversationTurn(
role="user",
content="Yes, check the test coverage",
timestamp="2023-01-01T00:01:30Z",
files=["/project/tests/", "/project/test_main.py"],
),
],
initial_context={"prompt": "Analyze this codebase", "files": ["/project/src/"]},
)
mock_client.get.return_value = context_turn_2.model_dump_json()
success = add_turn(
thread_id,
"assistant",
"Test coverage analysis complete. Coverage is 85%.",
files=["/project/tests/test_utils.py", "/project/coverage.html"],
tool_name="analyze",
)
assert success is True
# Build conversation history and verify chronological file preservation
final_context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:03:00Z",
tool_name="analyze",
turns=[
ConversationTurn(
role="assistant",
content="I've analyzed your codebase structure.",
timestamp="2023-01-01T00:00:30Z",
follow_up_question="Would you like me to examine the test coverage?",
files=["/project/src/main.py", "/project/src/utils.py"],
tool_name="analyze",
),
ConversationTurn(
role="user",
content="Yes, check the test coverage",
timestamp="2023-01-01T00:01:30Z",
files=["/project/tests/", "/project/test_main.py"],
),
ConversationTurn(
role="assistant",
content="Test coverage analysis complete. Coverage is 85%.",
timestamp="2023-01-01T00:02:30Z",
files=["/project/tests/test_utils.py", "/project/coverage.html"],
tool_name="analyze",
),
],
initial_context={"prompt": "Analyze this codebase", "files": ["/project/src/"]},
)
history = build_conversation_history(final_context)
# Verify chronological order and speaker identification
assert "--- Turn 1 (Gemini using analyze) ---" in history
assert "--- Turn 2 (Claude) ---" in history
assert "--- Turn 3 (Gemini using analyze) ---" in history
# Verify all files are preserved in chronological order
turn_1_files = "📁 Files referenced: /project/src/main.py, /project/src/utils.py"
turn_2_files = "📁 Files referenced: /project/tests/, /project/test_main.py"
turn_3_files = "📁 Files referenced: /project/tests/test_utils.py, /project/coverage.html"
assert turn_1_files in history
assert turn_2_files in history
assert turn_3_files in history
# Verify content and follow-ups
assert "I've analyzed your codebase structure." in history
assert "Yes, check the test coverage" in history
assert "Test coverage analysis complete. Coverage is 85%." in history
assert "[Gemini's Follow-up: Would you like me to examine the test coverage?]" in history
# Verify chronological ordering (turn 1 appears before turn 2, etc.)
turn_1_pos = history.find("--- Turn 1 (Gemini using analyze) ---")
turn_2_pos = history.find("--- Turn 2 (Claude) ---")
turn_3_pos = history.find("--- Turn 3 (Gemini using analyze) ---")
assert turn_1_pos < turn_2_pos < turn_3_pos
@patch("utils.conversation_memory.get_redis_client")
def test_follow_up_question_parsing_cycle(self, mock_redis):
"""Test follow-up question persistence across request cycles"""
mock_client = Mock()
mock_redis.return_value = mock_client
thread_id = "12345678-1234-1234-1234-123456789012"
# First cycle: Assistant generates follow-up
context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="debug",
turns=[],
initial_context={"prompt": "Debug this error"},
)
mock_client.get.return_value = context.model_dump_json()
success = add_turn(
thread_id,
"assistant",
"Found potential issue in authentication",
follow_up_question="Should I examine the authentication middleware?",
)
assert success is True
# Second cycle: Retrieve conversation history
context_with_followup = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="debug",
turns=[
ConversationTurn(
role="assistant",
content="Found potential issue in authentication",
timestamp="2023-01-01T00:00:30Z",
follow_up_question="Should I examine the authentication middleware?",
)
],
initial_context={"prompt": "Debug this error"},
)
mock_client.get.return_value = context_with_followup.model_dump_json()
# Build history to verify follow-up is preserved
history = build_conversation_history(context_with_followup)
assert "Found potential issue in authentication" in history
assert "[Gemini's Follow-up: Should I examine the authentication middleware?]" in history
@patch("utils.conversation_memory.get_redis_client")
def test_stateless_request_isolation(self, mock_redis):
"""Test that each request cycle is independent but shares context via Redis"""
mock_client = Mock()
mock_redis.return_value = mock_client
# Simulate two different "processes" accessing same thread
thread_id = "12345678-1234-1234-1234-123456789012"
# Process 1: Creates thread
initial_context = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:00:00Z",
tool_name="thinkdeep",
turns=[],
initial_context={"prompt": "Think about architecture"},
)
mock_client.get.return_value = initial_context.model_dump_json()
success = add_turn(
thread_id, "assistant", "Architecture analysis", follow_up_question="Want to explore scalability?"
)
assert success is True
# Process 2: Different "request cycle" accesses same thread
context_from_redis = ThreadContext(
thread_id=thread_id,
created_at="2023-01-01T00:00:00Z",
last_updated_at="2023-01-01T00:01:00Z",
tool_name="thinkdeep",
turns=[
ConversationTurn(
role="assistant",
content="Architecture analysis",
timestamp="2023-01-01T00:00:30Z",
follow_up_question="Want to explore scalability?",
)
],
initial_context={"prompt": "Think about architecture"},
)
mock_client.get.return_value = context_from_redis.model_dump_json()
# Verify context continuity across "processes"
retrieved_context = get_thread(thread_id)
assert retrieved_context is not None
assert len(retrieved_context.turns) == 1
assert retrieved_context.turns[0].follow_up_question == "Want to explore scalability?"
if __name__ == "__main__":
pytest.main([__file__])