"""OAuth2 token manager for Antigravity provider. This module handles authentication with Google's Antigravity unified gateway API, including refresh token management and multi-account rotation. """ from __future__ import annotations import json import logging import time from dataclasses import dataclass, field from pathlib import Path from typing import Any import httpx from utils.env import get_env logger = logging.getLogger(__name__) # OAuth2 endpoints TOKEN_URL = "https://oauth2.googleapis.com/token" # Google OAuth2 client credentials (from Antigravity/Gemini CLI) # These are publicly documented OAuth client credentials OAUTH_CLIENT_ID = "764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com" OAUTH_CLIENT_SECRET = "d-FL95Q19q7MQmFpd7hHD0Ty" # Required OAuth2 scopes for Antigravity API access OAUTH_SCOPES = [ "https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/userinfo.email", "https://www.googleapis.com/auth/userinfo.profile", "https://www.googleapis.com/auth/cclog", "https://www.googleapis.com/auth/experimentsandconfigs", ] # Default accounts file location DEFAULT_ACCOUNTS_FILE = Path.home() / ".config" / "opencode" / "antigravity-accounts.json" @dataclass class AntigravityAccount: """Represents a single Antigravity OAuth account.""" email: str refresh_token: str project_id: str enabled: bool = True access_token: str | None = None token_expiry: float = 0.0 rate_limit_reset_times: dict[str, float] = field(default_factory=dict) fingerprint: dict[str, Any] = field(default_factory=dict) def is_token_valid(self) -> bool: """Check if the current access token is still valid.""" if not self.access_token: return False # Refresh 60 seconds before expiry return time.time() < (self.token_expiry - 60) def is_rate_limited(self, model: str | None = None) -> bool: """Check if this account is rate limited for a specific model or globally.""" now = time.time() * 1000 # Convert to milliseconds if model: reset_time = self.rate_limit_reset_times.get(model, 0) if reset_time > now: return True # Check for any active rate limits return any(reset > now for reset in self.rate_limit_reset_times.values()) def set_rate_limited(self, model: str, reset_delay_seconds: float) -> None: """Mark this account as rate limited for a model.""" reset_time = (time.time() + reset_delay_seconds) * 1000 self.rate_limit_reset_times[model] = reset_time logger.info("Account %s rate limited for %s until %.0f", self.email, model, reset_time) class AntigravityTokenManager: """Manages OAuth2 tokens for Antigravity API access. Supports: - Loading accounts from ~/.config/opencode/antigravity-accounts.json - Environment variable override via ANTIGRAVITY_REFRESH_TOKEN - Token refresh with automatic expiry handling - Multi-account rotation on rate limits """ def __init__(self, accounts_file: Path | str | None = None) -> None: self._accounts: list[AntigravityAccount] = [] self._current_account_index: int = 0 self._http_client: httpx.Client | None = None # Try environment variable first env_token = get_env("ANTIGRAVITY_REFRESH_TOKEN") env_project = get_env("ANTIGRAVITY_PROJECT_ID") if env_token: logger.info("Using Antigravity refresh token from environment variable") self._accounts.append( AntigravityAccount( email="env-configured", refresh_token=env_token, project_id=env_project or "rising-fact-p41fc", # Default project ) ) else: # Load from accounts file accounts_path = Path(accounts_file) if accounts_file else DEFAULT_ACCOUNTS_FILE self._load_accounts(accounts_path) if self._accounts: logger.info("Antigravity token manager initialized with %d account(s)", len(self._accounts)) else: logger.warning("No Antigravity accounts configured") @property def http_client(self) -> httpx.Client: """Lazy-init HTTP client.""" if self._http_client is None: self._http_client = httpx.Client(timeout=30.0) return self._http_client def _load_accounts(self, accounts_path: Path) -> None: """Load accounts from the antigravity-accounts.json file.""" if not accounts_path.exists(): logger.debug("Antigravity accounts file not found at %s", accounts_path) return try: with open(accounts_path, encoding="utf-8") as f: data = json.load(f) version = data.get("version", 1) if version < 3: logger.warning( "Antigravity accounts file has old format (version %d), may have limited support", version ) for account_data in data.get("accounts", []): if not account_data.get("enabled", True): continue email = account_data.get("email", "unknown") refresh_token = account_data.get("refreshToken") project_id = account_data.get("projectId") or account_data.get("managedProjectId") if not refresh_token: logger.warning("Skipping account %s: no refresh token", email) continue account = AntigravityAccount( email=email, refresh_token=refresh_token, project_id=project_id or "rising-fact-p41fc", enabled=account_data.get("enabled", True), rate_limit_reset_times=account_data.get("rateLimitResetTimes", {}), fingerprint=account_data.get("fingerprint", {}), ) self._accounts.append(account) logger.debug("Loaded Antigravity account: %s (project: %s)", email, project_id) except json.JSONDecodeError as e: logger.error("Failed to parse Antigravity accounts file: %s", e) except Exception as e: logger.error("Error loading Antigravity accounts: %s", e) def has_accounts(self) -> bool: """Check if any accounts are configured.""" return len(self._accounts) > 0 def get_account_count(self) -> int: """Return the number of configured accounts.""" return len(self._accounts) def get_access_token(self, model: str | None = None) -> tuple[str, str, dict[str, str]]: """Get a valid access token, refreshing if necessary. Args: model: Optional model name for rate limit checking Returns: Tuple of (access_token, project_id, fingerprint_headers) Raises: RuntimeError: If no valid token can be obtained """ if not self._accounts: raise RuntimeError("No Antigravity accounts configured") # Try each account starting from current index attempts = 0 original_index = self._current_account_index while attempts < len(self._accounts): account = self._accounts[self._current_account_index] # Skip rate-limited accounts for this model if model and account.is_rate_limited(model): logger.debug("Skipping rate-limited account %s for model %s", account.email, model) self._rotate_account() attempts += 1 continue # Refresh token if needed if not account.is_token_valid(): try: self._refresh_token(account) except Exception as e: logger.warning("Failed to refresh token for %s: %s", account.email, e) self._rotate_account() attempts += 1 continue # Build fingerprint headers fingerprint_headers = self._build_fingerprint_headers(account) return account.access_token, account.project_id, fingerprint_headers # All accounts exhausted self._current_account_index = original_index raise RuntimeError("All Antigravity accounts are rate limited or have invalid tokens") def mark_rate_limited(self, model: str, retry_delay: float = 60.0) -> None: """Mark the current account as rate limited and rotate to next.""" if self._accounts: account = self._accounts[self._current_account_index] account.set_rate_limited(model, retry_delay) self._rotate_account() def _rotate_account(self) -> None: """Rotate to the next available account.""" if len(self._accounts) > 1: self._current_account_index = (self._current_account_index + 1) % len(self._accounts) logger.debug("Rotated to account index %d", self._current_account_index) def _refresh_token(self, account: AntigravityAccount) -> None: """Refresh the OAuth2 access token for an account.""" logger.debug("Refreshing access token for %s", account.email) response = self.http_client.post( TOKEN_URL, data={ "client_id": OAUTH_CLIENT_ID, "client_secret": OAUTH_CLIENT_SECRET, "refresh_token": account.refresh_token, "grant_type": "refresh_token", }, ) if response.status_code != 200: error_msg = f"Token refresh failed: {response.status_code} - {response.text}" logger.error(error_msg) raise RuntimeError(error_msg) data = response.json() account.access_token = data["access_token"] # Token typically expires in 3600 seconds expires_in = data.get("expires_in", 3600) account.token_expiry = time.time() + expires_in logger.debug("Token refreshed for %s, expires in %d seconds", account.email, expires_in) def _build_fingerprint_headers(self, account: AntigravityAccount) -> dict[str, str]: """Build fingerprint headers for API requests.""" fingerprint = account.fingerprint headers = {} if fingerprint.get("userAgent"): headers["User-Agent"] = fingerprint["userAgent"] else: headers["User-Agent"] = "antigravity/1.15.8 linux/amd64" if fingerprint.get("apiClient"): headers["X-Goog-Api-Client"] = fingerprint["apiClient"] else: headers["X-Goog-Api-Client"] = "google-cloud-sdk intellij/2024.1" # Build client metadata client_metadata = fingerprint.get("clientMetadata", {}) metadata = { "ideType": client_metadata.get("ideType", "CLOUD_SHELL_EDITOR"), "platform": client_metadata.get("platform", "LINUX"), "pluginType": client_metadata.get("pluginType", "GEMINI"), } headers["Client-Metadata"] = json.dumps(metadata) # Quota user for rate limiting if fingerprint.get("quotaUser"): headers["X-Goog-Quota-User"] = fingerprint["quotaUser"] return headers def close(self) -> None: """Close the HTTP client.""" if self._http_client: self._http_client.close() self._http_client = None