326 lines
10 KiB
Python
Executable File
326 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Session Authentication Test Script
|
|
|
|
Tests the token-based authentication system for securing individual
|
|
user sessions in OpenCode servers.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import asyncio
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# Add session-manager to path for imports
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
|
|
from session_auth import (
|
|
SessionTokenManager,
|
|
generate_session_auth_token,
|
|
validate_session_auth_token,
|
|
revoke_session_auth_token,
|
|
rotate_session_auth_token,
|
|
cleanup_expired_auth_tokens,
|
|
get_session_auth_info,
|
|
list_active_auth_sessions,
|
|
get_active_auth_sessions_count,
|
|
)
|
|
|
|
# Set up logging
|
|
import logging
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def test_token_generation():
    """Generate one token and confirm the manager recorded it.

    Prints the token length and its distinct characters, then checks that the
    stored token info exists, carries the same session ID, and contains both
    the ``created_at`` and ``expires_at`` entries.

    Returns:
        True when every assertion holds (a failed assertion raises instead).
    """
    print("🧪 Testing Token Generation")
    print("=" * 50)

    token_manager = SessionTokenManager()

    # Issue a token for a fixed session identifier.
    sid = "test-session-123"
    issued = token_manager.generate_session_token(sid)

    print(f"✅ Generated token for session {sid}")
    print(f" Token length: {len(issued)} characters")
    print(f" Token format: URL-safe (contains only: {set(issued)})")

    # The manager must be able to hand back metadata for the same session.
    record = token_manager.get_session_token_info(sid)
    assert record is not None, "Token info should be stored"
    assert record["session_id"] == sid, "Session ID should match"
    for field, message in (
        ("created_at", "Created timestamp should exist"),
        ("expires_at", "Expiry timestamp should exist"),
    ):
        assert field in record, message

    print("✅ Token storage and retrieval working")
    return True
|
|
|
|
|
|
async def test_token_validation():
    """Check that validation accepts the right token and rejects the wrong ones.

    Three cases are exercised in order: the genuine token for its own session,
    a bogus token for that session, and the genuine token presented for a
    session ID that was never registered.

    Returns:
        True when every assertion holds (a failed assertion raises instead).
    """
    print("\n🔐 Testing Token Validation")
    print("=" * 50)

    token_manager = SessionTokenManager()

    sid = "test-session-456"
    issued = token_manager.generate_session_token(sid)

    # Positive case: the genuine token for its own session.
    accepted, why = token_manager.validate_session_token(sid, issued)
    assert accepted, f"Valid token should be accepted: {why}"
    print("✅ Valid token accepted")

    # Negative cases, checked in order: wrong token, then unknown session.
    rejection_cases = (
        (
            sid,
            "invalid-token",
            "Invalid token should be rejected",
            "✅ Invalid token rejected",
        ),
        (
            "non-existent",
            issued,
            "Token for non-existent session should be rejected",
            "✅ Non-existent session token rejected",
        ),
    )
    for case_sid, case_token, failure_message, success_line in rejection_cases:
        accepted, why = token_manager.validate_session_token(case_sid, case_token)
        assert not accepted, failure_message
        print(success_line)

    return True
|
|
|
|
|
|
async def test_token_expiry():
    """Check that a token stops validating once its lifetime has elapsed.

    The manager's ``_token_expiry_hours`` is overridden to 0.001 hours (about
    3.6 seconds) before the token is generated. The test then sleeps for
    4 seconds, expects validation to fail with a reason mentioning "expired",
    and expects the cleanup call to report at least one removed token.

    Returns:
        True when every assertion holds (a failed assertion raises instead).
    """
    print("\n⏰ Testing Token Expiry")
    print("=" * 50)

    # Shrink the lifetime on the instance so the test does not wait for hours.
    token_manager = SessionTokenManager()
    token_manager._token_expiry_hours = 0.001  # Expire in ~3.6 seconds

    sid = "test-expiry-session"
    issued = token_manager.generate_session_token(sid)

    # Immediately after issuance the token must still be accepted.
    still_valid, why = token_manager.validate_session_token(sid, issued)
    assert still_valid, f"Token should be valid initially: {why}"
    print("✅ Token valid before expiry")

    # Sleep slightly longer than the ~3.6 second lifetime.
    await asyncio.sleep(4)

    # The same token must now be refused, and the reason must say so.
    still_valid, why = token_manager.validate_session_token(sid, issued)
    assert not still_valid, "Token should be expired"
    assert "expired" in why.lower(), f"Expiry reason should mention expiry: {why}"
    print("✅ Token properly expired")

    # Cleanup reports how many expired tokens it removed.
    removed = token_manager.cleanup_expired_tokens()
    assert removed >= 1, f"Should clean up at least 1 expired token, cleaned {removed}"
    print(f"✅ Cleaned up {removed} expired tokens")

    return True
|
|
|
|
|
|
async def test_token_revocation():
    """Check that a revoked token is no longer accepted.

    A token is generated and validated, the session's token is revoked, and
    the same token is then expected to fail validation.

    Returns:
        True when every assertion holds (a failed assertion raises instead).
    """
    print("\n🚫 Testing Token Revocation")
    print("=" * 50)

    token_manager = SessionTokenManager()

    sid = "test-revoke-session"
    issued = token_manager.generate_session_token(sid)

    # Baseline: the token is accepted before anything is revoked.
    accepted, _ = token_manager.validate_session_token(sid, issued)
    assert accepted, "Token should be valid before revocation"
    print("✅ Token valid before revocation")

    # Revoke and make sure the manager reports success.
    was_revoked = token_manager.revoke_session_token(sid)
    assert was_revoked, "Revocation should succeed"
    print("✅ Token revocation successful")

    # The previously valid token must now be refused.
    accepted, _ = token_manager.validate_session_token(sid, issued)
    assert not accepted, "Revoked token should be invalid"
    print("✅ Revoked token properly invalidated")

    return True
|
|
|
|
|
|
async def test_token_rotation():
    """Check that rotating a token swaps the old token for a new one.

    After rotation the replacement must be non-None and different from the
    original, the original must fail validation, and the replacement must
    pass it.

    Returns:
        True when every assertion holds (a failed assertion raises instead).
    """
    print("\n🔄 Testing Token Rotation")
    print("=" * 50)

    token_manager = SessionTokenManager()

    sid = "test-rotate-session"
    original = token_manager.generate_session_token(sid)

    # Baseline: the first token is accepted.
    accepted, _ = token_manager.validate_session_token(sid, original)
    assert accepted, "Original token should be valid"
    print("✅ Original token valid")

    # Rotation must hand back a fresh, distinct token.
    replacement = token_manager.rotate_session_token(sid)
    assert replacement is not None, "Rotation should return new token"
    assert replacement != original, "New token should be different from old token"
    print("✅ Token rotation successful")

    # The superseded token must be refused...
    accepted, _ = token_manager.validate_session_token(sid, original)
    assert not accepted, "Old token should be invalid after rotation"
    print("✅ Old token invalidated after rotation")

    # ...while the replacement is accepted.
    accepted, _ = token_manager.validate_session_token(sid, replacement)
    assert accepted, "New token should be valid after rotation"
    print("✅ New token validated after rotation")

    return True
|
|
|
|
|
|
async def test_concurrent_sessions():
    """Check that one manager can hold several sessions at the same time.

    Five sessions are registered on a single manager. Each token must
    validate against its own session, and both the active-session count and
    the active-session listing must report exactly five entries.

    Returns:
        True when every assertion holds (a failed assertion raises instead).
    """
    print("\n👥 Testing Concurrent Authenticated Sessions")
    print("=" * 50)

    token_manager = SessionTokenManager()

    # Register five sessions, remembering the token issued to each.
    issued_tokens = {
        sid: token_manager.generate_session_token(sid)
        for sid in (f"concurrent-session-{index}" for index in range(5))
    }

    print(f"✅ Created {len(issued_tokens)} concurrent sessions")

    # Every token has to validate against the session it was issued for.
    for sid, issued in issued_tokens.items():
        accepted, why = token_manager.validate_session_token(sid, issued)
        assert accepted, f"Session {sid} token should be valid: {why}"

    print("✅ All concurrent session tokens validated")

    # The reported count must equal the number of sessions created above.
    expected_total = len(issued_tokens)
    active_count = token_manager.get_active_sessions_count()
    assert active_count == expected_total, (
        f"Should have {expected_total} active sessions, got {active_count}"
    )
    print(f"✅ Active sessions count correct: {active_count}")

    # The listing must contain the same number of entries.
    listing = token_manager.list_active_sessions()
    assert len(listing) == expected_total, "Active sessions list should match"
    print(f"✅ Active sessions list contains {len(listing)} sessions")

    return True
|
|
|
|
|
|
async def test_environment_configuration():
    """Check that a new manager reflects settings placed in the environment.

    Three environment variables are overridden, a manager is constructed, and
    its ``_token_length``, ``_token_expiry_hours`` and
    ``_cleanup_interval_minutes`` attributes are compared with the overridden
    values. A token generated by that manager must be 16 characters long.
    The previous environment is restored afterwards, even if an assertion
    fails.

    Returns:
        True when every assertion holds (a failed assertion raises instead).
    """
    print("\n🌍 Testing Environment Configuration")
    print("=" * 50)

    # Values to inject for the duration of this test.
    overrides = {
        "SESSION_TOKEN_LENGTH": "16",
        "SESSION_TOKEN_EXPIRY_HOURS": "12",
        "TOKEN_CLEANUP_INTERVAL_MINUTES": "30",
    }

    # Remember what was there before (None means "was not set").
    previous = {name: os.environ.get(name) for name in overrides}

    try:
        os.environ.update(overrides)

        # A manager built now is expected to pick up the overridden values.
        token_manager = SessionTokenManager()

        assert token_manager._token_length == 16, (
            f"Token length should be 16, got {token_manager._token_length}"
        )
        assert token_manager._token_expiry_hours == 12, (
            f"Token expiry should be 12, got {token_manager._token_expiry_hours}"
        )
        assert token_manager._cleanup_interval_minutes == 30, (
            f"Cleanup interval should be 30, got {token_manager._cleanup_interval_minutes}"
        )

        print("✅ Environment configuration applied correctly")

        # The configured length must carry through to generated tokens.
        issued = token_manager.generate_session_token("env-test-session")
        assert len(issued) == 16, f"Token should be 16 chars, got {len(issued)}"
        print("✅ Custom token length working")

    finally:
        # Put the environment back exactly as it was found.
        for name, old_value in previous.items():
            if old_value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = old_value

    return True
|
|
|
|
|
|
async def run_all_auth_tests():
    """Run every session-authentication test and print a summary.

    The test coroutines are awaited one after another. A test counts as
    passed only when it returns a truthy value. Any exception it raises,
    including a failed ``assert``, marks that test as failed without
    stopping the remaining tests.

    Returns:
        bool: True when every test passed, False otherwise.
    """
    print("🔐 Session Authentication Test Suite")
    print("=" * 70)

    tests = [
        ("Token Generation", test_token_generation),
        ("Token Validation", test_token_validation),
        ("Token Expiry", test_token_expiry),
        ("Token Revocation", test_token_revocation),
        ("Token Rotation", test_token_rotation),
        ("Concurrent Sessions", test_concurrent_sessions),
        ("Environment Configuration", test_environment_configuration),
    ]

    results = []
    for test_name, test_func in tests:
        print(f"\n{'=' * 25} {test_name} {'=' * 25}")
        try:
            # Only the awaited call sits in the try body, so an error raised
            # while reporting cannot be mistaken for a test failure.
            result = await test_func()
        except Exception as e:
            print(f"\n❌ ERROR in {test_name}: {e}")
            # The message alone rarely shows where the failure happened;
            # record the full traceback through the module logger.
            logger.exception("Unhandled error in test %r", test_name)
            results.append(False)
        else:
            # Coerce to bool so a test that returns None (or any other
            # non-numeric value) cannot make the sum() below raise.
            outcome = bool(result)
            results.append(outcome)
            status = "✅ PASSED" if outcome else "❌ FAILED"
            print(f"\n{status}: {test_name}")

    # Summary
    print(f"\n{'=' * 70}")
    passed = sum(results)
    total = len(results)
    print(f"📊 Test Results: {passed}/{total} tests passed")

    all_passed = passed == total
    if all_passed:
        print("🎉 All session authentication tests completed successfully!")
        print(
            "🔐 Token-based authentication is working correctly for securing sessions."
        )
    else:
        print("⚠️ Some tests failed. Check the output above for details.")
        print("💡 Ensure all dependencies are installed and configuration is correct.")

    return all_passed
|
|
|
|
|
|
if __name__ == "__main__":
    # Turn the suite outcome into the process exit status so shells and CI
    # pipelines can detect failures: 0 when every test passed, 1 otherwise.
    sys.exit(0 if asyncio.run(run_all_auth_tests()) else 1)
|