Refactor log monitor to eliminate code duplication

Addressed Gemini code review feedback by refactoring repetitive log processing:

 **Added _process_log_stream helper function**:
- Encapsulates common pattern of reading, filtering, formatting, and printing log lines
- Takes tailer, filter_func, and format_func as parameters
- Eliminates repetitive timestamp and formatting logic

 **Simplified main monitoring loop**:
- Reduced from ~35 lines of repetitive code to 4 clean function calls
- Each log stream now uses: _process_log_stream(tailer, filter, formatter)
- Eliminated duplicate timestamp creation (reduced from 4 to 1 occurrence)

 **Improved maintainability**:
- Changes to log processing logic now only need to be made in one place
- Cleaner, more readable main loop
- Better separation of concerns

 **Verified functionality**:
- All containers rebuild and start successfully
- Log monitor functions correctly with refactored code
- No functional changes, only code organization improvements

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Fahad
2025-06-16 06:19:53 +04:00
parent be157ab771
commit e183e1bfff

View File

@@ -6,11 +6,38 @@ This module provides a simplified log monitoring interface using the
centralized LogTailer class from utils.file_utils.
"""
import time
from datetime import datetime
from utils.file_utils import LogTailer
def _process_log_stream(tailer, filter_func=None, format_func=None):
"""
Process new lines from a log tailer with optional filtering and formatting.
Args:
tailer: LogTailer instance to read from
filter_func: Optional function to filter lines (return True to include)
format_func: Optional function to format lines for display
"""
lines = tailer.read_new_lines()
for line in lines:
# Apply filter if provided
if filter_func and not filter_func(line):
continue
timestamp = datetime.now().strftime("%H:%M:%S")
# Apply formatter if provided
if format_func:
formatted = format_func(line)
else:
formatted = line
print(f"[{timestamp}] {formatted}")
def monitor_mcp_activity():
"""Monitor MCP server activity by watching multiple log files"""
log_files = {
@@ -106,40 +133,13 @@ def monitor_mcp_activity():
# Monitor all files in a simple loop
try:
while True:
# Check activity log
activity_lines = tailers["activity"].read_new_lines()
for line in activity_lines:
if activity_filter(line):
timestamp = datetime.now().strftime("%H:%M:%S")
formatted = activity_formatter(line)
print(f"[{timestamp}] {formatted}")
# Check main log
main_lines = tailers["main"].read_new_lines()
for line in main_lines:
if main_filter(line):
timestamp = datetime.now().strftime("%H:%M:%S")
formatted = main_formatter(line)
print(f"[{timestamp}] {formatted}")
# Check debug log
debug_lines = tailers["debug"].read_new_lines()
for line in debug_lines:
timestamp = datetime.now().strftime("%H:%M:%S")
formatted = debug_formatter(line)
print(f"[{timestamp}] {formatted}")
# Check overflow log
overflow_lines = tailers["overflow"].read_new_lines()
for line in overflow_lines:
if overflow_filter(line):
timestamp = datetime.now().strftime("%H:%M:%S")
formatted = overflow_formatter(line)
print(f"[{timestamp}] {formatted}")
# Process each log stream using the helper function
_process_log_stream(tailers["activity"], activity_filter, activity_formatter)
_process_log_stream(tailers["main"], main_filter, main_formatter)
_process_log_stream(tailers["debug"], None, debug_formatter) # No filter for debug
_process_log_stream(tailers["overflow"], overflow_filter, overflow_formatter)
# Wait before next check
import time
time.sleep(0.5)
except KeyboardInterrupt: