From d6d7bf8cac6510232c04e799858e6e9bd1acb71a Mon Sep 17 00:00:00 2001
From: Fahad
Date: Mon, 16 Jun 2025 11:30:02 +0400
Subject: [PATCH] Fixed internal file path translation into docker

---
 config.py                                 |   2 +-
 tests/test_app_path_translation.py        | 126 ++++++++++
 tests/test_conversation_missing_files.py  |  64 +++++
 tests/test_internal_config_file_access.py | 290 ++++++++++++++++++++++
 utils/conversation_memory.py              |  38 ++-
 utils/file_utils.py                       |  34 ++-
 6 files changed, 544 insertions(+), 10 deletions(-)
 create mode 100644 tests/test_app_path_translation.py
 create mode 100644 tests/test_conversation_missing_files.py
 create mode 100644 tests/test_internal_config_file_access.py

diff --git a/config.py b/config.py
index b958bc3..5c7b4f3 100644
--- a/config.py
+++ b/config.py
@@ -14,7 +14,7 @@ import os
 # These values are used in server responses and for tracking releases
 # IMPORTANT: This is the single source of truth for version and author info
 # Semantic versioning: MAJOR.MINOR.PATCH
-__version__ = "4.7.4"
+__version__ = "4.7.5"
 # Last update date in ISO format
 __updated__ = "2025-06-16"
 # Primary maintainer
diff --git a/tests/test_app_path_translation.py b/tests/test_app_path_translation.py
new file mode 100644
index 0000000..948c2d6
--- /dev/null
+++ b/tests/test_app_path_translation.py
@@ -0,0 +1,126 @@
+"""
+Test /app/ to ./ path translation for standalone mode.
+
+Tests that internal application paths work in both Docker and standalone modes.
+"""
+
+import os
+import tempfile
+from unittest.mock import patch
+
+from utils.file_utils import translate_path_for_environment
+
+
+class TestAppPathTranslation:
+    """Test translation of /app/ paths for different environments."""
+
+    def test_app_path_translation_in_standalone_mode(self):
+        """Test that /app/ paths are translated to ./ in standalone mode."""
+
+        # Mock standalone environment (no Docker)
+        with patch("utils.file_utils.CONTAINER_WORKSPACE") as mock_container_workspace:
+            mock_container_workspace.exists.return_value = False
+
+            # Clear WORKSPACE_ROOT to simulate standalone mode
+            with patch.dict(os.environ, {}, clear=True):
+
+                # Test translation of internal app paths
+                test_cases = [
+                    ("/app/conf/custom_models.json", "./conf/custom_models.json"),
+                    ("/app/conf/other_config.json", "./conf/other_config.json"),
+                    ("/app/logs/app.log", "./logs/app.log"),
+                    ("/app/data/file.txt", "./data/file.txt"),
+                ]
+
+                for input_path, expected_output in test_cases:
+                    result = translate_path_for_environment(input_path)
+                    assert result == expected_output, f"Expected {expected_output}, got {result}"
+
+    def test_allowed_app_path_unchanged_in_docker_mode(self):
+        """Test that allowed /app/ paths remain unchanged in Docker mode."""
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Mock Docker environment
+            with patch("utils.file_utils.CONTAINER_WORKSPACE") as mock_container_workspace:
+                mock_container_workspace.exists.return_value = True
+                mock_container_workspace.__str__.return_value = "/workspace"
+
+                # Set WORKSPACE_ROOT to simulate Docker environment
+                with patch.dict(os.environ, {"WORKSPACE_ROOT": tmpdir}):
+
+                    # Only specifically allowed internal app paths should remain unchanged in Docker
+                    allowed_path = "/app/conf/custom_models.json"
+                    result = translate_path_for_environment(allowed_path)
+                    assert (
+                        result == allowed_path
+                    ), f"Docker mode should preserve allowed path {allowed_path}, got {result}"
+
+    def test_non_allowed_app_paths_blocked_in_docker_mode(self):
+        """Test that non-allowed /app/ paths are blocked in Docker mode."""
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Mock Docker environment
+            with patch("utils.file_utils.CONTAINER_WORKSPACE") as mock_container_workspace:
+                mock_container_workspace.exists.return_value = True
+                mock_container_workspace.__str__.return_value = "/workspace"
+
+                # Set WORKSPACE_ROOT to simulate Docker environment
+                with patch.dict(os.environ, {"WORKSPACE_ROOT": tmpdir}):
+
+                    # Non-allowed internal app paths should be blocked in Docker for security
+                    blocked_paths = [
+                        "/app/conf/other_config.json",
+                        "/app/logs/app.log",
+                        "/app/server.py",
+                    ]
+
+                    for blocked_path in blocked_paths:
+                        result = translate_path_for_environment(blocked_path)
+                        assert result.startswith(
+                            "/inaccessible/"
+                        ), f"Docker mode should block non-allowed path {blocked_path}, got {result}"
+
+    def test_non_app_paths_unchanged_in_standalone(self):
+        """Test that non-/app/ paths are unchanged in standalone mode."""
+
+        # Mock standalone environment
+        with patch("utils.file_utils.CONTAINER_WORKSPACE") as mock_container_workspace:
+            mock_container_workspace.exists.return_value = False
+
+            with patch.dict(os.environ, {}, clear=True):
+
+                # Non-app paths should be unchanged
+                test_cases = [
+                    "/home/user/file.py",
+                    "/etc/config.conf",
+                    "./local/file.txt",
+                    "relative/path.py",
+                    "/workspace/file.py",
+                ]
+
+                for input_path in test_cases:
+                    result = translate_path_for_environment(input_path)
+                    assert result == input_path, f"Non-app path {input_path} should be unchanged, got {result}"
+
+    def test_edge_cases_in_app_translation(self):
+        """Test edge cases in /app/ path translation."""
+
+        # Mock standalone environment
+        with patch("utils.file_utils.CONTAINER_WORKSPACE") as mock_container_workspace:
+            mock_container_workspace.exists.return_value = False
+
+            with patch.dict(os.environ, {}, clear=True):
+
+                # Test edge cases
+                test_cases = [
+                    ("/app/", "./"),  # Root app directory
+                    ("/app", "/app"),  # Exact match without trailing slash - not translated
+                    ("/app/file", "./file"),  # File directly in app
+                    ("/app//double/slash", "./double/slash"),  # Handle double slashes
+                ]
+
+                for input_path, expected_output in test_cases:
+                    result = translate_path_for_environment(input_path)
+                    assert (
+                        result == expected_output
+                    ), f"Edge case {input_path}: expected {expected_output}, got {result}"
diff --git a/tests/test_conversation_missing_files.py b/tests/test_conversation_missing_files.py
new file mode 100644
index 0000000..ffc273f
--- /dev/null
+++ b/tests/test_conversation_missing_files.py
@@ -0,0 +1,64 @@
+"""
+Test conversation memory handling of missing files.
+
+Following existing test patterns to ensure conversation memory gracefully
+handles missing files without crashing.
+"""
+
+from unittest.mock import Mock
+
+from utils.conversation_memory import (
+    ConversationTurn,
+    ThreadContext,
+    build_conversation_history,
+)
+
+
+class TestConversationMissingFiles:
+    """Test handling of missing files during conversation memory reconstruction."""
+
+    def test_build_conversation_history_handles_missing_files(self):
+        """Test that conversation history building handles missing files gracefully."""
+
+        # Create conversation context with missing file reference (following existing test patterns)
+        context = ThreadContext(
+            thread_id="test-thread",
+            created_at="2024-01-01T00:00:00Z",
+            last_updated_at="2024-01-01T00:05:00Z",
+            tool_name="analyze",
+            turns=[
+                ConversationTurn(
+                    role="user",
+                    content="Please analyze this file",
+                    timestamp="2024-01-01T00:01:00Z",
+                    files=["/nonexistent/missing_file.py"],  # File that doesn't exist
+                    tool_name="analyze",
+                ),
+                ConversationTurn(
+                    role="assistant",
+                    content="Here's my analysis...",
+                    timestamp="2024-01-01T00:02:00Z",
+                    tool_name="analyze",
+                ),
+            ],
+            initial_context={"path": "/nonexistent/missing_file.py"},
+        )
+
+        # Mock model context (following existing test patterns)
+        mock_model_context = Mock()
+        mock_model_context.calculate_token_allocation.return_value = Mock(file_tokens=50000, history_tokens=50000)
+        mock_model_context.estimate_tokens.return_value = 100
+        mock_model_context.model_name = "test-model"
+
+        # Should not crash, should handle missing file gracefully
+        history, tokens = build_conversation_history(context, mock_model_context)
+
+        # Should return valid history despite missing file
+        assert isinstance(history, str)
+        assert isinstance(tokens, int)
+        assert len(history) > 0
+
+        # Should contain conversation content
+        assert "CONVERSATION HISTORY" in history
+        assert "Please analyze this file" in history
+        assert "Here's my analysis" in history
diff --git a/tests/test_internal_config_file_access.py b/tests/test_internal_config_file_access.py
new file mode 100644
index 0000000..38f1e6f
--- /dev/null
+++ b/tests/test_internal_config_file_access.py
@@ -0,0 +1,290 @@
+"""
+Integration tests for internal application configuration file access.
+
+These tests verify that:
+1. Specific internal config files are accessible (exact path matching)
+2. Path variations and traversal attempts are blocked (security)
+3. The OpenRouter model configuration loads properly
+4. Normal workspace file operations continue to work
+
+This follows the established testing patterns from test_docker_path_integration.py
+by using actual file operations and module reloading instead of mocks.
+"""
+
+import importlib
+import os
+import tempfile
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+from utils.file_utils import translate_path_for_environment
+
+
+class TestInternalConfigFileAccess:
+    """Test access to internal application configuration files."""
+
+    def test_allowed_internal_config_file_access(self):
+        """Test that the specific internal config file is accessible."""
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Set up Docker-like environment
+            host_workspace = Path(tmpdir) / "host_workspace"
+            host_workspace.mkdir()
+            container_workspace = Path(tmpdir) / "container_workspace"
+            container_workspace.mkdir()
+
+            original_env = os.environ.copy()
+            try:
+                os.environ["WORKSPACE_ROOT"] = str(host_workspace)
+
+                # Reload modules to pick up environment
+                import utils.security_config
+
+                importlib.reload(utils.security_config)
+                importlib.reload(utils.file_utils)
+
+                # Test with Docker environment simulation
+                with patch("utils.file_utils.CONTAINER_WORKSPACE", container_workspace):
+                    # The exact allowed path should pass through unchanged
+                    result = translate_path_for_environment("/app/conf/custom_models.json")
+                    assert result == "/app/conf/custom_models.json"
+
+            finally:
+                # Restore environment
+                os.environ.clear()
+                os.environ.update(original_env)
+                import utils.security_config
+
+                importlib.reload(utils.security_config)
+                importlib.reload(utils.file_utils)
+
+    def test_blocked_config_file_variations(self):
+        """Test that variations of the config file path are blocked."""
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            host_workspace = Path(tmpdir) / "host_workspace"
+            host_workspace.mkdir()
+            container_workspace = Path(tmpdir) / "container_workspace"
+            container_workspace.mkdir()
+
+            original_env = os.environ.copy()
+            try:
+                os.environ["WORKSPACE_ROOT"] = str(host_workspace)
+
+                import utils.security_config
+
+                importlib.reload(utils.security_config)
+                importlib.reload(utils.file_utils)
+
+                with patch("utils.file_utils.CONTAINER_WORKSPACE", container_workspace):
+                    # Test blocked variations - these should return inaccessible paths
+                    blocked_paths = [
+                        "/app/conf/",  # Directory
+                        "/app/conf/other_file.json",  # Different file
+                        "/app/conf/custom_models.json.backup",  # Extra extension
+                        "/app/conf/custom_models.txt",  # Different extension
+                        "/app/conf/../server.py",  # Path traversal
+                        "/app/server.py",  # Application code
+                        "/etc/passwd",  # System file
+                    ]
+
+                    for path in blocked_paths:
+                        result = translate_path_for_environment(path)
+                        assert result.startswith("/inaccessible/"), f"Path {path} should be blocked but got: {result}"
+
+            finally:
+                os.environ.clear()
+                os.environ.update(original_env)
+                import utils.security_config
+
+                importlib.reload(utils.security_config)
+                importlib.reload(utils.file_utils)
+
+    def test_workspace_files_continue_to_work(self):
+        """Test that normal workspace file operations are unaffected."""
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            host_workspace = Path(tmpdir) / "host_workspace"
+            host_workspace.mkdir()
+            container_workspace = Path(tmpdir) / "container_workspace"
+            container_workspace.mkdir()
+
+            # Create a test file in the workspace
+            test_file = host_workspace / "src" / "test.py"
+            test_file.parent.mkdir(parents=True)
+            test_file.write_text("# test file")
+
+            original_env = os.environ.copy()
+            try:
+                os.environ["WORKSPACE_ROOT"] = str(host_workspace)
+
+                import utils.security_config
+
+                importlib.reload(utils.security_config)
+                importlib.reload(utils.file_utils)
+
+                with patch("utils.file_utils.CONTAINER_WORKSPACE", container_workspace):
+                    # Normal workspace file should translate correctly
+                    result = translate_path_for_environment(str(test_file))
+                    expected = str(container_workspace / "src" / "test.py")
+                    assert result == expected
+
+            finally:
+                os.environ.clear()
+                os.environ.update(original_env)
+                import utils.security_config
+
+                importlib.reload(utils.security_config)
+                importlib.reload(utils.file_utils)
+
+    def test_openrouter_config_loading_real_world(self):
+        """Test that OpenRouter configuration loading works in real container environment."""
+
+        # This test validates that our fix works in the actual Docker environment
+        # by checking that the translate_path_for_environment function handles
+        # the exact internal config path correctly
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            host_workspace = Path(tmpdir) / "host_workspace"
+            host_workspace.mkdir()
+            container_workspace = Path(tmpdir) / "container_workspace"
+            container_workspace.mkdir()
+
+            original_env = os.environ.copy()
+            try:
+                os.environ["WORKSPACE_ROOT"] = str(host_workspace)
+
+                import utils.security_config
+
+                importlib.reload(utils.security_config)
+                importlib.reload(utils.file_utils)
+
+                with patch("utils.file_utils.CONTAINER_WORKSPACE", container_workspace):
+                    # Test that the function correctly handles the config path
+                    result = translate_path_for_environment("/app/conf/custom_models.json")
+
+                    # The path should pass through unchanged (not be blocked)
+                    assert result == "/app/conf/custom_models.json"
+
+                    # Verify it's not marked as inaccessible
+                    assert not result.startswith("/inaccessible/")
+
+            finally:
+                os.environ.clear()
+                os.environ.update(original_env)
+                import utils.security_config
+
+                importlib.reload(utils.security_config)
+                importlib.reload(utils.file_utils)
+
+    def test_security_boundary_comprehensive(self):
+        """Comprehensive test of all security boundaries in Docker environment."""
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            host_workspace = Path(tmpdir) / "host_workspace"
+            host_workspace.mkdir()
+            container_workspace = Path(tmpdir) / "container_workspace"
+            container_workspace.mkdir()
+
+            # Create a workspace file for testing
+            workspace_file = host_workspace / "project" / "main.py"
+            workspace_file.parent.mkdir(parents=True)
+            workspace_file.write_text("# workspace file")
+
+            original_env = os.environ.copy()
+            try:
+                os.environ["WORKSPACE_ROOT"] = str(host_workspace)
+
+                import utils.security_config
+
+                importlib.reload(utils.security_config)
+                importlib.reload(utils.file_utils)
+
+                with patch("utils.file_utils.CONTAINER_WORKSPACE", container_workspace):
+                    # Test cases: (path, should_be_allowed, description)
+                    test_cases = [
+                        # Allowed cases
+                        ("/app/conf/custom_models.json", True, "Exact allowed internal config"),
+                        (str(workspace_file), True, "Workspace file"),
+                        (str(container_workspace / "existing.py"), True, "Container path"),
+                        # Blocked cases
+                        ("/app/conf/", False, "Directory access"),
+                        ("/app/conf/other.json", False, "Different config file"),
+                        ("/app/conf/custom_models.json.backup", False, "Config with extra extension"),
+                        ("/app/server.py", False, "Application source"),
+                        ("/etc/passwd", False, "System file"),
+                        ("../../../etc/passwd", False, "Relative path traversal"),
+                        ("/app/conf/../server.py", False, "Path traversal through config dir"),
+                    ]
+
+                    for path, should_be_allowed, description in test_cases:
+                        result = translate_path_for_environment(path)
+
+                        if should_be_allowed:
+                            # Should either pass through unchanged or translate to container path
+                            assert not result.startswith(
+                                "/inaccessible/"
+                            ), f"{description}: {path} should be allowed but was blocked"
+                        else:
+                            # Should be blocked with inaccessible path
+                            assert result.startswith(
+                                "/inaccessible/"
+                            ), f"{description}: {path} should be blocked but got: {result}"
+
+            finally:
+                os.environ.clear()
+                os.environ.update(original_env)
+                import utils.security_config
+
+                importlib.reload(utils.security_config)
+                importlib.reload(utils.file_utils)
+
+    def test_exact_path_matching_prevents_wildcards(self):
+        """Test that using exact path matching prevents any wildcard-like behavior."""
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            host_workspace = Path(tmpdir) / "host_workspace"
+            host_workspace.mkdir()
+            container_workspace = Path(tmpdir) / "container_workspace"
+            container_workspace.mkdir()
+
+            original_env = os.environ.copy()
+            try:
+                os.environ["WORKSPACE_ROOT"] = str(host_workspace)
+
+                import utils.security_config
+
+                importlib.reload(utils.security_config)
+                importlib.reload(utils.file_utils)
+
+                with patch("utils.file_utils.CONTAINER_WORKSPACE", container_workspace):
+                    # Even subtle variations should be blocked
+                    subtle_variations = [
+                        "/app/conf/custom_models.jsonx",  # Extra char
+                        "/app/conf/custom_models.jso",  # Missing char
+                        "/app/conf/custom_models.JSON",  # Different case
+                        "/app/conf/custom_models.json ",  # Trailing space
+                        " /app/conf/custom_models.json",  # Leading space
+                        "/app/conf/./custom_models.json",  # Current dir reference
+                        "/app/conf/subdir/../custom_models.json",  # Up and down
+                    ]
+
+                    for variation in subtle_variations:
+                        result = translate_path_for_environment(variation)
+                        assert result.startswith(
+                            "/inaccessible/"
+                        ), f"Variation {variation} should be blocked but got: {result}"
+
+            finally:
+                os.environ.clear()
+                os.environ.update(original_env)
+                import utils.security_config
+
+                importlib.reload(utils.security_config)
+                importlib.reload(utils.file_utils)
+
+
+if __name__ == "__main__":
+    pytest.main([__file__, "-v"])
diff --git a/utils/conversation_memory.py b/utils/conversation_memory.py
index 1eee435..2e07b8e 100644
--- a/utils/conversation_memory.py
+++ b/utils/conversation_memory.py
@@ -534,11 +534,17 @@ def _plan_file_inclusion_by_size(all_files: list[str], max_file_tokens: int) ->
                 )
             else:
                 files_to_skip.append(file_path)
-                logger.debug(f"[FILES] Skipping {file_path} - file not accessible")
+                # More descriptive message for missing files
+                if not os.path.exists(translated_path):
+                    logger.debug(
+                        f"[FILES] Skipping {file_path} - file no longer exists (may have been moved/deleted since conversation)"
+                    )
+                else:
+                    logger.debug(f"[FILES] Skipping {file_path} - file not accessible (not a regular file)")
         except Exception as e:
             files_to_skip.append(file_path)
-            logger.debug(f"[FILES] Skipping {file_path} - error: {type(e).__name__}: {e}")
+            logger.debug(f"[FILES] Skipping {file_path} - error during processing: {type(e).__name__}: {e}")
 
     logger.debug(
         f"[FILES] Inclusion plan: {len(files_to_include)} include, {len(files_to_skip)} skip, {total_tokens:,} tokens"
     )
@@ -725,7 +731,8 @@ def build_conversation_history(context: ThreadContext, model_context=None, read_
         files_to_include, files_to_skip, estimated_tokens = _plan_file_inclusion_by_size(all_files, max_file_tokens)
 
         if files_to_skip:
-            logger.info(f"[FILES] Skipping {len(files_to_skip)} files due to size constraints: {files_to_skip}")
+            logger.info(f"[FILES] Excluding {len(files_to_skip)} files from conversation history: {files_to_skip}")
+            logger.debug("[FILES] Files excluded for various reasons (size constraints, missing files, access issues)")
 
         if files_to_include:
             history_parts.extend(
@@ -735,7 +742,7 @@ def build_conversation_history(context: ThreadContext, model_context=None, read_
                 (
                     ""
                     if not files_to_skip
-                    else f"[NOTE: {len(files_to_skip)} files omitted due to size constraints]"
+                    else f"[NOTE: {len(files_to_skip)} files omitted (size constraints, missing files, or access issues)]"
                 ),
                 "Refer to these when analyzing the context and requests below:",
                 "",
@@ -764,16 +771,31 @@ def build_conversation_history(context: ThreadContext, model_context=None, read_
                 else:
                     logger.debug(f"File skipped (empty content): {file_path}")
             except Exception as e:
-                logger.warning(
-                    f"Failed to embed file in conversation history: {file_path} - {type(e).__name__}: {e}"
-                )
+                # More descriptive error handling for missing files
+                try:
+                    from utils.file_utils import translate_path_for_environment
+
+                    translated_path = translate_path_for_environment(file_path)
+                    if not os.path.exists(translated_path):
+                        logger.info(
+                            f"File no longer accessible for conversation history: {file_path} - file was moved/deleted since conversation (marking as excluded)"
+                        )
+                    else:
+                        logger.warning(
+                            f"Failed to embed file in conversation history: {file_path} - {type(e).__name__}: {e}"
+                        )
+                except Exception:
+                    # Fallback if path translation also fails
+                    logger.warning(
+                        f"Failed to embed file in conversation history: {file_path} - {type(e).__name__}: {e}"
+                    )
                 continue
 
         if file_contents:
             files_content = "".join(file_contents)
             if files_to_skip:
                 files_content += (
-                    f"\n[NOTE: {len(files_to_skip)} additional file(s) were omitted due to size constraints. "
+                    f"\n[NOTE: {len(files_to_skip)} additional file(s) were omitted due to size constraints, missing files, or access issues. "
                     f"These were older files from earlier conversation turns.]\n"
                 )
             history_parts.append(files_content)
diff --git a/utils/file_utils.py b/utils/file_utils.py
index f6d8033..9945d9c 100644
--- a/utils/file_utils.py
+++ b/utils/file_utils.py
@@ -284,8 +284,40 @@ def translate_path_for_environment(path_str: str) -> str:
     Returns:
         Translated path appropriate for the current environment
     """
+    # Allow access to specific internal application configuration files
+    # Store as relative paths so they work in both Docker and standalone modes
+    # Use exact paths for security - no wildcards or prefix matching
+    ALLOWED_INTERNAL_PATHS = {
+        "conf/custom_models.json",
+        # Add other specific internal files here as needed
+    }
+
+    # Check for internal app paths - extract relative part if it's an /app/ path
+    relative_internal_path = None
+    if path_str.startswith("/app/"):
+        relative_internal_path = path_str[5:]  # Remove "/app/" prefix
+        if relative_internal_path.startswith("/"):
+            relative_internal_path = relative_internal_path[1:]  # Remove leading slash if present
+
+    # Check if this is an allowed internal file
+    if relative_internal_path and relative_internal_path in ALLOWED_INTERNAL_PATHS:
+        # Translate to appropriate path for current environment
+        if not WORKSPACE_ROOT or not WORKSPACE_ROOT.strip() or not CONTAINER_WORKSPACE.exists():
+            # Standalone mode: use relative path
+            return "./" + relative_internal_path
+        else:
+            # Docker mode: use absolute app path
+            return "/app/" + relative_internal_path
+
+    # Handle other /app/ paths in standalone mode (for non-whitelisted files)
     if not WORKSPACE_ROOT or not WORKSPACE_ROOT.strip() or not CONTAINER_WORKSPACE.exists():
-        # Not in the configured Docker environment, no translation needed
+        if path_str.startswith("/app/"):
+            # Convert Docker internal paths to local relative paths for standalone mode
+            relative_path = path_str[5:]  # Remove "/app/" prefix
+            if relative_path.startswith("/"):
+                relative_path = relative_path[1:]  # Remove leading slash if present
+            return "./" + relative_path
+        # No other translation needed for standalone mode
        return path_str
 
     # Check if the path is already a container path (starts with /workspace)