diff --git a/tools/consensus.py b/tools/consensus.py index 22db2b7..3965eb3 100644 --- a/tools/consensus.py +++ b/tools/consensus.py @@ -28,7 +28,8 @@ from mcp.types import TextContent from config import TEMPERATURE_ANALYTICAL from systemprompts import CONSENSUS_PROMPT -from tools.shared.base_models import WorkflowRequest +from tools.shared.base_models import ConsolidatedFindings, WorkflowRequest +from utils.conversation_memory import MAX_CONVERSATION_TURNS, create_thread, get_thread from .workflow.base import WorkflowTool @@ -414,11 +415,21 @@ of the evidence, even when it strongly points in one direction.""", # Validate request request = self.get_workflow_request_model()(**arguments) - # On first step, store the models to consult + # Resolve existing continuation_id or create a new one on first step + continuation_id = request.continuation_id + if request.step_number == 1: + if not continuation_id: + clean_args = {k: v for k, v in arguments.items() if k not in ["_model_context", "_resolved_model_name"]} + continuation_id = create_thread(self.get_name(), clean_args) + request.continuation_id = continuation_id + arguments["continuation_id"] = continuation_id + self.work_history = [] + self.consolidated_findings = ConsolidatedFindings() + # Store the original proposal from step 1 - this is what all models should see - self.original_proposal = request.step - self.initial_prompt = request.step # Keep for backward compatibility + self.store_initial_issue(request.step) + self.initial_request = request.step self.models_to_consult = request.models or [] self.accumulated_responses = [] # Set total steps: len(models) (each step includes consultation + response) @@ -430,6 +441,11 @@ of the evidence, even when it strongly points in one direction.""", model_idx = request.step_number - 1 # 0-based index if model_idx < len(self.models_to_consult): + # Track workflow state for conversation memory + step_data = self.prepare_step_data(request) + self.work_history.append(step_data) + self._update_consolidated_findings(step_data) + # Consult the model for this step model_response = await self._consult_model(self.models_to_consult[model_idx], request) @@ -484,24 +500,50 @@ of the evidence, even when it strongly points in one direction.""", f"- findings: Summarize key points from this model's response" ) - # Add accumulated responses for tracking - response_data["accumulated_responses"] = self.accumulated_responses + # Add continuation information and workflow customization + response_data = self.customize_workflow_response(response_data, request) - # Add metadata (since we're bypassing the base class metadata addition) - model_name = self.get_request_model_name(request) - provider = self.get_model_provider(model_name) - response_data["metadata"] = { - "tool_name": self.get_name(), - "model_name": model_name, - "model_used": model_name, - "provider_used": provider.get_provider_type().value, - } + # Ensure consensus-specific metadata is attached + self._add_workflow_metadata(response_data, arguments) + + if continuation_id: + self.store_conversation_turn(continuation_id, response_data, request) + continuation_offer = self._build_continuation_offer(continuation_id) + if continuation_offer: + response_data["continuation_offer"] = continuation_offer return [TextContent(type="text", text=json.dumps(response_data, indent=2, ensure_ascii=False))] # Otherwise, use standard workflow execution return await super().execute_workflow(arguments) + def _build_continuation_offer(self, continuation_id: str) -> dict[str, Any] | None: + """Create a continuation offer without exposing prior model responses.""" + try: + from tools.models import ContinuationOffer + + thread = get_thread(continuation_id) + if thread and thread.turns: + remaining_turns = max(0, MAX_CONVERSATION_TURNS - len(thread.turns)) + else: + remaining_turns = MAX_CONVERSATION_TURNS - 1 + + # Provide a neutral note specific to consensus workflow + note = ( + f"Consensus workflow can continue for {remaining_turns} more exchanges." + if remaining_turns > 0 + else "Consensus workflow continuation limit reached." + ) + + continuation_offer = ContinuationOffer( + continuation_id=continuation_id, + note=note, + remaining_turns=remaining_turns, + ) + return continuation_offer.model_dump() + except Exception: + return None + async def _consult_model(self, model_config: dict, request) -> dict: """Consult a single model and return its response.""" try: