Files
my-pal-mcp-server/log_monitor.py
Fahad 91077e3810 Performance improvements when embedding files:
- Exit early at MCP boundary if files won't fit within given context of chosen model
- Encourage claude to re-run with better context
- Check file sizes before embedding
- Drop files from older conversations when building continuations and give priority to newer files
- List and mention excluded files to Claude on return
- Improved tests
- Improved precommit prompt
- Added a new Low severity to precommit
- Improved documentation of file embedding strategy
- Refactor
2025-06-16 05:51:52 +04:00

180 lines
6.0 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Log monitor for MCP server - monitors and displays tool activity
This module provides a simplified log monitoring interface using the
centralized LogTailer class from utils.file_utils.
"""
from datetime import datetime
from utils.file_utils import LogTailer
def create_line_handler(log_name: str, filter_func=None, format_func=None):
"""
Create a line handler function for log monitoring.
Args:
log_name: Name of the log for display purposes
filter_func: Optional function to filter lines (return True to include)
format_func: Optional function to format lines for display
Returns:
Function that handles log lines
"""
def handle_line(line: str):
if filter_func and not filter_func(line):
return
timestamp = datetime.now().strftime("%H:%M:%S")
if format_func:
formatted_line = format_func(line)
else:
formatted_line = line
print(f"[{timestamp}] {formatted_line}")
return handle_line
def monitor_mcp_activity():
"""Monitor MCP server activity by watching multiple log files"""
log_files = {
"/tmp/mcp_server.log": "main",
"/tmp/mcp_activity.log": "activity",
"/tmp/gemini_debug.log": "debug",
"/tmp/mcp_server_overflow.log": "overflow",
}
print(f"[{datetime.now().strftime('%H:%M:%S')}] MCP Log Monitor started")
for file_path, name in log_files.items():
print(f"[{datetime.now().strftime('%H:%M:%S')}] Monitoring {name}: {file_path}")
print(f"[{datetime.now().strftime('%H:%M:%S')}] Note: Logs rotate daily at midnight, keeping 7 days of history")
print("-" * 60)
# Create tailers for each log file
tailers = {}
# Activity log - most important for tool calls
def activity_filter(line: str) -> bool:
return any(
keyword in line
for keyword in [
"TOOL_CALL:",
"TOOL_COMPLETED:",
"CONVERSATION_RESUME:",
"CONVERSATION_CONTEXT:",
"CONVERSATION_ERROR:",
]
)
def activity_formatter(line: str) -> str:
if "TOOL_CALL:" in line:
tool_info = line.split("TOOL_CALL:")[-1].strip()
return f"Tool called: {tool_info}"
elif "TOOL_COMPLETED:" in line:
tool_name = line.split("TOOL_COMPLETED:")[-1].strip()
return f"✓ Tool completed: {tool_name}"
elif "CONVERSATION_RESUME:" in line:
resume_info = line.split("CONVERSATION_RESUME:")[-1].strip()
return f"Resume: {resume_info}"
elif "CONVERSATION_CONTEXT:" in line:
context_info = line.split("CONVERSATION_CONTEXT:")[-1].strip()
return f"Context: {context_info}"
elif "CONVERSATION_ERROR:" in line:
error_info = line.split("CONVERSATION_ERROR:")[-1].strip()
return f"❌ Conversation error: {error_info}"
return line
tailers["activity"] = LogTailer("/tmp/mcp_activity.log")
# Main log - errors and warnings
def main_filter(line: str) -> bool:
return any(keyword in line for keyword in ["ERROR", "WARNING", "DEBUG", "Gemini API"])
def main_formatter(line: str) -> str:
if "ERROR" in line:
return f"{line}"
elif "WARNING" in line:
return f"⚠️ {line}"
elif "DEBUG" in line:
if "📄" in line or "📁" in line:
return f"📂 FILE: {line}"
else:
return f"🔍 {line}"
elif "Gemini API" in line and ("Sending" in line or "Received" in line):
return f"API: {line}"
elif "INFO" in line and any(keyword in line for keyword in ["Gemini API", "Tool", "Conversation"]):
return f" {line}"
return line
tailers["main"] = LogTailer("/tmp/mcp_server.log")
# Debug log
def debug_formatter(line: str) -> str:
return f"DEBUG: {line}"
tailers["debug"] = LogTailer("/tmp/gemini_debug.log")
# Overflow log
def overflow_filter(line: str) -> bool:
return "ERROR" in line or "WARNING" in line
def overflow_formatter(line: str) -> str:
if "ERROR" in line:
return f"🚨 OVERFLOW: {line}"
elif "WARNING" in line:
return f"⚠️ OVERFLOW: {line}"
return line
tailers["overflow"] = LogTailer("/tmp/mcp_server_overflow.log")
# Monitor all files in a simple loop
try:
while True:
# Check activity log
activity_lines = tailers["activity"].read_new_lines()
for line in activity_lines:
if activity_filter(line):
timestamp = datetime.now().strftime("%H:%M:%S")
formatted = activity_formatter(line)
print(f"[{timestamp}] {formatted}")
# Check main log
main_lines = tailers["main"].read_new_lines()
for line in main_lines:
if main_filter(line):
timestamp = datetime.now().strftime("%H:%M:%S")
formatted = main_formatter(line)
print(f"[{timestamp}] {formatted}")
# Check debug log
debug_lines = tailers["debug"].read_new_lines()
for line in debug_lines:
timestamp = datetime.now().strftime("%H:%M:%S")
formatted = debug_formatter(line)
print(f"[{timestamp}] {formatted}")
# Check overflow log
overflow_lines = tailers["overflow"].read_new_lines()
for line in overflow_lines:
if overflow_filter(line):
timestamp = datetime.now().strftime("%H:%M:%S")
formatted = overflow_formatter(line)
print(f"[{timestamp}] {formatted}")
# Wait before next check
import time
time.sleep(0.5)
except KeyboardInterrupt:
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Log monitor stopped")
if __name__ == "__main__":
monitor_mcp_activity()