fixed all remaining issues with the session manager

This commit is contained in:
2026-01-18 23:28:49 +01:00
parent 0243cfc250
commit 2f5464e1d2
11 changed files with 4040 additions and 101 deletions

View File

@@ -0,0 +1,235 @@
"""
Token-Based Authentication for OpenCode Sessions
Provides secure token generation, validation, and management for individual
user sessions to prevent unauthorized access to OpenCode servers.
"""
import os
import uuid
import secrets
import hashlib
import hmac
from typing import Dict, Optional, Tuple
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class SessionTokenManager:
"""Manages authentication tokens for OpenCode user sessions."""
def __init__(self):
# Token storage - in production, this should be in Redis/database
self._session_tokens: Dict[str, Dict] = {}
# Token configuration
self._token_length = int(os.getenv("SESSION_TOKEN_LENGTH", "32"))
self._token_expiry_hours = int(os.getenv("SESSION_TOKEN_EXPIRY_HOURS", "24"))
self._token_secret = os.getenv("SESSION_TOKEN_SECRET", self._generate_secret())
# Cleanup configuration
self._cleanup_interval_minutes = int(
os.getenv("TOKEN_CLEANUP_INTERVAL_MINUTES", "60")
)
def _generate_secret(self) -> str:
"""Generate a secure secret for token signing."""
return secrets.token_hex(32)
def generate_session_token(self, session_id: str) -> str:
"""
Generate a unique authentication token for a session.
Args:
session_id: The session identifier
Returns:
str: The authentication token
"""
# Generate cryptographically secure random token
token = secrets.token_urlsafe(self._token_length)
# Create token data with expiry
expiry = datetime.now() + timedelta(hours=self._token_expiry_hours)
# Store token information
self._session_tokens[session_id] = {
"token": token,
"session_id": session_id,
"created_at": datetime.now(),
"expires_at": expiry,
"last_used": datetime.now(),
}
logger.info(f"Generated authentication token for session {session_id}")
return token
def validate_session_token(self, session_id: str, token: str) -> Tuple[bool, str]:
"""
Validate a session token.
Args:
session_id: The session identifier
token: The token to validate
Returns:
Tuple[bool, str]: (is_valid, reason)
"""
# Check if session exists
if session_id not in self._session_tokens:
return False, "Session not found"
session_data = self._session_tokens[session_id]
# Check if token matches
if session_data["token"] != token:
return False, "Invalid token"
# Check if token has expired
if datetime.now() > session_data["expires_at"]:
# Clean up expired token
del self._session_tokens[session_id]
return False, "Token expired"
# Update last used time
session_data["last_used"] = datetime.now()
return True, "Valid"
def revoke_session_token(self, session_id: str) -> bool:
"""
Revoke a session token.
Args:
session_id: The session identifier
Returns:
bool: True if token was revoked, False if not found
"""
if session_id in self._session_tokens:
del self._session_tokens[session_id]
logger.info(f"Revoked authentication token for session {session_id}")
return True
return False
def rotate_session_token(self, session_id: str) -> Optional[str]:
"""
Rotate (regenerate) a session token.
Args:
session_id: The session identifier
Returns:
Optional[str]: New token if session exists, None otherwise
"""
if session_id not in self._session_tokens:
return None
# Generate new token
new_token = self.generate_session_token(session_id)
logger.info(f"Rotated authentication token for session {session_id}")
return new_token
def cleanup_expired_tokens(self) -> int:
"""
Clean up expired tokens.
Returns:
int: Number of tokens cleaned up
"""
now = datetime.now()
expired_sessions = []
for session_id, session_data in self._session_tokens.items():
if now > session_data["expires_at"]:
expired_sessions.append(session_id)
# Remove expired tokens
for session_id in expired_sessions:
del self._session_tokens[session_id]
if expired_sessions:
logger.info(
f"Cleaned up {len(expired_sessions)} expired authentication tokens"
)
return len(expired_sessions)
def get_session_token_info(self, session_id: str) -> Optional[Dict]:
"""
Get information about a session token.
Args:
session_id: The session identifier
Returns:
Optional[Dict]: Token information or None if not found
"""
if session_id not in self._session_tokens:
return None
session_data = self._session_tokens[session_id].copy()
# Remove sensitive token value
session_data.pop("token", None)
return session_data
def get_active_sessions_count(self) -> int:
"""Get the number of active sessions with tokens."""
return len(self._session_tokens)
def list_active_sessions(self) -> Dict[str, Dict]:
"""List all active sessions with token information (without token values)."""
result = {}
for session_id, session_data in self._session_tokens.items():
# Create copy without sensitive token
info = session_data.copy()
info.pop("token", None)
result[session_id] = info
return result
# Global token manager instance
_session_token_manager = SessionTokenManager()
def generate_session_auth_token(session_id: str) -> str:
"""Generate an authentication token for a session."""
return _session_token_manager.generate_session_token(session_id)
def validate_session_auth_token(session_id: str, token: str) -> Tuple[bool, str]:
"""Validate a session authentication token."""
return _session_token_manager.validate_session_token(session_id, token)
def revoke_session_auth_token(session_id: str) -> bool:
"""Revoke a session authentication token."""
return _session_token_manager.revoke_session_token(session_id)
def rotate_session_auth_token(session_id: str) -> Optional[str]:
"""Rotate a session authentication token."""
return _session_token_manager.rotate_session_auth_token(session_id)
def cleanup_expired_auth_tokens() -> int:
"""Clean up expired authentication tokens."""
return _session_token_manager.cleanup_expired_tokens()
def get_session_auth_info(session_id: str) -> Optional[Dict]:
"""Get authentication information for a session."""
return _session_token_manager.get_session_token_info(session_id)
def get_active_auth_sessions_count() -> int:
"""Get the number of active authenticated sessions."""
return _session_token_manager.get_active_sessions_count()
def list_active_auth_sessions() -> Dict[str, Dict]:
"""List all active authenticated sessions."""
return _session_token_manager.list_active_sessions()