# Data Flow & Processing Patterns

## Overview

The Gemini MCP Server implements sophisticated data flow patterns that enable secure, efficient, and context-aware AI collaboration. This document traces data movement through the system with concrete examples and performance considerations.

## Primary Data Flow Patterns

### 1. Standard Tool Execution Flow

```mermaid
sequenceDiagram
    participant C as Claude
    participant M as MCP Engine
    participant S as Security Layer
    participant T as Tool Handler
    participant G as Gemini API
    participant R as Redis Memory

    C->>M: MCP Request (tool_name, params)
    M->>M: Validate Request Schema
    M->>S: Security Validation
    S->>S: Path Validation & Sanitization
    S->>T: Secure Parameters
    T->>R: Load Conversation Context
    R-->>T: Thread Context (if exists)
    T->>T: Process Files & Context
    T->>G: Formatted Prompt + Context
    G-->>T: AI Response
    T->>R: Store Execution Result
    T->>M: Formatted Tool Output
    M->>C: MCP Response
```

**Example Request Flow**:

```json
// Claude → MCP Engine
{
  "method": "tools/call",
  "params": {
    "name": "analyze",
    "arguments": {
      "files": ["/workspace/tools/analyze.py"],
      "question": "Explain the architecture pattern",
      "continuation_id": "550e8400-e29b-41d4-a716-446655440000"
    }
  }
}
```

### 2. File Processing Pipeline

#### Stage 1: Security Validation (`utils/file_utils.py:67`)

```python
import os
import logging
from typing import List

logger = logging.getLogger(__name__)

# Input: ["/workspace/tools/analyze.py", "../../../etc/passwd"]
def validate_file_paths(file_paths: List[str]) -> List[str]:
    validated = []
    for path in file_paths:
        # 1. Dangerous pattern detection
        if any(danger in path for danger in ['../', '~/', '/etc/', '/var/']):
            logger.warning(f"Blocked dangerous path: {path}")
            continue

        # 2. Absolute path requirement
        if not os.path.isabs(path):
            path = os.path.abspath(path)

        # 3. Sandbox boundary check
        if not path.startswith(PROJECT_ROOT):
            logger.warning(f"Path outside sandbox: {path}")
            continue

        validated.append(path)

    return validated
# Output: ["/workspace/tools/analyze.py"]
```

#### Stage 2: Docker Path Translation (`utils/file_utils.py:89`)

```python
# Host Environment:      /Users/user/project/tools/analyze.py
# Container Environment: /workspace/tools/analyze.py
def translate_paths_for_environment(paths: List[str]) -> List[str]:
    translated = []
    for path in paths:
        if WORKSPACE_ROOT and path.startswith(WORKSPACE_ROOT):
            container_path = path.replace(WORKSPACE_ROOT, '/workspace', 1)
            translated.append(container_path)
        else:
            translated.append(path)
    return translated
```

#### Stage 3: Priority-Based Processing (`utils/file_utils.py:134`)

```python
from collections import defaultdict
from pathlib import Path
from typing import Dict, List

# File Priority Matrix
FILE_PRIORITIES = {
    '.py': 1,                        # Source code (highest priority)
    '.js': 1, '.ts': 1, '.tsx': 1,
    '.md': 2,                        # Documentation
    '.json': 2, '.yaml': 2, '.yml': 2,
    '.txt': 3,                       # Text files
    '.log': 4,                       # Logs (lowest priority)
}

# Token Budget Allocation
def allocate_token_budget(files: List[str], total_budget: int) -> Dict[str, int]:
    # Priority 1 files get 60% of the budget,
    # priority 2 files get 30%,
    # priority 3+ files share the remaining 10%.
    priority_groups = defaultdict(list)
    for file in files:
        ext = Path(file).suffix.lower()
        priority = FILE_PRIORITIES.get(ext, 4)
        priority_groups[priority].append(file)

    allocations = {}
    if priority_groups[1]:  # Source code files
        code_budget = int(total_budget * 0.6)
        per_file = code_budget // len(priority_groups[1])
        for file in priority_groups[1]:
            allocations[file] = per_file

    if priority_groups[2]:  # Documentation files
        doc_budget = int(total_budget * 0.3)
        per_file = doc_budget // len(priority_groups[2])
        for file in priority_groups[2]:
            allocations[file] = per_file

    low_priority = [f for p, group in priority_groups.items() if p >= 3 for f in group]
    if low_priority:  # Text files, logs, and everything else
        misc_budget = int(total_budget * 0.1)
        per_file = misc_budget // len(low_priority)
        for file in low_priority:
            allocations[file] = per_file

    return allocations
```

#### Stage 4: Content Processing & Formatting

```python
def process_file_content(file_path: str, token_limit: int) -> str:
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Token estimation (rough: 1 token ≈ 4 characters)
        estimated_tokens = len(content) // 4

        if estimated_tokens > token_limit:
            # Smart truncation preserving line structure
            lines = content.split('\n')
            truncated_lines = []
            current_tokens = 0

            for line in lines:
                line_tokens = len(line) // 4
                if current_tokens + line_tokens > token_limit:
                    break
                truncated_lines.append(line)
                current_tokens += line_tokens

            content = '\n'.join(truncated_lines)
            content += f"\n\n... [Truncated at {token_limit} tokens]"

        # Format with line numbers for precise references
        lines = content.split('\n')
        formatted_lines = []
        for i, line in enumerate(lines, 1):
            formatted_lines.append(f"{i:6d}\t{line}")

        return '\n'.join(formatted_lines)

    except Exception as e:
        return f"Error reading {file_path}: {str(e)}"
```
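
Taken together, the four stages compose into a single helper. The sketch below is an assumed composition, not the server's actual code; the name `process_files_with_budget` is borrowed from the reference in `prepare_gemini_request` further down, and the `=== path ===` section headers mirror the error-recovery example later in this document.

```python
def process_files_with_budget(file_paths: List[str], total_budget: int) -> str:
    """Hypothetical composition of Stages 1-4 (a sketch, not the real implementation)."""
    # Stage 1: drop dangerous or out-of-sandbox paths
    safe_paths = validate_file_paths(file_paths)
    # Stage 2: map host paths into the container's /workspace
    container_paths = translate_paths_for_environment(safe_paths)
    # Stage 3: split the token budget by file priority
    allocations = allocate_token_budget(container_paths, total_budget)
    # Stage 4: read, truncate, and line-number each file
    sections = [
        f"=== {path} ===\n{process_file_content(path, budget)}"
        for path, budget in allocations.items()
    ]
    return '\n\n'.join(sections)
```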

### 3. Conversation Memory Flow

#### Context Storage Pattern (`utils/conversation_memory.py:78`)

```python
# Tool execution creates persistent context
# (a method on the conversation memory manager, hence `self`)
async def store_tool_execution(self, thread_id: str, tool_execution: ToolExecution):
    context = await self.retrieve_thread(thread_id) or ThreadContext(thread_id)

    # Add new execution to history
    context.tool_history.append(tool_execution)

    # Update file set (deduplication)
    if tool_execution.files:
        context.conversation_files.update(tool_execution.files)

    # Update token tracking
    context.context_tokens += tool_execution.response_tokens
    context.last_accessed = datetime.now()

    # Persist to Redis
    await self.redis.setex(
        f"thread:{thread_id}",
        timedelta(hours=24),  # 24-hour expiration
        context.to_json()
    )
```
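
The `ToolExecution` record used above is not defined in this document. A minimal sketch consistent with the fields referenced here (tool name, summary, files, token count, continuation id) and with the `to_dict`/`from_dict` calls in the serialization section might look like:

```python
from dataclasses import dataclass, field, asdict
from typing import List, Optional

@dataclass
class ToolExecution:
    """Hypothetical shape inferred from usage in this document; the real class may differ."""
    tool_name: str
    summary: str
    files: List[str] = field(default_factory=list)
    response_tokens: int = 0
    continuation_id: Optional[str] = None

    def to_dict(self) -> dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> 'ToolExecution':
        return cls(**data)
```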

#### Context Retrieval & Reconstruction

```python
async def build_conversation_context(self, thread_id: str) -> str:
    context = await self.retrieve_thread(thread_id)
    if not context:
        return ""

    # Build conversation summary
    summary_parts = []

    # Add file context (deduplicated)
    if context.conversation_files:
        summary_parts.append("## Previous Files Analyzed:")
        for file_path in sorted(context.conversation_files):
            summary_parts.append(f"- {file_path}")

    # Add tool execution history
    if context.tool_history:
        summary_parts.append("\n## Previous Analysis:")
        for execution in context.tool_history[-3:]:  # Last 3 executions
            summary_parts.append(f"**{execution.tool_name}**: {execution.summary}")

    return '\n'.join(summary_parts)
```

### 4. Thinking Mode Processing

#### Dynamic Token Allocation (`tools/models.py:67`)

```python
# Thinking mode determines computational budget
THINKING_MODE_TOKENS = {
    'minimal': 128,    # Quick answers, simple questions
    'low':     2048,   # Basic analysis, straightforward tasks
    'medium':  8192,   # Standard analysis, moderate complexity
    'high':    16384,  # Deep analysis, complex problems
    'max':     32768,  # Maximum depth, critical decisions
}

def prepare_gemini_request(prompt: str, thinking_mode: str, files: List[str]) -> dict:
    # Calculate total context budget
    thinking_tokens = THINKING_MODE_TOKENS.get(thinking_mode, 8192)
    file_tokens = MAX_CONTEXT_TOKENS - thinking_tokens - 1000  # Reserve for response

    # Process files within budget
    file_content = process_files_with_budget(files, file_tokens)

    # Construct final prompt
    full_prompt = f"""
{prompt}

## Available Context ({thinking_tokens} thinking tokens allocated)

{file_content}

Please analyze using {thinking_mode} thinking mode.
"""

    return {
        'prompt': full_prompt,
        'max_tokens': thinking_tokens,
        'temperature': 0.2 if thinking_mode in ['high', 'max'] else 0.5
    }
```
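
As a usage sketch (assuming the default `MAX_CONTEXT_TOKENS` of 1,000,000 from the `Config` class shown later in this document), a `high` request reserves 16,384 thinking tokens plus 1,000 response tokens, leaving 982,616 tokens for file content:

```python
request = prepare_gemini_request(
    prompt="Review this module for concurrency bugs",
    thinking_mode='high',
    files=['/workspace/tools/analyze.py'],
)
assert request['max_tokens'] == 16384
assert request['temperature'] == 0.2
# File budget: 1_000_000 - 16_384 - 1_000 = 982_616 tokens
```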

## Advanced Data Flow Patterns

### 1. Cross-Tool Continuation Flow

```python
import uuid

# Tool A (analyze) creates foundation
analyze_result = await analyze_tool.execute({
    'files': ['/workspace/tools/'],
    'question': 'What is the architecture pattern?'
})

# Store context with continuation capability
thread_id = str(uuid.uuid4())
await memory.store_tool_execution(thread_id, ToolExecution(
    tool_name='analyze',
    files=['/workspace/tools/'],
    summary='Identified MCP plugin architecture pattern',
    continuation_id=thread_id
))

# Tool B (thinkdeep) continues analysis
thinkdeep_result = await thinkdeep_tool.execute({
    'current_analysis': analyze_result.content,
    'focus_areas': ['scalability', 'security'],
    'continuation_id': thread_id  # Links to previous context
})
```

### 2. Error Recovery & Graceful Degradation

```python
def resilient_file_processing(files: List[str]) -> str:
    """Process files with graceful error handling."""
    results = []

    for file_path in files:
        try:
            content = read_file_safely(file_path)
            results.append(f"=== {file_path} ===\n{content}")
        except PermissionError:
            results.append(f"=== {file_path} ===\nERROR: Permission denied")
        except FileNotFoundError:
            results.append(f"=== {file_path} ===\nERROR: File not found")
        except UnicodeDecodeError:
            # Try binary file detection
            try:
                with open(file_path, 'rb') as f:
                    header = f.read(16)
                if is_binary_file(header):
                    results.append(f"=== {file_path} ===\nBinary file (skipped)")
                else:
                    results.append(f"=== {file_path} ===\nERROR: Encoding issue")
            except OSError:
                results.append(f"=== {file_path} ===\nERROR: Unreadable file")
        except Exception as e:
            results.append(f"=== {file_path} ===\nERROR: {str(e)}")

    return '\n\n'.join(results)
```
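
`is_binary_file` is referenced above but not defined in this document. A common heuristic, shown here as an assumption rather than the server's actual implementation, treats null bytes or a high ratio of non-text bytes in the header as binary:

```python
def is_binary_file(header: bytes) -> bool:
    """Heuristic sketch: null bytes almost never appear in text files."""
    if b'\x00' in header:
        return True
    # Count bytes outside the usual printable/whitespace range
    text_chars = bytes(range(0x20, 0x7F)) + b'\n\r\t\b'
    non_text = sum(byte not in text_chars for byte in header)
    return len(header) > 0 and non_text / len(header) > 0.30
```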

### 3. Performance Optimization Patterns

#### Concurrent File Processing

```python
import asyncio

async def process_files_concurrently(files: List[str], token_budget: int) -> str:
    """Process multiple files concurrently with a shared budget."""

    # Allocate budget per file
    allocations = allocate_token_budget(files, token_budget)

    # Create processing tasks
    tasks = []
    for file_path in files:
        task = asyncio.create_task(
            process_single_file(file_path, allocations.get(file_path, 1000))
        )
        tasks.append(task)

    # Wait for all files to complete
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Combine results, handling exceptions
    processed_content = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            processed_content.append(f"Error processing {files[i]}: {result}")
        else:
            processed_content.append(result)

    return '\n\n'.join(processed_content)
```
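
`process_single_file` is likewise not shown. Since `process_file_content` above is synchronous, one plausible sketch pushes the blocking file I/O onto a worker thread so the event loop stays responsive:

```python
async def process_single_file(file_path: str, token_limit: int) -> str:
    """Hypothetical helper: run the blocking reader off the event loop."""
    # asyncio.to_thread (Python 3.9+) keeps file I/O from stalling other tasks
    return await asyncio.to_thread(process_file_content, file_path, token_limit)
```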

#### Intelligent Caching

```python
import time

class FileContentCache:
    def __init__(self, max_size: int = 100):
        self.cache = {}
        self.access_times = {}
        self.max_size = max_size

    async def get_file_content(self, file_path: str, token_limit: int) -> str:
        # Create cache key including token limit
        cache_key = f"{file_path}:{token_limit}"

        # Check cache hit
        if cache_key in self.cache:
            self.access_times[cache_key] = time.time()
            return self.cache[cache_key]

        # Process file and cache result (process_file_content is synchronous)
        content = process_file_content(file_path, token_limit)

        # Evict least-recently-used entry if cache full
        if len(self.cache) >= self.max_size:
            oldest_key = min(self.access_times.keys(),
                             key=lambda k: self.access_times[k])
            del self.cache[oldest_key]
            del self.access_times[oldest_key]

        # Store in cache
        self.cache[cache_key] = content
        self.access_times[cache_key] = time.time()

        return content
```
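
A note on this design: the token limit is baked into the cache key because the same file truncated to different budgets yields different content. Eviction here scans every access timestamp (O(n) per miss); if the cache grows large, a `collections.OrderedDict` with `move_to_end()` would make eviction O(1).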

## Data Persistence Patterns

### 1. Redis Thread Storage

```python
import json
from datetime import datetime

# Thread context serialization
class ThreadContext:
    def to_json(self) -> str:
        return json.dumps({
            'thread_id': self.thread_id,
            'tool_history': [ex.to_dict() for ex in self.tool_history],
            'conversation_files': list(self.conversation_files),
            'context_tokens': self.context_tokens,
            'created_at': self.created_at.isoformat(),
            'last_accessed': self.last_accessed.isoformat()
        })

    @classmethod
    def from_json(cls, json_str: str) -> 'ThreadContext':
        data = json.loads(json_str)
        context = cls(data['thread_id'])
        context.tool_history = [
            ToolExecution.from_dict(ex) for ex in data['tool_history']
        ]
        context.conversation_files = set(data['conversation_files'])
        context.context_tokens = data['context_tokens']
        context.created_at = datetime.fromisoformat(data['created_at'])
        context.last_accessed = datetime.fromisoformat(data['last_accessed'])
        return context
```
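
A round-trip sketch, assuming the constructor initializes an empty history, an empty file set, and current timestamps:

```python
ctx = ThreadContext('550e8400-e29b-41d4-a716-446655440000')
ctx.conversation_files.add('/workspace/tools/analyze.py')

# Sets survive the list round-trip; datetimes survive via isoformat()
restored = ThreadContext.from_json(ctx.to_json())
assert restored.conversation_files == ctx.conversation_files
```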

### 2. Configuration State Management

```python
import os

class ConfigError(Exception):
    """Raised when required configuration is missing or invalid."""

# Environment-based configuration with validation
class Config:
    def __init__(self):
        self.gemini_api_key = self._require_env('GEMINI_API_KEY')
        self.gemini_model = os.getenv('GEMINI_MODEL', 'gemini-2.0-flash-thinking-exp')
        self.project_root = os.getenv('PROJECT_ROOT', '/workspace')
        self.redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379')
        self.max_context_tokens = int(os.getenv('MAX_CONTEXT_TOKENS', '1000000'))

        # Validate critical paths
        if not os.path.exists(self.project_root):
            raise ConfigError(f"PROJECT_ROOT does not exist: {self.project_root}")

    def _require_env(self, key: str) -> str:
        value = os.getenv(key)
        if not value:
            raise ConfigError(f"Required environment variable not set: {key}")
        return value
```

## Security Data Flow

### 1. Request Sanitization Pipeline

```python
def sanitize_request_data(request: dict) -> dict:
    """Multi-layer request sanitization"""
    sanitized = {}

    # 1. Schema validation (pydantic model, converted back to a dict)
    validated_data = RequestSchema.parse_obj(request).dict()

    # 2. Path sanitization
    if 'files' in validated_data:
        sanitized['files'] = [
            sanitize_file_path(path) for path in validated_data['files']
        ]

    # 3. Content filtering
    if 'prompt' in validated_data:
        sanitized['prompt'] = filter_sensitive_content(validated_data['prompt'])

    # 4. Parameter validation
    for key, value in validated_data.items():
        if key not in ['files', 'prompt']:
            sanitized[key] = validate_parameter(key, value)

    return sanitized
```
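
`sanitize_file_path` is used above but not defined here. A minimal sketch, assuming the same sandbox rules as Stage 1 of the file pipeline:

```python
import os

def sanitize_file_path(path: str) -> str:
    """Hypothetical sketch: normalize the path and confine it to the sandbox."""
    # Resolve '..' segments and symlinks before checking boundaries
    resolved = os.path.realpath(os.path.abspath(path))
    if not resolved.startswith(PROJECT_ROOT):
        raise ValueError(f"Path outside sandbox: {path}")
    return resolved
```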

### 2. Response Sanitization

```python
import re

def sanitize_response_data(response: str) -> str:
    """Remove sensitive information from responses"""

    # Remove potential API keys, tokens, passwords
    sensitive_patterns = [
        r'api[_-]?key["\s:=]+[a-zA-Z0-9-_]{20,}',
        r'token["\s:=]+[a-zA-Z0-9-_]{20,}',
        r'password["\s:=]+\S+',
        r'/home/[^/\s]+',                                    # User paths
        r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',  # Emails
    ]

    sanitized = response
    for pattern in sensitive_patterns:
        sanitized = re.sub(pattern, '[REDACTED]', sanitized, flags=re.IGNORECASE)

    return sanitized
```
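
For example, applied to a string containing a leaked key and a user path (a hypothetical input, not actual server output):

```python
leaked = "Connect with api_key=sk_abcdefghijklmnopqrstuvwx from /home/alice"
print(sanitize_response_data(leaked))
# -> "Connect with [REDACTED] from [REDACTED]"
```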

## Performance Monitoring & Metrics

### 1. Request Processing Metrics

```python
import time
from collections import defaultdict
from contextlib import asynccontextmanager
from datetime import datetime
from typing import List

import psutil

class PerformanceMetrics:
    def __init__(self):
        self.request_times = []
        self.file_processing_times = []
        self.memory_usage = []
        self.error_counts = defaultdict(int)

    @asynccontextmanager  # makes the generator usable with `async with`
    async def track_request(self, tool_name: str, files: List[str]):
        start_time = time.time()
        start_memory = psutil.Process().memory_info().rss

        try:
            # The caller's request processing runs inside the `async with` block
            yield
        except Exception as e:
            self.error_counts[f"{tool_name}:{type(e).__name__}"] += 1
            raise
        finally:
            # Record metrics
            end_time = time.time()
            end_memory = psutil.Process().memory_info().rss

            self.request_times.append({
                'tool': tool_name,
                'duration': end_time - start_time,
                'file_count': len(files),
                'timestamp': datetime.now()
            })

            self.memory_usage.append({
                'memory_delta': end_memory - start_memory,
                'timestamp': datetime.now()
            })
```
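
With the `@asynccontextmanager` decorator, callers wrap request processing in `async with`; a hypothetical usage sketch:

```python
metrics = PerformanceMetrics()

async def handle_analyze(files: List[str]):
    async with metrics.track_request('analyze', files):
        return await analyze_tool.execute({'files': files})
```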

This comprehensive data flow documentation provides the foundation for understanding how information moves through the Gemini MCP Server, enabling effective debugging, optimization, and extension of the system.