Some checks failed
Semantic Release / release (push) Has been cancelled
Implements a new provider that uses Google's Antigravity unified gateway API to access Claude, Gemini, and other models through a single OAuth2-authenticated endpoint.

Features:
- OAuth2 token management with automatic refresh
- Multi-account rotation for rate limit distribution
- Support for Claude Opus/Sonnet 4.5 (with/without thinking)
- Support for Gemini 2.5/3 models (Pro/Flash variants)
- Thinking mode support with configurable tokens
- Image processing support
- Dual quota pool tracking (antigravity vs gemini-cli)
- Gemini-style API request format

Authentication:
- Reads from ANTIGRAVITY_REFRESH_TOKEN env var (priority)
- Falls back to ~/.config/opencode/antigravity-accounts.json
- Automatic token refresh with retry logic
- Rate limit tracking per account and quota pool

Files added:
- providers/antigravity.py - Main provider implementation
- providers/antigravity_auth.py - OAuth token manager
- providers/registries/antigravity.py - Model registry
- conf/antigravity_models.json - Model definitions (11 models)
- docs/antigravity_provider.md - Setup and usage docs
- tests/test_antigravity_provider.py - Unit tests (14 pass)

Integration:
- Added to provider priority order after ZEN
- Registered in server.py with auto-detection
- ToS warning logged on first use
300 lines
11 KiB
Python
300 lines
11 KiB
Python
"""OAuth2 token manager for Antigravity provider.

This module handles authentication with Google's Antigravity unified gateway API,
including refresh token management and multi-account rotation.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from utils.env import get_env
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# OAuth2 endpoints
# Endpoint that exchanges a refresh token for a short-lived access token.
TOKEN_URL = "https://oauth2.googleapis.com/token"

# Google OAuth2 client credentials (from Antigravity/Gemini CLI)
# These are publicly documented OAuth client credentials
# NOTE(review): the client secret is embedded in source on the basis of the
# statement above -- confirm it is intended to ship in the repository.
OAUTH_CLIENT_ID = "764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com"
OAUTH_CLIENT_SECRET = "d-FL95Q19q7MQmFpd7hHD0Ty"

# Required OAuth2 scopes for Antigravity API access
OAUTH_SCOPES = [
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/userinfo.profile",
    "https://www.googleapis.com/auth/cclog",
    "https://www.googleapis.com/auth/experimentsandconfigs",
]

# Default accounts file location (resolved against the current user's home
# directory at import time).
DEFAULT_ACCOUNTS_FILE = Path.home() / ".config" / "opencode" / "antigravity-accounts.json"
|
|
|
|
|
|
@dataclass
class AntigravityAccount:
    """Represents a single Antigravity OAuth account.

    Holds the long-lived refresh token, the cached access token with its
    expiry, and per-model rate-limit reset times.
    """

    # Un-annotated class attributes are not treated as dataclass fields.
    # Safety margin: treat the token as expired this many seconds early.
    _TOKEN_REFRESH_MARGIN_SECONDS = 60
    # Rate-limit reset times are stored as epoch milliseconds.
    _MS_PER_SECOND = 1000

    email: str
    refresh_token: str
    project_id: str
    enabled: bool = True
    access_token: str | None = None
    # Epoch seconds at which ``access_token`` expires (0.0 = never fetched).
    token_expiry: float = 0.0
    # Maps a model name to the epoch-millisecond time its rate limit resets.
    rate_limit_reset_times: dict[str, float] = field(default_factory=dict)
    fingerprint: dict[str, Any] = field(default_factory=dict)

    def is_token_valid(self) -> bool:
        """Return True if an access token is cached and not about to expire."""
        if not self.access_token:
            return False
        # Refresh 60 seconds before expiry
        return time.time() < (self.token_expiry - self._TOKEN_REFRESH_MARGIN_SECONDS)

    def is_rate_limited(self, model: str | None = None) -> bool:
        """Check whether this account is currently rate limited.

        Args:
            model: When given, only the rate limit recorded for that model is
                considered. When omitted, the account counts as limited if
                *any* recorded limit is still active.

        Returns:
            True if a relevant rate limit has not reset yet.
        """
        now_ms = time.time() * self._MS_PER_SECOND
        # Tolerate a missing table (e.g. ``null`` in the accounts file).
        limits = self.rate_limit_reset_times or {}

        def _active(reset_time: Any) -> bool:
            # Ignore non-numeric values rather than raising on comparison.
            return (
                isinstance(reset_time, (int, float))
                and not isinstance(reset_time, bool)
                and reset_time > now_ms
            )

        if model:
            # A limit on a different model must not block this one.
            return _active(limits.get(model, 0))
        return any(_active(reset_time) for reset_time in limits.values())

    def set_rate_limited(self, model: str, reset_delay_seconds: float) -> None:
        """Mark this account as rate limited for ``model``.

        Args:
            model: Key under which the limit is recorded.
            reset_delay_seconds: Seconds from now until the limit resets.
        """
        reset_time = (time.time() + reset_delay_seconds) * self._MS_PER_SECOND
        if self.rate_limit_reset_times is None:
            self.rate_limit_reset_times = {}
        self.rate_limit_reset_times[model] = reset_time
        logger.info("Account %s rate limited for %s until %.0f", self.email, model, reset_time)
|
|
|
|
|
|
class AntigravityTokenManager:
    """Manages OAuth2 tokens for Antigravity API access.

    Supports:
    - Loading accounts from ~/.config/opencode/antigravity-accounts.json
    - Environment variable override via ANTIGRAVITY_REFRESH_TOKEN
    - Token refresh with automatic expiry handling
    - Multi-account rotation on rate limits
    """

    # Project used when neither the environment nor the accounts file names one.
    DEFAULT_PROJECT_ID = "rising-fact-p41fc"

    def __init__(self, accounts_file: Path | str | None = None) -> None:
        """Build the account list from the environment or an accounts file.

        Args:
            accounts_file: Optional path to an accounts JSON file. Ignored
                when ANTIGRAVITY_REFRESH_TOKEN is set; defaults to
                DEFAULT_ACCOUNTS_FILE otherwise.
        """
        self._accounts: list[AntigravityAccount] = []
        self._current_account_index: int = 0
        self._http_client: httpx.Client | None = None

        # Try environment variable first
        env_token = get_env("ANTIGRAVITY_REFRESH_TOKEN")
        env_project = get_env("ANTIGRAVITY_PROJECT_ID")

        if env_token:
            logger.info("Using Antigravity refresh token from environment variable")
            self._accounts.append(
                AntigravityAccount(
                    email="env-configured",
                    refresh_token=env_token,
                    project_id=env_project or self.DEFAULT_PROJECT_ID,
                )
            )
        else:
            # Load from accounts file
            accounts_path = Path(accounts_file) if accounts_file else DEFAULT_ACCOUNTS_FILE
            self._load_accounts(accounts_path)

        if self._accounts:
            logger.info("Antigravity token manager initialized with %d account(s)", len(self._accounts))
        else:
            logger.warning("No Antigravity accounts configured")

    @property
    def http_client(self) -> httpx.Client:
        """Lazy-init HTTP client (created on first use, 30 s timeout)."""
        if self._http_client is None:
            self._http_client = httpx.Client(timeout=30.0)
        return self._http_client

    @staticmethod
    def _as_dict(value: Any) -> dict[str, Any]:
        """Return ``value`` if it is a dict, otherwise a fresh empty dict."""
        return value if isinstance(value, dict) else {}

    def _load_accounts(self, accounts_path: Path) -> None:
        """Load accounts from the antigravity-accounts.json file.

        Loading is best-effort: a missing, unreadable or malformed file is
        logged and leaves the account list unchanged. A single malformed
        entry is skipped without discarding the remaining entries.
        """
        if not accounts_path.exists():
            logger.debug("Antigravity accounts file not found at %s", accounts_path)
            return

        try:
            with open(accounts_path, encoding="utf-8") as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            logger.error("Failed to parse Antigravity accounts file: %s", e)
            return
        except (OSError, UnicodeDecodeError) as e:
            logger.error("Error loading Antigravity accounts: %s", e)
            return

        if not isinstance(data, dict):
            logger.error("Error loading Antigravity accounts: %s", "top-level JSON value is not an object")
            return

        version = data.get("version", 1)
        if isinstance(version, int) and version < 3:
            logger.warning(
                "Antigravity accounts file has old format (version %d), may have limited support", version
            )

        entries = data.get("accounts") or []
        if not isinstance(entries, list):
            logger.error("Error loading Antigravity accounts: %s", "'accounts' is not a list")
            return

        for account_data in entries:
            account = self._parse_account(account_data)
            if account is None:
                continue
            self._accounts.append(account)
            # Log the effective project so a substituted default is visible.
            logger.debug("Loaded Antigravity account: %s (project: %s)", account.email, account.project_id)

    def _parse_account(self, account_data: Any) -> AntigravityAccount | None:
        """Convert one accounts-file entry into an account, or None to skip it."""
        if not isinstance(account_data, dict):
            # Log only the type: the raw value could contain a credential.
            logger.warning("Skipping malformed Antigravity account entry of type %s", type(account_data).__name__)
            return None

        if not account_data.get("enabled", True):
            return None

        email = account_data.get("email", "unknown")
        refresh_token = account_data.get("refreshToken")
        if not refresh_token:
            logger.warning("Skipping account %s: no refresh token", email)
            return None

        project_id = account_data.get("projectId") or account_data.get("managedProjectId")

        return AntigravityAccount(
            email=email,
            refresh_token=refresh_token,
            project_id=project_id or self.DEFAULT_PROJECT_ID,
            enabled=account_data.get("enabled", True),
            # JSON ``null`` would otherwise be stored as None and break later
            # ``.get`` / ``.values`` calls on these mappings.
            rate_limit_reset_times=self._as_dict(account_data.get("rateLimitResetTimes")),
            fingerprint=self._as_dict(account_data.get("fingerprint")),
        )

    def has_accounts(self) -> bool:
        """Check if any accounts are configured."""
        return bool(self._accounts)

    def get_account_count(self) -> int:
        """Return the number of configured accounts."""
        return len(self._accounts)

    def get_access_token(self, model: str | None = None) -> tuple[str, str, dict[str, str]]:
        """Get a valid access token, refreshing if necessary.

        Starts at the current account and tries each account at most once,
        rotating past accounts that are rate limited for ``model`` or whose
        token refresh fails.

        Args:
            model: Optional model name for rate limit checking

        Returns:
            Tuple of (access_token, project_id, fingerprint_headers)

        Raises:
            RuntimeError: If no valid token can be obtained
        """
        if not self._accounts:
            raise RuntimeError("No Antigravity accounts configured")

        original_index = self._current_account_index

        # Try each account starting from current index
        for _ in range(len(self._accounts)):
            account = self._accounts[self._current_account_index]

            # Skip rate-limited accounts for this model
            if model and account.is_rate_limited(model):
                logger.debug("Skipping rate-limited account %s for model %s", account.email, model)
                self._rotate_account()
                continue

            # Refresh token if needed
            if not account.is_token_valid():
                try:
                    self._refresh_token(account)
                except Exception as e:
                    # Deliberately broad: any refresh failure (network error,
                    # bad response) should fall through to the next account.
                    logger.warning("Failed to refresh token for %s: %s", account.email, e)
                    self._rotate_account()
                    continue

            return account.access_token, account.project_id, self._build_fingerprint_headers(account)

        # All accounts exhausted; restore the starting position.
        self._current_account_index = original_index
        raise RuntimeError("All Antigravity accounts are rate limited or have invalid tokens")

    def mark_rate_limited(self, model: str, retry_delay: float = 60.0) -> None:
        """Mark the current account as rate limited and rotate to next.

        Args:
            model: Model name the limit applies to.
            retry_delay: Seconds until the limit is considered reset.
        """
        if self._accounts:
            account = self._accounts[self._current_account_index]
            account.set_rate_limited(model, retry_delay)
            self._rotate_account()

    def _rotate_account(self) -> None:
        """Advance to the next account; a no-op with fewer than two accounts."""
        if len(self._accounts) > 1:
            self._current_account_index = (self._current_account_index + 1) % len(self._accounts)
            logger.debug("Rotated to account index %d", self._current_account_index)

    def _refresh_token(self, account: AntigravityAccount) -> None:
        """Refresh the OAuth2 access token for an account.

        The account is only updated once the whole response has been
        validated, so a failed refresh never leaves it half-updated.

        Raises:
            RuntimeError: If the token endpoint returns a non-200 status or a
                response without a usable ``access_token`` / ``expires_in``.
        """
        logger.debug("Refreshing access token for %s", account.email)

        response = self.http_client.post(
            TOKEN_URL,
            data={
                "client_id": OAUTH_CLIENT_ID,
                "client_secret": OAUTH_CLIENT_SECRET,
                "refresh_token": account.refresh_token,
                "grant_type": "refresh_token",
            },
        )

        if response.status_code != 200:
            error_msg = f"Token refresh failed: {response.status_code} - {response.text}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        try:
            payload = response.json()
        except ValueError as err:
            raise RuntimeError("Token refresh failed: response body is not valid JSON") from err

        access_token = payload.get("access_token") if isinstance(payload, dict) else None
        if not access_token:
            raise RuntimeError("Token refresh failed: response did not contain an access_token")

        # Token typically expires in 3600 seconds
        try:
            expires_in = float(payload.get("expires_in", 3600))
        except (TypeError, ValueError) as err:
            raise RuntimeError("Token refresh failed: invalid expires_in in response") from err

        account.access_token = access_token
        account.token_expiry = time.time() + expires_in

        logger.debug("Token refreshed for %s, expires in %d seconds", account.email, expires_in)

    def _build_fingerprint_headers(self, account: AntigravityAccount) -> dict[str, str]:
        """Build fingerprint headers for API requests.

        Values come from the account's stored fingerprint; built-in defaults
        are used for anything missing. ``X-Goog-Quota-User`` is only sent
        when the fingerprint provides it.
        """
        fingerprint = self._as_dict(account.fingerprint)
        client_metadata = self._as_dict(fingerprint.get("clientMetadata"))

        headers: dict[str, str] = {
            "User-Agent": fingerprint.get("userAgent") or "antigravity/1.15.8 linux/amd64",
            "X-Goog-Api-Client": fingerprint.get("apiClient") or "google-cloud-sdk intellij/2024.1",
        }

        # Build client metadata
        metadata = {
            "ideType": client_metadata.get("ideType", "CLOUD_SHELL_EDITOR"),
            "platform": client_metadata.get("platform", "LINUX"),
            "pluginType": client_metadata.get("pluginType", "GEMINI"),
        }
        headers["Client-Metadata"] = json.dumps(metadata)

        # Quota user for rate limiting
        if fingerprint.get("quotaUser"):
            headers["X-Goog-Quota-User"] = fingerprint["quotaUser"]

        return headers

    def close(self) -> None:
        """Close the HTTP client if one was created; safe to call repeatedly."""
        if self._http_client:
            self._http_client.close()
            self._http_client = None
|