#!/usr/bin/env python3
"""
Session Authentication Test Script

Tests the token-based authentication system for securing individual
user sessions in OpenCode servers.

Run directly as a script. Each test coroutine builds its own
``SessionTokenManager``, exercises one feature, and returns ``True`` on
success; a failed ``assert`` is caught by the runner and reported as an
error. The process exit status is 0 when every test passed and 1 otherwise.
"""

import asyncio
import logging
import os
import sys
import time
from pathlib import Path

# Add session-manager to path for imports. This must run before the
# ``session_auth`` import below so the module can be resolved from the
# directory that contains this script.
sys.path.insert(0, str(Path(__file__).parent))

from session_auth import (
    SessionTokenManager,
    generate_session_auth_token,
    validate_session_auth_token,
    revoke_session_auth_token,
    rotate_session_auth_token,
    cleanup_expired_auth_tokens,
    get_session_auth_info,
    list_active_auth_sessions,
    get_active_auth_sessions_count,
)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# NOTE(review): the status glyphs inside the printed strings below look like
# mis-decoded UTF-8 emoji. They are reproduced exactly as found because the
# original characters cannot be recovered unambiguously - confirm against the
# intended output and re-save the file as UTF-8 if needed.


async def test_token_generation():
    """Generate a token and verify its metadata is stored by the manager.

    Returns:
        True when every assertion holds.

    Raises:
        AssertionError: if the stored token info is missing or incomplete.
    """
    print("๐Ÿงช Testing Token Generation")
    print("=" * 50)

    manager = SessionTokenManager()

    # Test token generation
    session_id = "test-session-123"
    token = manager.generate_session_token(session_id)

    print(f"โœ… Generated token for session {session_id}")
    print(f" Token length: {len(token)} characters")
    print(f" Token format: URL-safe (contains only: {set(token)})")

    # Verify token is stored
    info = manager.get_session_token_info(session_id)
    assert info is not None, "Token info should be stored"
    assert info["session_id"] == session_id, "Session ID should match"
    assert "created_at" in info, "Created timestamp should exist"
    assert "expires_at" in info, "Expiry timestamp should exist"

    print("โœ… Token storage and retrieval working")
    return True


async def test_token_validation():
    """Check that a valid token is accepted and bad inputs are rejected.

    Covers three cases: the correct token, a wrong token for a known session,
    and a correct token presented for an unknown session.

    Returns:
        True when every assertion holds.

    Raises:
        AssertionError: if any validation outcome is unexpected.
    """
    print("\n๐Ÿ” Testing Token Validation")
    print("=" * 50)

    manager = SessionTokenManager()

    session_id = "test-session-456"
    token = manager.generate_session_token(session_id)

    # Test valid token
    is_valid, reason = manager.validate_session_token(session_id, token)
    assert is_valid, f"Valid token should be accepted: {reason}"
    print("โœ… Valid token accepted")

    # Test invalid token
    is_valid, reason = manager.validate_session_token(session_id, "invalid-token")
    assert not is_valid, "Invalid token should be rejected"
    print("โœ… Invalid token rejected")

    # Test non-existent session
    is_valid, reason = manager.validate_session_token("non-existent", token)
    assert not is_valid, "Token for non-existent session should be rejected"
    print("โœ… Non-existent session token rejected")

    return True


async def test_token_expiry():
    """Verify that a token stops validating after its expiry and is cleaned up.

    The manager's expiry is overridden to 0.001 hours (about 3.6 seconds) and
    the test then sleeps for 4 seconds, so this coroutine is deliberately slow.

    Returns:
        True when every assertion holds.

    Raises:
        AssertionError: if the token is still valid after the wait, the
            rejection reason does not mention expiry, or cleanup removes
            nothing.
    """
    print("\nโฐ Testing Token Expiry")
    print("=" * 50)

    # Create manager with short expiry for testing
    manager = SessionTokenManager()
    manager._token_expiry_hours = 0.001  # Expire in ~3.6 seconds

    session_id = "test-expiry-session"
    token = manager.generate_session_token(session_id)

    # Token should be valid initially
    is_valid, reason = manager.validate_session_token(session_id, token)
    assert is_valid, f"Token should be valid initially: {reason}"
    print("โœ… Token valid before expiry")

    # Wait for expiry
    await asyncio.sleep(4)

    # Token should be expired
    is_valid, reason = manager.validate_session_token(session_id, token)
    assert not is_valid, "Token should be expired"
    assert "expired" in reason.lower(), f"Expiry reason should mention expiry: {reason}"
    print("โœ… Token properly expired")

    # Cleanup should remove expired token
    cleaned = manager.cleanup_expired_tokens()
    assert cleaned >= 1, f"Should clean up at least 1 expired token, cleaned {cleaned}"
    print(f"โœ… Cleaned up {cleaned} expired tokens")

    return True


async def test_token_revocation():
    """Verify that a revoked token no longer validates.

    Returns:
        True when every assertion holds.

    Raises:
        AssertionError: if revocation fails or the token remains valid.
    """
    print("\n๐Ÿšซ Testing Token Revocation")
    print("=" * 50)

    manager = SessionTokenManager()
    session_id = "test-revoke-session"
    token = manager.generate_session_token(session_id)

    # Token should work initially
    is_valid, reason = manager.validate_session_token(session_id, token)
    assert is_valid, "Token should be valid before revocation"
    print("โœ… Token valid before revocation")

    # Revoke token
    revoked = manager.revoke_session_token(session_id)
    assert revoked, "Revocation should succeed"
    print("โœ… Token revocation successful")

    # Token should no longer work
    is_valid, reason = manager.validate_session_token(session_id, token)
    assert not is_valid, "Revoked token should be invalid"
    print("โœ… Revoked token properly invalidated")

    return True


async def test_token_rotation():
    """Verify that rotation issues a new token and invalidates the old one.

    Returns:
        True when every assertion holds.

    Raises:
        AssertionError: if rotation returns nothing, returns the same token,
            leaves the old token valid, or yields an invalid new token.
    """
    print("\n๐Ÿ”„ Testing Token Rotation")
    print("=" * 50)

    manager = SessionTokenManager()
    session_id = "test-rotate-session"
    old_token = manager.generate_session_token(session_id)

    # Old token should work
    is_valid, reason = manager.validate_session_token(session_id, old_token)
    assert is_valid, "Original token should be valid"
    print("โœ… Original token valid")

    # Rotate token
    new_token = manager.rotate_session_token(session_id)
    assert new_token is not None, "Rotation should return new token"
    assert new_token != old_token, "New token should be different from old token"
    print("โœ… Token rotation successful")

    # Old token should no longer work
    is_valid, reason = manager.validate_session_token(session_id, old_token)
    assert not is_valid, "Old token should be invalid after rotation"
    print("โœ… Old token invalidated after rotation")

    # New token should work
    is_valid, reason = manager.validate_session_token(session_id, new_token)
    assert is_valid, "New token should be valid after rotation"
    print("โœ… New token validated after rotation")

    return True


async def test_concurrent_sessions():
    """Create five sessions on one manager and verify they coexist.

    Checks that each session's token validates and that both the active
    session count and the active session listing report all five.

    Returns:
        True when every assertion holds.

    Raises:
        AssertionError: if any token fails validation or the counts differ.
    """
    print("\n๐Ÿ‘ฅ Testing Concurrent Authenticated Sessions")
    print("=" * 50)

    manager = SessionTokenManager()

    # Create multiple sessions
    sessions = {}
    for i in range(5):
        session_id = f"concurrent-session-{i}"
        token = manager.generate_session_token(session_id)
        sessions[session_id] = token

    print(f"โœ… Created {len(sessions)} concurrent sessions")

    # Validate all tokens
    for session_id, token in sessions.items():
        is_valid, reason = manager.validate_session_token(session_id, token)
        assert is_valid, f"Session {session_id} token should be valid: {reason}"

    print("โœ… All concurrent session tokens validated")

    # Check active sessions count
    active_count = manager.get_active_sessions_count()
    assert active_count == len(sessions), (
        f"Should have {len(sessions)} active sessions, got {active_count}"
    )
    print(f"โœ… Active sessions count correct: {active_count}")

    # List active sessions
    active_sessions = manager.list_active_sessions()
    assert len(active_sessions) == len(sessions), "Active sessions list should match"
    print(f"โœ… Active sessions list contains {len(active_sessions)} sessions")

    return True


async def test_environment_configuration():
    """Verify that a new manager picks up settings from environment variables.

    Temporarily sets ``SESSION_TOKEN_LENGTH``, ``SESSION_TOKEN_EXPIRY_HOURS``
    and ``TOKEN_CLEANUP_INTERVAL_MINUTES``, constructs a manager, and checks
    the corresponding private attributes and the generated token length. The
    previous environment is restored afterwards, even when an assertion fails.

    Returns:
        True when every assertion holds.

    Raises:
        AssertionError: if the manager's configuration or token length does
            not match the values placed in the environment.
    """
    print("\n๐ŸŒ Testing Environment Configuration")
    print("=" * 50)

    # Test with custom environment variables
    test_vars = [
        ("SESSION_TOKEN_LENGTH", "16"),
        ("SESSION_TOKEN_EXPIRY_HOURS", "12"),
        ("TOKEN_CLEANUP_INTERVAL_MINUTES", "30"),
    ]

    # Save original values (None marks a variable that was not set at all)
    original_values = {var: os.environ.get(var) for var, _ in test_vars}

    try:
        # Set test values
        for var, value in test_vars:
            os.environ[var] = value

        # Create new manager with environment config
        manager = SessionTokenManager()

        # Verify configuration
        assert manager._token_length == 16, (
            f"Token length should be 16, got {manager._token_length}"
        )
        assert manager._token_expiry_hours == 12, (
            f"Token expiry should be 12, got {manager._token_expiry_hours}"
        )
        assert manager._cleanup_interval_minutes == 30, (
            f"Cleanup interval should be 30, got {manager._cleanup_interval_minutes}"
        )

        print("โœ… Environment configuration applied correctly")

        # Test token generation with custom length
        token = manager.generate_session_token("env-test-session")
        assert len(token) == 16, f"Token should be 16 chars, got {len(token)}"
        print("โœ… Custom token length working")

    finally:
        # Restore original values
        for var, original_value in original_values.items():
            if original_value is not None:
                os.environ[var] = original_value
            else:
                # The variable did not exist before the test; remove it
                # without failing if it is already gone.
                os.environ.pop(var, None)

    return True


async def run_all_auth_tests():
    """Run all authentication tests in order and print a summary.

    Each test runs inside its own ``try`` so one failure (including a failed
    ``assert``) is reported and recorded without stopping the remaining tests.

    Returns:
        True when every test passed, False otherwise.
    """
    print("๐Ÿ” Session Authentication Test Suite")
    print("=" * 70)

    tests = [
        ("Token Generation", test_token_generation),
        ("Token Validation", test_token_validation),
        ("Token Expiry", test_token_expiry),
        ("Token Revocation", test_token_revocation),
        ("Token Rotation", test_token_rotation),
        ("Concurrent Sessions", test_concurrent_sessions),
        ("Environment Configuration", test_environment_configuration),
    ]

    results = []
    for test_name, test_func in tests:
        print(f"\n{'=' * 25} {test_name} {'=' * 25}")
        try:
            result = await test_func()
            results.append(result)
            status = "โœ… PASSED" if result else "โŒ FAILED"
            print(f"\n{status}: {test_name}")
        except Exception as e:
            # Top-level boundary of the runner: record the failure and keep
            # going so the summary covers every test.
            print(f"\nโŒ ERROR in {test_name}: {e}")
            results.append(False)

    # Summary
    print(f"\n{'=' * 70}")
    passed = sum(results)
    total = len(results)
    print(f"๐Ÿ“Š Test Results: {passed}/{total} tests passed")

    if passed == total:
        print("๐ŸŽ‰ All session authentication tests completed successfully!")
        print(
            "๐Ÿ” Token-based authentication is working correctly for securing sessions."
        )
    else:
        print("โš ๏ธ Some tests failed. Check the output above for details.")
        print("๐Ÿ’ก Ensure all dependencies are installed and configuration is correct.")

    return passed == total


if __name__ == "__main__":
    # Propagate the suite outcome as the process exit status so shells and CI
    # runners can detect a failing run instead of always seeing success.
    sys.exit(0 if asyncio.run(run_all_auth_tests()) else 1)