# Data Flow & Processing Patterns

## Overview

The Gemini MCP Server implements data flow patterns designed for secure, efficient, and context-aware AI collaboration. This document traces how data moves through the system, with concrete examples and performance considerations.

## Primary Data Flow Patterns

### 1. Standard Tool Execution Flow

```mermaid
sequenceDiagram
    participant C as Claude
    participant M as MCP Engine
    participant S as Security Layer
    participant T as Tool Handler
    participant G as Gemini API
    participant R as Redis Memory

    C->>M: MCP Request (tool_name, params)
    M->>M: Validate Request Schema
    M->>S: Security Validation
    S->>S: Path Validation & Sanitization
    S->>T: Secure Parameters
    T->>R: Load Conversation Context
    R-->>T: Thread Context (if exists)
    T->>T: Process Files & Context
    T->>G: Formatted Prompt + Context
    G-->>T: AI Response
    T->>R: Store Execution Result
    T->>M: Formatted Tool Output
    M->>C: MCP Response
```

**Example Request Flow** (Claude → MCP Engine):

```json
{
  "method": "tools/call",
  "params": {
    "name": "analyze",
    "arguments": {
      "files": ["/workspace/tools/analyze.py"],
      "question": "Explain the architecture pattern",
      "continuation_id": "550e8400-e29b-41d4-a716-446655440000"
    }
  }
}
```
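
On the return path, the MCP Engine answers with a `tools/call` result containing text content blocks. The payload below is illustrative only (the JSON-RPC envelope is omitted and the text is a hypothetical summary), but it mirrors the shape produced by the flow above:

```json
{
  "content": [
    {
      "type": "text",
      "text": "## Architecture Analysis\n\nThe tools/ package follows a plugin-style pattern...\n\nTo continue this analysis in another tool, reuse continuation_id 550e8400-e29b-41d4-a716-446655440000."
    }
  ],
  "isError": false
}
```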

### 2. File Processing Pipeline

#### Stage 1: Security Validation (`utils/file_utils.py:67`)

```python
# Input: ["/workspace/tools/analyze.py", "../../../etc/passwd"]
def validate_file_paths(file_paths: List[str]) -> List[str]:
    validated = []
    for path in file_paths:
        # 1. Dangerous pattern detection
        if any(danger in path for danger in ['../', '~/', '/etc/', '/var/']):
            logger.warning(f"Blocked dangerous path: {path}")
            continue

        # 2. Absolute path requirement
        if not os.path.isabs(path):
            path = os.path.abspath(path)

        # 3. Sandbox boundary check
        if not path.startswith(PROJECT_ROOT):
            logger.warning(f"Path outside sandbox: {path}")
            continue

        validated.append(path)

    return validated
# Output: ["/workspace/tools/analyze.py"]
```

#### Stage 2: Docker Path Translation (`utils/file_utils.py:89`)

```python
# Host environment:      /Users/user/project/tools/analyze.py
# Container environment: /workspace/tools/analyze.py
def translate_paths_for_environment(paths: List[str]) -> List[str]:
    translated = []
    for path in paths:
        if WORKSPACE_ROOT and path.startswith(WORKSPACE_ROOT):
            container_path = path.replace(WORKSPACE_ROOT, '/workspace', 1)
            translated.append(container_path)
        else:
            translated.append(path)
    return translated
```
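
A minimal usage sketch, assuming `WORKSPACE_ROOT` points at the host project directory (the paths below are hypothetical):

```python
# Assuming WORKSPACE_ROOT = "/Users/user/project" (hypothetical host path)
host_paths = ["/Users/user/project/tools/analyze.py", "/tmp/notes.txt"]
translate_paths_for_environment(host_paths)
# -> ["/workspace/tools/analyze.py", "/tmp/notes.txt"]
# Paths outside the workspace pass through unchanged.
```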

#### Stage 3: Priority-Based Processing (`utils/file_utils.py:134`)

```python
# File Priority Matrix
FILE_PRIORITIES = {
    '.py': 1,                          # Source code (highest priority)
    '.js': 1, '.ts': 1, '.tsx': 1,
    '.md': 2,                          # Documentation
    '.json': 2, '.yaml': 2, '.yml': 2,
    '.txt': 3,                         # Text files
    '.log': 4,                         # Logs (lowest priority)
}

# Token Budget Allocation
def allocate_token_budget(files: List[str], total_budget: int) -> Dict[str, int]:
    # Priority 1 files get 60% of budget
    # Priority 2 files get 30% of budget
    # Priority 3+ files get 10% of budget

    priority_groups = defaultdict(list)
    for file in files:
        ext = Path(file).suffix.lower()
        priority = FILE_PRIORITIES.get(ext, 4)
        priority_groups[priority].append(file)

    allocations = {}
    if priority_groups[1]:  # Source code files
        code_budget = int(total_budget * 0.6)
        per_file = code_budget // len(priority_groups[1])
        for file in priority_groups[1]:
            allocations[file] = per_file

    if priority_groups[2]:  # Documentation files
        doc_budget = int(total_budget * 0.3)
        per_file = doc_budget // len(priority_groups[2])
        for file in priority_groups[2]:
            allocations[file] = per_file

    # Remaining files (priority 3+) share the final 10% of the budget
    remaining = [f for p, group in priority_groups.items() if p >= 3 for f in group]
    if remaining:
        misc_budget = int(total_budget * 0.1)
        per_file = misc_budget // len(remaining)
        for file in remaining:
            allocations[file] = per_file

    return allocations
```
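
As a worked example, with a 100,000-token file budget and a mixed file set, the split comes out as follows (a sketch using the priority-3+ branch shown above; file names are hypothetical):

```python
files = [
    "/workspace/server.py",         # priority 1
    "/workspace/tools/analyze.py",  # priority 1
    "/workspace/README.md",         # priority 2
    "/workspace/server.log",        # priority 4
]
allocate_token_budget(files, total_budget=100_000)
# -> {
#      "/workspace/server.py": 30000,          # 60% split across two source files
#      "/workspace/tools/analyze.py": 30000,
#      "/workspace/README.md": 30000,          # 30% for the single documentation file
#      "/workspace/server.log": 10000,         # 10% for the remaining low-priority file
#    }
```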

#### Stage 4: Content Processing & Formatting

```python
def process_file_content(file_path: str, token_limit: int) -> str:
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Token estimation (rough: 1 token ≈ 4 characters)
        estimated_tokens = len(content) // 4

        if estimated_tokens > token_limit:
            # Smart truncation preserving structure
            lines = content.split('\n')
            truncated_lines = []
            current_tokens = 0

            for line in lines:
                line_tokens = len(line) // 4
                if current_tokens + line_tokens > token_limit:
                    break
                truncated_lines.append(line)
                current_tokens += line_tokens

            content = '\n'.join(truncated_lines)
            content += f"\n\n... [Truncated at {token_limit} tokens]"

        # Format with line numbers for precise references
        lines = content.split('\n')
        formatted_lines = []
        for i, line in enumerate(lines, 1):
            formatted_lines.append(f"{i:6d}\t{line}")

        return '\n'.join(formatted_lines)

    except Exception as e:
        return f"Error reading {file_path}: {str(e)}"
```

### 3. Conversation Memory Flow

#### Context Storage Pattern (`utils/conversation_memory.py:78`)

```python
# Tool execution creates persistent context
async def store_tool_execution(self, thread_id: str, tool_execution: ToolExecution):
    context = await self.retrieve_thread(thread_id) or ThreadContext(thread_id)

    # Add new execution to history
    context.tool_history.append(tool_execution)

    # Update file set (deduplication)
    if tool_execution.files:
        context.conversation_files.update(tool_execution.files)

    # Update token tracking
    context.context_tokens += tool_execution.response_tokens
    context.last_accessed = datetime.now()

    # Persist to Redis
    await self.redis.setex(
        f"thread:{thread_id}",
        timedelta(hours=24),  # 24-hour expiration
        context.to_json()
    )
```
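
`ToolExecution` is referenced throughout this flow but not defined in this document. A minimal sketch of the shape implied by the calls above (field names are inferred from this page, not taken from the actual source):

```python
from dataclasses import dataclass, field, asdict


@dataclass
class ToolExecution:
    """One tool invocation recorded in a conversation thread (illustrative sketch)."""
    tool_name: str
    summary: str
    files: list[str] = field(default_factory=list)
    response_tokens: int = 0
    continuation_id: str | None = None

    def to_dict(self) -> dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "ToolExecution":
        return cls(**data)
```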

#### Context Retrieval & Reconstruction

```python
async def build_conversation_context(self, thread_id: str) -> str:
    context = await self.retrieve_thread(thread_id)
    if not context:
        return ""

    # Build conversation summary
    summary_parts = []

    # Add file context (deduplicated)
    if context.conversation_files:
        summary_parts.append("## Previous Files Analyzed:")
        for file_path in sorted(context.conversation_files):
            summary_parts.append(f"- {file_path}")

    # Add tool execution history
    if context.tool_history:
        summary_parts.append("\n## Previous Analysis:")
        for execution in context.tool_history[-3:]:  # Last 3 executions
            summary_parts.append(f"**{execution.tool_name}**: {execution.summary}")

    return '\n'.join(summary_parts)
```

### 4. Thinking Mode Processing

#### Dynamic Token Allocation (`tools/models.py:67`)

```python
# Thinking mode determines computational budget
THINKING_MODE_TOKENS = {
    'minimal': 128,    # Quick answers, simple questions
    'low': 2048,       # Basic analysis, straightforward tasks
    'medium': 8192,    # Standard analysis, moderate complexity
    'high': 16384,     # Deep analysis, complex problems
    'max': 32768,      # Maximum depth, critical decisions
}

def prepare_gemini_request(prompt: str, thinking_mode: str, files: List[str]) -> dict:
    # Calculate total context budget
    thinking_tokens = THINKING_MODE_TOKENS.get(thinking_mode, 8192)
    file_tokens = MAX_CONTEXT_TOKENS - thinking_tokens - 1000  # Reserve for response

    # Process files within budget
    file_content = process_files_with_budget(files, file_tokens)

    # Construct final prompt
    full_prompt = f"""
{prompt}

## Available Context ({thinking_tokens} thinking tokens allocated)

{file_content}

Please analyze using {thinking_mode} thinking mode.
"""

    return {
        'prompt': full_prompt,
        'max_tokens': thinking_tokens,
        'temperature': 0.2 if thinking_mode in ['high', 'max'] else 0.5,
    }
```
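
As a concrete check of the arithmetic, using the default `MAX_CONTEXT_TOKENS` of 1,000,000 from the configuration section below and `thinking_mode='high'`:

```python
# Hypothetical budget calculation for thinking_mode='high'
MAX_CONTEXT_TOKENS = 1_000_000                 # default from the Config class below
thinking_tokens = 16_384                       # THINKING_MODE_TOKENS['high']
file_tokens = MAX_CONTEXT_TOKENS - thinking_tokens - 1_000
print(file_tokens)                             # 982616 tokens left for file content
```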

## Advanced Data Flow Patterns

### 1. Cross-Tool Continuation Flow

```python
# Tool A (analyze) creates foundation
analyze_result = await analyze_tool.execute({
    'files': ['/workspace/tools/'],
    'question': 'What is the architecture pattern?'
})

# Store context with continuation capability
thread_id = str(uuid.uuid4())
await memory.store_tool_execution(thread_id, ToolExecution(
    tool_name='analyze',
    files=['/workspace/tools/'],
    summary='Identified MCP plugin architecture pattern',
    continuation_id=thread_id
))

# Tool B (thinkdeep) continues analysis
thinkdeep_result = await thinkdeep_tool.execute({
    'current_analysis': analyze_result.content,
    'focus_areas': ['scalability', 'security'],
    'continuation_id': thread_id  # Links to previous context
})
```

### 2. Error Recovery & Graceful Degradation

```python
def resilient_file_processing(files: List[str]) -> str:
    """Process files with graceful error handling"""
    results = []

    for file_path in files:
        try:
            content = read_file_safely(file_path)
            results.append(f"=== {file_path} ===\n{content}")
        except PermissionError:
            results.append(f"=== {file_path} ===\nERROR: Permission denied")
        except FileNotFoundError:
            results.append(f"=== {file_path} ===\nERROR: File not found")
        except UnicodeDecodeError:
            # Try binary file detection
            try:
                with open(file_path, 'rb') as f:
                    header = f.read(16)
                if is_binary_file(header):
                    results.append(f"=== {file_path} ===\nBinary file (skipped)")
                else:
                    results.append(f"=== {file_path} ===\nERROR: Encoding issue")
            except Exception:
                results.append(f"=== {file_path} ===\nERROR: Unreadable file")
        except Exception as e:
            results.append(f"=== {file_path} ===\nERROR: {str(e)}")

    return '\n\n'.join(results)
```
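
`read_file_safely` and `is_binary_file` are helpers referenced above but not shown here. A minimal sketch of what they might look like (assumed implementations, not the project's actual code):

```python
def read_file_safely(file_path: str, max_chars: int = 1_000_000) -> str:
    """Read a UTF-8 text file, capping how much content is loaded."""
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read(max_chars)


def is_binary_file(header: bytes) -> bool:
    """Heuristic: treat files whose first bytes contain NUL as binary."""
    return b'\x00' in header
```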

### 3. Performance Optimization Patterns

#### Concurrent File Processing

```python
async def process_files_concurrently(files: List[str], token_budget: int) -> str:
    """Process multiple files concurrently with shared budget"""

    # Allocate budget per file
    allocations = allocate_token_budget(files, token_budget)

    # Create processing tasks
    tasks = []
    for file_path in files:
        task = asyncio.create_task(
            process_single_file(file_path, allocations.get(file_path, 1000))
        )
        tasks.append(task)

    # Wait for all files to complete
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Combine results, handling exceptions
    processed_content = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            processed_content.append(f"Error processing {files[i]}: {result}")
        else:
            processed_content.append(result)

    return '\n\n'.join(processed_content)
```
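
`process_single_file` is assumed here to be an async wrapper around the synchronous Stage 4 formatter; one way it could look (a sketch, not the actual implementation):

```python
import asyncio


async def process_single_file(file_path: str, token_limit: int) -> str:
    """Run the synchronous Stage 4 formatter without blocking the event loop."""
    return await asyncio.to_thread(process_file_content, file_path, token_limit)
```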

#### Intelligent Caching

```python
class FileContentCache:
    def __init__(self, max_size: int = 100):
        self.cache = {}
        self.access_times = {}
        self.max_size = max_size

    async def get_file_content(self, file_path: str, token_limit: int) -> str:
        # Create cache key including token limit
        cache_key = f"{file_path}:{token_limit}"

        # Check cache hit
        if cache_key in self.cache:
            self.access_times[cache_key] = time.time()
            return self.cache[cache_key]

        # Process file and cache result (the Stage 4 helper is synchronous,
        # so run it off the event loop)
        content = await asyncio.to_thread(process_file_content, file_path, token_limit)

        # Evict the least recently used entry if the cache is full
        if len(self.cache) >= self.max_size:
            oldest_key = min(self.access_times.keys(),
                             key=lambda k: self.access_times[k])
            del self.cache[oldest_key]
            del self.access_times[oldest_key]

        # Store in cache
        self.cache[cache_key] = content
        self.access_times[cache_key] = time.time()

        return content
```

## Data Persistence Patterns

### 1. Redis Thread Storage

```python
# Thread context serialization
class ThreadContext:
    def to_json(self) -> str:
        return json.dumps({
            'thread_id': self.thread_id,
            'tool_history': [ex.to_dict() for ex in self.tool_history],
            'conversation_files': list(self.conversation_files),
            'context_tokens': self.context_tokens,
            'created_at': self.created_at.isoformat(),
            'last_accessed': self.last_accessed.isoformat()
        })

    @classmethod
    def from_json(cls, json_str: str) -> 'ThreadContext':
        data = json.loads(json_str)
        context = cls(data['thread_id'])
        context.tool_history = [
            ToolExecution.from_dict(ex) for ex in data['tool_history']
        ]
        context.conversation_files = set(data['conversation_files'])
        context.context_tokens = data['context_tokens']
        context.created_at = datetime.fromisoformat(data['created_at'])
        context.last_accessed = datetime.fromisoformat(data['last_accessed'])
        return context
```

### 2. Configuration State Management

```python
# Environment-based configuration with validation
class Config:
    def __init__(self):
        self.gemini_api_key = self._require_env('GEMINI_API_KEY')
        self.gemini_model = os.getenv('GEMINI_MODEL', 'gemini-2.0-flash-thinking-exp')
        self.project_root = os.getenv('PROJECT_ROOT', '/workspace')
        self.redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379')
        self.max_context_tokens = int(os.getenv('MAX_CONTEXT_TOKENS', '1000000'))

        # Validate critical paths
        if not os.path.exists(self.project_root):
            raise ConfigError(f"PROJECT_ROOT does not exist: {self.project_root}")

    def _require_env(self, key: str) -> str:
        value = os.getenv(key)
        if not value:
            raise ConfigError(f"Required environment variable not set: {key}")
        return value
```
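
A short usage sketch, assuming the environment variables are set before the server starts (the values below are placeholders):

```python
import os

# Placeholder values for illustration only
os.environ['GEMINI_API_KEY'] = 'your-api-key-here'
os.environ['PROJECT_ROOT'] = '/workspace'

config = Config()
print(config.gemini_model)        # falls back to the default model when GEMINI_MODEL is unset
print(config.max_context_tokens)  # 1000000 (default)
```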

## Security Data Flow

### 1. Request Sanitization Pipeline

```python
def sanitize_request_data(request: dict) -> dict:
    """Multi-layer request sanitization"""
    sanitized = {}

    # 1. Schema validation (Pydantic model converted back to a dict for filtering)
    validated_data = RequestSchema.parse_obj(request).dict()

    # 2. Path sanitization
    if 'files' in validated_data:
        sanitized['files'] = [
            sanitize_file_path(path) for path in validated_data['files']
        ]

    # 3. Content filtering
    if 'prompt' in validated_data:
        sanitized['prompt'] = filter_sensitive_content(validated_data['prompt'])

    # 4. Parameter validation
    for key, value in validated_data.items():
        if key not in ['files', 'prompt']:
            sanitized[key] = validate_parameter(key, value)

    return sanitized
```
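
`RequestSchema`, `sanitize_file_path`, `filter_sensitive_content`, and `validate_parameter` are referenced without definitions here. A minimal Pydantic sketch of the schema piece (field names are assumptions based only on the request example earlier in this document):

```python
from typing import Optional
from pydantic import BaseModel, Field


class RequestSchema(BaseModel):
    """Illustrative request schema; the real model lives in the project's tool definitions."""
    files: list[str] = Field(default_factory=list)
    prompt: Optional[str] = None
    continuation_id: Optional[str] = None
    thinking_mode: str = 'medium'
```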

### 2. Response Sanitization

```python
def sanitize_response_data(response: str) -> str:
    """Remove sensitive information from responses"""

    # Remove potential API keys, tokens, passwords
    sensitive_patterns = [
        r'api[_-]?key["\s:=]+[A-Za-z0-9_-]{20,}',
        r'token["\s:=]+[A-Za-z0-9_-]{20,}',
        r'password["\s:=]+\S+',
        r'/home/[^/\s]+',                                   # User paths
        r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',  # Emails
    ]

    sanitized = response
    for pattern in sensitive_patterns:
        sanitized = re.sub(pattern, '[REDACTED]', sanitized, flags=re.IGNORECASE)

    return sanitized
```

## Performance Monitoring & Metrics

### 1. Request Processing Metrics

```python
class PerformanceMetrics:
    def __init__(self):
        self.request_times = []
        self.file_processing_times = []
        self.memory_usage = []
        self.error_counts = defaultdict(int)

    @asynccontextmanager  # from contextlib; lets track_request be used with `async with`
    async def track_request(self, tool_name: str, files: List[str]):
        start_time = time.time()
        start_memory = psutil.Process().memory_info().rss

        try:
            # Process request...
            yield
        except Exception as e:
            self.error_counts[f"{tool_name}:{type(e).__name__}"] += 1
            raise
        finally:
            # Record metrics
            end_time = time.time()
            end_memory = psutil.Process().memory_info().rss

            self.request_times.append({
                'tool': tool_name,
                'duration': end_time - start_time,
                'file_count': len(files),
                'timestamp': datetime.now()
            })

            self.memory_usage.append({
                'memory_delta': end_memory - start_memory,
                'timestamp': datetime.now()
            })
```
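
With the `@asynccontextmanager` decorator, the tracker wraps a tool invocation like this (a usage sketch; `metrics`, `analyze_tool`, and `request` stand in for objects from the surrounding server code):

```python
metrics = PerformanceMetrics()

async def handle_analyze(request: dict) -> dict:
    files = request.get('files', [])
    # Everything inside the block is timed and memory-profiled;
    # exceptions are counted per tool and exception type, then re-raised.
    async with metrics.track_request('analyze', files):
        return await analyze_tool.execute(request)
```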

This document traces how information moves through the Gemini MCP Server, providing the foundation for debugging, optimizing, and extending the system.