- Use secrets.compare_digest() for token comparison instead of == to prevent timing-based attacks that could leak token information - Fix rotate_session_auth_token() to call the correct method rotate_session_token() instead of non-existent rotate_session_auth_token()
236 lines
7.3 KiB
Python
236 lines
7.3 KiB
Python
"""
|
|
Token-Based Authentication for OpenCode Sessions
|
|
|
|
Provides secure token generation, validation, and management for individual
|
|
user sessions to prevent unauthorized access to OpenCode servers.
|
|
"""
|
|
|
|
import os
|
|
import uuid
|
|
import secrets
|
|
import hashlib
|
|
import hmac
|
|
from typing import Dict, Optional, Tuple
|
|
from datetime import datetime, timedelta
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SessionTokenManager:
|
|
"""Manages authentication tokens for OpenCode user sessions."""
|
|
|
|
def __init__(self):
|
|
# Token storage - in production, this should be in Redis/database
|
|
self._session_tokens: Dict[str, Dict] = {}
|
|
|
|
# Token configuration
|
|
self._token_length = int(os.getenv("SESSION_TOKEN_LENGTH", "32"))
|
|
self._token_expiry_hours = int(os.getenv("SESSION_TOKEN_EXPIRY_HOURS", "24"))
|
|
self._token_secret = os.getenv("SESSION_TOKEN_SECRET", self._generate_secret())
|
|
|
|
# Cleanup configuration
|
|
self._cleanup_interval_minutes = int(
|
|
os.getenv("TOKEN_CLEANUP_INTERVAL_MINUTES", "60")
|
|
)
|
|
|
|
def _generate_secret(self) -> str:
|
|
"""Generate a secure secret for token signing."""
|
|
return secrets.token_hex(32)
|
|
|
|
def generate_session_token(self, session_id: str) -> str:
|
|
"""
|
|
Generate a unique authentication token for a session.
|
|
|
|
Args:
|
|
session_id: The session identifier
|
|
|
|
Returns:
|
|
str: The authentication token
|
|
"""
|
|
# Generate cryptographically secure random token
|
|
token = secrets.token_urlsafe(self._token_length)
|
|
|
|
# Create token data with expiry
|
|
expiry = datetime.now() + timedelta(hours=self._token_expiry_hours)
|
|
|
|
# Store token information
|
|
self._session_tokens[session_id] = {
|
|
"token": token,
|
|
"session_id": session_id,
|
|
"created_at": datetime.now(),
|
|
"expires_at": expiry,
|
|
"last_used": datetime.now(),
|
|
}
|
|
|
|
logger.info(f"Generated authentication token for session {session_id}")
|
|
return token
|
|
|
|
def validate_session_token(self, session_id: str, token: str) -> Tuple[bool, str]:
|
|
"""
|
|
Validate a session token.
|
|
|
|
Args:
|
|
session_id: The session identifier
|
|
token: The token to validate
|
|
|
|
Returns:
|
|
Tuple[bool, str]: (is_valid, reason)
|
|
"""
|
|
# Check if session exists
|
|
if session_id not in self._session_tokens:
|
|
return False, "Session not found"
|
|
|
|
session_data = self._session_tokens[session_id]
|
|
|
|
# Check if token matches using constant-time comparison to prevent timing attacks
|
|
if not secrets.compare_digest(session_data["token"], token):
|
|
return False, "Invalid token"
|
|
|
|
# Check if token has expired
|
|
if datetime.now() > session_data["expires_at"]:
|
|
# Clean up expired token
|
|
del self._session_tokens[session_id]
|
|
return False, "Token expired"
|
|
|
|
# Update last used time
|
|
session_data["last_used"] = datetime.now()
|
|
|
|
return True, "Valid"
|
|
|
|
def revoke_session_token(self, session_id: str) -> bool:
|
|
"""
|
|
Revoke a session token.
|
|
|
|
Args:
|
|
session_id: The session identifier
|
|
|
|
Returns:
|
|
bool: True if token was revoked, False if not found
|
|
"""
|
|
if session_id in self._session_tokens:
|
|
del self._session_tokens[session_id]
|
|
logger.info(f"Revoked authentication token for session {session_id}")
|
|
return True
|
|
return False
|
|
|
|
def rotate_session_token(self, session_id: str) -> Optional[str]:
|
|
"""
|
|
Rotate (regenerate) a session token.
|
|
|
|
Args:
|
|
session_id: The session identifier
|
|
|
|
Returns:
|
|
Optional[str]: New token if session exists, None otherwise
|
|
"""
|
|
if session_id not in self._session_tokens:
|
|
return None
|
|
|
|
# Generate new token
|
|
new_token = self.generate_session_token(session_id)
|
|
|
|
logger.info(f"Rotated authentication token for session {session_id}")
|
|
return new_token
|
|
|
|
def cleanup_expired_tokens(self) -> int:
|
|
"""
|
|
Clean up expired tokens.
|
|
|
|
Returns:
|
|
int: Number of tokens cleaned up
|
|
"""
|
|
now = datetime.now()
|
|
expired_sessions = []
|
|
|
|
for session_id, session_data in self._session_tokens.items():
|
|
if now > session_data["expires_at"]:
|
|
expired_sessions.append(session_id)
|
|
|
|
# Remove expired tokens
|
|
for session_id in expired_sessions:
|
|
del self._session_tokens[session_id]
|
|
|
|
if expired_sessions:
|
|
logger.info(
|
|
f"Cleaned up {len(expired_sessions)} expired authentication tokens"
|
|
)
|
|
|
|
return len(expired_sessions)
|
|
|
|
def get_session_token_info(self, session_id: str) -> Optional[Dict]:
|
|
"""
|
|
Get information about a session token.
|
|
|
|
Args:
|
|
session_id: The session identifier
|
|
|
|
Returns:
|
|
Optional[Dict]: Token information or None if not found
|
|
"""
|
|
if session_id not in self._session_tokens:
|
|
return None
|
|
|
|
session_data = self._session_tokens[session_id].copy()
|
|
# Remove sensitive token value
|
|
session_data.pop("token", None)
|
|
return session_data
|
|
|
|
def get_active_sessions_count(self) -> int:
|
|
"""Get the number of active sessions with tokens."""
|
|
return len(self._session_tokens)
|
|
|
|
def list_active_sessions(self) -> Dict[str, Dict]:
|
|
"""List all active sessions with token information (without token values)."""
|
|
result = {}
|
|
for session_id, session_data in self._session_tokens.items():
|
|
# Create copy without sensitive token
|
|
info = session_data.copy()
|
|
info.pop("token", None)
|
|
result[session_id] = info
|
|
return result
|
|
|
|
|
|
# Global token manager instance
|
|
_session_token_manager = SessionTokenManager()
|
|
|
|
|
|
def generate_session_auth_token(session_id: str) -> str:
|
|
"""Generate an authentication token for a session."""
|
|
return _session_token_manager.generate_session_token(session_id)
|
|
|
|
|
|
def validate_session_auth_token(session_id: str, token: str) -> Tuple[bool, str]:
|
|
"""Validate a session authentication token."""
|
|
return _session_token_manager.validate_session_token(session_id, token)
|
|
|
|
|
|
def revoke_session_auth_token(session_id: str) -> bool:
|
|
"""Revoke a session authentication token."""
|
|
return _session_token_manager.revoke_session_token(session_id)
|
|
|
|
|
|
def rotate_session_auth_token(session_id: str) -> Optional[str]:
|
|
"""Rotate a session authentication token."""
|
|
return _session_token_manager.rotate_session_token(session_id)
|
|
|
|
|
|
def cleanup_expired_auth_tokens() -> int:
|
|
"""Clean up expired authentication tokens."""
|
|
return _session_token_manager.cleanup_expired_tokens()
|
|
|
|
|
|
def get_session_auth_info(session_id: str) -> Optional[Dict]:
|
|
"""Get authentication information for a session."""
|
|
return _session_token_manager.get_session_token_info(session_id)
|
|
|
|
|
|
def get_active_auth_sessions_count() -> int:
|
|
"""Get the number of active authenticated sessions."""
|
|
return _session_token_manager.get_active_sessions_count()
|
|
|
|
|
|
def list_active_auth_sessions() -> Dict[str, Dict]:
|
|
"""List all active authenticated sessions."""
|
|
return _session_token_manager.list_active_sessions()
|