""" Token-Based Authentication for OpenCode Sessions Provides secure token generation, validation, and management for individual user sessions to prevent unauthorized access to OpenCode servers. """ import os import uuid import secrets import hashlib import hmac from typing import Dict, Optional, Tuple from datetime import datetime, timedelta import logging logger = logging.getLogger(__name__) class SessionTokenManager: """Manages authentication tokens for OpenCode user sessions.""" def __init__(self): # Token storage - in production, this should be in Redis/database self._session_tokens: Dict[str, Dict] = {} # Token configuration self._token_length = int(os.getenv("SESSION_TOKEN_LENGTH", "32")) self._token_expiry_hours = int(os.getenv("SESSION_TOKEN_EXPIRY_HOURS", "24")) self._token_secret = os.getenv("SESSION_TOKEN_SECRET", self._generate_secret()) # Cleanup configuration self._cleanup_interval_minutes = int( os.getenv("TOKEN_CLEANUP_INTERVAL_MINUTES", "60") ) def _generate_secret(self) -> str: """Generate a secure secret for token signing.""" return secrets.token_hex(32) def generate_session_token(self, session_id: str) -> str: """ Generate a unique authentication token for a session. Args: session_id: The session identifier Returns: str: The authentication token """ # Generate cryptographically secure random token token = secrets.token_urlsafe(self._token_length) # Create token data with expiry expiry = datetime.now() + timedelta(hours=self._token_expiry_hours) # Store token information self._session_tokens[session_id] = { "token": token, "session_id": session_id, "created_at": datetime.now(), "expires_at": expiry, "last_used": datetime.now(), } logger.info(f"Generated authentication token for session {session_id}") return token def validate_session_token(self, session_id: str, token: str) -> Tuple[bool, str]: """ Validate a session token. Args: session_id: The session identifier token: The token to validate Returns: Tuple[bool, str]: (is_valid, reason) """ # Check if session exists if session_id not in self._session_tokens: return False, "Session not found" session_data = self._session_tokens[session_id] # Check if token matches using constant-time comparison to prevent timing attacks if not secrets.compare_digest(session_data["token"], token): return False, "Invalid token" # Check if token has expired if datetime.now() > session_data["expires_at"]: # Clean up expired token del self._session_tokens[session_id] return False, "Token expired" # Update last used time session_data["last_used"] = datetime.now() return True, "Valid" def revoke_session_token(self, session_id: str) -> bool: """ Revoke a session token. Args: session_id: The session identifier Returns: bool: True if token was revoked, False if not found """ if session_id in self._session_tokens: del self._session_tokens[session_id] logger.info(f"Revoked authentication token for session {session_id}") return True return False def rotate_session_token(self, session_id: str) -> Optional[str]: """ Rotate (regenerate) a session token. Args: session_id: The session identifier Returns: Optional[str]: New token if session exists, None otherwise """ if session_id not in self._session_tokens: return None # Generate new token new_token = self.generate_session_token(session_id) logger.info(f"Rotated authentication token for session {session_id}") return new_token def cleanup_expired_tokens(self) -> int: """ Clean up expired tokens. Returns: int: Number of tokens cleaned up """ now = datetime.now() expired_sessions = [] for session_id, session_data in self._session_tokens.items(): if now > session_data["expires_at"]: expired_sessions.append(session_id) # Remove expired tokens for session_id in expired_sessions: del self._session_tokens[session_id] if expired_sessions: logger.info( f"Cleaned up {len(expired_sessions)} expired authentication tokens" ) return len(expired_sessions) def get_session_token_info(self, session_id: str) -> Optional[Dict]: """ Get information about a session token. Args: session_id: The session identifier Returns: Optional[Dict]: Token information or None if not found """ if session_id not in self._session_tokens: return None session_data = self._session_tokens[session_id].copy() # Remove sensitive token value session_data.pop("token", None) return session_data def get_active_sessions_count(self) -> int: """Get the number of active sessions with tokens.""" return len(self._session_tokens) def list_active_sessions(self) -> Dict[str, Dict]: """List all active sessions with token information (without token values).""" result = {} for session_id, session_data in self._session_tokens.items(): # Create copy without sensitive token info = session_data.copy() info.pop("token", None) result[session_id] = info return result # Global token manager instance _session_token_manager = SessionTokenManager() def generate_session_auth_token(session_id: str) -> str: """Generate an authentication token for a session.""" return _session_token_manager.generate_session_token(session_id) def validate_session_auth_token(session_id: str, token: str) -> Tuple[bool, str]: """Validate a session authentication token.""" return _session_token_manager.validate_session_token(session_id, token) def revoke_session_auth_token(session_id: str) -> bool: """Revoke a session authentication token.""" return _session_token_manager.revoke_session_token(session_id) def rotate_session_auth_token(session_id: str) -> Optional[str]: """Rotate a session authentication token.""" return _session_token_manager.rotate_session_token(session_id) def cleanup_expired_auth_tokens() -> int: """Clean up expired authentication tokens.""" return _session_token_manager.cleanup_expired_tokens() def get_session_auth_info(session_id: str) -> Optional[Dict]: """Get authentication information for a session.""" return _session_token_manager.get_session_token_info(session_id) def get_active_auth_sessions_count() -> int: """Get the number of active authenticated sessions.""" return _session_token_manager.get_active_sessions_count() def list_active_auth_sessions() -> Dict[str, Dict]: """List all active authenticated sessions.""" return _session_token_manager.list_active_sessions()