"""Antigravity model provider implementation. Antigravity is Google's unified gateway API for accessing multiple AI models (Claude, Gemini, GPT-OSS) through a single Gemini-style interface. WARNING: Using this provider may violate Google's Terms of Service. See docs/antigravity_provider.md for important information about risks. """ from __future__ import annotations import base64 import json import logging import re import uuid from typing import TYPE_CHECKING, Any, ClassVar import httpx from utils.env import get_env from utils.image_utils import validate_image from .antigravity_auth import AntigravityTokenManager from .base import ModelProvider from .registries.antigravity import AntigravityModelRegistry from .shared import ModelCapabilities, ModelResponse, ProviderType if TYPE_CHECKING: from tools.models import ToolModelCategory logger = logging.getLogger(__name__) # Antigravity API endpoints PRODUCTION_ENDPOINT = "https://cloudcode-pa.googleapis.com" DAILY_ENDPOINT = "https://daily-cloudcode-pa.sandbox.googleapis.com" # API path for content generation GENERATE_CONTENT_PATH = "/v1internal:generateContent" STREAM_GENERATE_CONTENT_PATH = "/v1internal:streamGenerateContent" # ToS warning message TOS_WARNING = """ ================================================================================ ANTIGRAVITY PROVIDER WARNING Using the Antigravity provider may violate Google's Terms of Service. Users have reported account bans when using this approach. HIGH-RISK SCENARIOS: - Fresh Google accounts have a very high chance of getting banned - New accounts with Pro/Ultra subscriptions are frequently flagged By using this provider, you acknowledge and accept all associated risks. See docs/antigravity_provider.md for more information. ================================================================================ """ class AntigravityProvider(ModelProvider): """Provider for Google's Antigravity unified gateway API. Antigravity provides access to Claude, Gemini, and other models through a single Gemini-style API interface. This provider handles: * OAuth2 authentication with refresh token management * Multi-account rotation on rate limits * Request transformation to Gemini-style format * Extended thinking support for compatible models * Image processing for vision requests Configuration: ANTIGRAVITY_REFRESH_TOKEN: OAuth2 refresh token (env var override) ANTIGRAVITY_PROJECT_ID: Google Cloud project ID ANTIGRAVITY_BASE_URL: Custom endpoint URL (defaults to production) """ # Model registry for capability metadata _registry: ClassVar[AntigravityModelRegistry | None] = None MODEL_CAPABILITIES: ClassVar[dict[str, ModelCapabilities]] = {} # Thinking mode budgets (percentage of max_thinking_tokens) THINKING_BUDGETS = { "minimal": 0.005, "low": 0.08, "medium": 0.33, "high": 0.67, "max": 1.0, } # Track whether warning has been shown this session _warning_shown: ClassVar[bool] = False def __init__(self, api_key: str = "", **kwargs) -> None: """Initialize Antigravity provider. Args: api_key: Not used (authentication via OAuth2 refresh tokens) **kwargs: Additional configuration options """ # Initialize registry if AntigravityProvider._registry is None: AntigravityProvider._registry = AntigravityModelRegistry() AntigravityProvider.MODEL_CAPABILITIES = dict(self._registry.model_map) logger.info("Antigravity provider loaded %d models", len(self._registry.model_map)) super().__init__(api_key, **kwargs) # Initialize token manager self._token_manager = AntigravityTokenManager() # HTTP client for API requests self._http_client: httpx.Client | None = None # Base URL configuration self._base_url = get_env("ANTIGRAVITY_BASE_URL") or PRODUCTION_ENDPOINT # Show warning on first use if not AntigravityProvider._warning_shown: logger.warning(TOS_WARNING) AntigravityProvider._warning_shown = True self._invalidate_capability_cache() @property def http_client(self) -> httpx.Client: """Lazy initialization of HTTP client.""" if self._http_client is None: timeout = httpx.Timeout( connect=30.0, read=600.0, # 10 minutes for long responses write=60.0, pool=600.0, ) self._http_client = httpx.Client(timeout=timeout, follow_redirects=True) return self._http_client # ------------------------------------------------------------------ # Provider identity # ------------------------------------------------------------------ def get_provider_type(self) -> ProviderType: """Return the provider type.""" return ProviderType.ANTIGRAVITY # ------------------------------------------------------------------ # Capability surface # ------------------------------------------------------------------ def _lookup_capabilities( self, canonical_name: str, requested_name: str | None = None, ) -> ModelCapabilities | None: """Fetch capabilities from the registry.""" if self._registry: return self._registry.get_capabilities(canonical_name) return None def get_all_model_capabilities(self) -> dict[str, ModelCapabilities]: """Return registry-backed capabilities.""" if not self._registry: return {} return dict(self._registry.model_map) def list_models( self, *, respect_restrictions: bool = True, include_aliases: bool = True, lowercase: bool = False, unique: bool = False, ) -> list[str]: """Return available Antigravity model names.""" if not self._registry: return [] from utils.model_restrictions import get_restriction_service restriction_service = get_restriction_service() if respect_restrictions else None allowed_configs: dict[str, ModelCapabilities] = {} for model_name in self._registry.list_models(): config = self._registry.resolve(model_name) if not config: continue if restriction_service: if not restriction_service.is_allowed(self.get_provider_type(), model_name): continue allowed_configs[model_name] = config if not allowed_configs: return [] return ModelCapabilities.collect_model_names( allowed_configs, include_aliases=include_aliases, lowercase=lowercase, unique=unique, ) def _resolve_model_name(self, model_name: str) -> str: """Resolve aliases to canonical model names.""" if self._registry: config = self._registry.resolve(model_name) if config and config.model_name != model_name: logger.debug("Resolved Antigravity alias '%s' to '%s'", model_name, config.model_name) return config.model_name return model_name # ------------------------------------------------------------------ # Content generation # ------------------------------------------------------------------ def generate_content( self, prompt: str, model_name: str, system_prompt: str | None = None, temperature: float = 0.7, max_output_tokens: int | None = None, thinking_mode: str = "medium", images: list[str] | None = None, **kwargs, ) -> ModelResponse: """Generate content using the Antigravity API. Args: prompt: User prompt to send to the model model_name: Model identifier (e.g., 'claude-opus-4-5-thinking') system_prompt: Optional system instructions temperature: Sampling temperature (0.0-2.0) max_output_tokens: Maximum tokens in response thinking_mode: Thinking budget level for thinking models images: Optional list of image paths or data URLs **kwargs: Additional parameters Returns: ModelResponse with generated content and metadata """ # Validate and resolve model name self.validate_parameters(model_name, temperature) capabilities = self.get_capabilities(model_name) resolved_model = self._resolve_model_name(model_name) # Get authentication access_token, project_id, fingerprint_headers = self._token_manager.get_access_token(model=resolved_model) # Build request request_body = self._build_request( prompt=prompt, model_name=resolved_model, project_id=project_id, system_prompt=system_prompt, temperature=temperature, max_output_tokens=max_output_tokens, thinking_mode=thinking_mode, images=images, capabilities=capabilities, ) # Build headers headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", **fingerprint_headers, } # Retry logic max_retries = 4 retry_delays = [1.0, 3.0, 5.0, 8.0] attempt_counter = {"value": 0} def _attempt() -> ModelResponse: attempt_counter["value"] += 1 url = f"{self._base_url}{GENERATE_CONTENT_PATH}" logger.debug("Antigravity request to %s for model %s", url, resolved_model) response = self.http_client.post( url, json=request_body, headers=headers, ) # Handle errors if response.status_code != 200: self._handle_error_response(response, resolved_model) return self._parse_response(response.json(), resolved_model, capabilities, thinking_mode) try: return self._run_with_retries( operation=_attempt, max_attempts=max_retries, delays=retry_delays, log_prefix=f"Antigravity API ({resolved_model})", ) except Exception as exc: attempts = max(attempt_counter["value"], 1) error_msg = f"Antigravity API error for {resolved_model} after {attempts} attempt(s): {exc}" raise RuntimeError(error_msg) from exc def _build_request( self, prompt: str, model_name: str, project_id: str, system_prompt: str | None = None, temperature: float = 0.7, max_output_tokens: int | None = None, thinking_mode: str = "medium", images: list[str] | None = None, capabilities: ModelCapabilities | None = None, ) -> dict[str, Any]: """Build the Antigravity API request body.""" # Build contents array (Gemini-style format) contents = [] # Build user message parts user_parts = [] # Add text prompt user_parts.append({"text": prompt}) # Add images if provided if images and capabilities and capabilities.supports_images: for image_path in images: try: image_part = self._process_image(image_path) if image_part: user_parts.append(image_part) except Exception as e: logger.warning("Failed to process image %s: %s", image_path, e) elif images: logger.warning("Model %s does not support images, ignoring %d image(s)", model_name, len(images)) contents.append({"role": "user", "parts": user_parts}) # Build generation config generation_config: dict[str, Any] = { "temperature": temperature, } if max_output_tokens: generation_config["maxOutputTokens"] = max_output_tokens # Add thinking config for thinking-capable models if capabilities and capabilities.supports_extended_thinking and thinking_mode in self.THINKING_BUDGETS: max_thinking = capabilities.max_thinking_tokens or 8192 thinking_budget = int(max_thinking * self.THINKING_BUDGETS[thinking_mode]) generation_config["thinkingConfig"] = { "thinkingBudget": thinking_budget, "includeThoughts": True, } # Ensure maxOutputTokens > thinkingBudget if not max_output_tokens or max_output_tokens <= thinking_budget: generation_config["maxOutputTokens"] = thinking_budget + 10000 # Build system instruction (must be object with parts, not string) system_instruction = None if system_prompt: system_instruction = {"parts": [{"text": system_prompt}]} # Build full request request: dict[str, Any] = { "project": project_id, "model": model_name, "request": { "contents": contents, "generationConfig": generation_config, }, "userAgent": "antigravity", "requestId": f"pal-{uuid.uuid4().hex[:12]}", } if system_instruction: request["request"]["systemInstruction"] = system_instruction return request def _parse_response( self, data: dict[str, Any], model_name: str, capabilities: ModelCapabilities | None, thinking_mode: str, ) -> ModelResponse: """Parse the Antigravity API response.""" response_data = data.get("response", data) # Extract content from candidates content = "" thinking_content = "" candidates = response_data.get("candidates", []) if candidates: candidate = candidates[0] candidate_content = candidate.get("content", {}) parts = candidate_content.get("parts", []) for part in parts: if part.get("thought"): # This is thinking content thinking_content += part.get("text", "") elif "thoughtSignature" in part and "text" in part: # Gemini thinking with signature thinking_content += part.get("text", "") elif "text" in part: content += part.get("text", "") # Extract usage metadata usage_metadata = response_data.get("usageMetadata", {}) usage = { "input_tokens": usage_metadata.get("promptTokenCount", 0), "output_tokens": usage_metadata.get("candidatesTokenCount", 0), "total_tokens": usage_metadata.get("totalTokenCount", 0), } # Add thinking tokens if available if "thoughtsTokenCount" in usage_metadata: usage["thinking_tokens"] = usage_metadata["thoughtsTokenCount"] # Extract finish reason finish_reason = "UNKNOWN" if candidates: finish_reason = candidates[0].get("finishReason", "STOP") return ModelResponse( content=content, usage=usage, model_name=model_name, friendly_name="Antigravity", provider=ProviderType.ANTIGRAVITY, metadata={ "thinking_mode": thinking_mode if capabilities and capabilities.supports_extended_thinking else None, "thinking_content": thinking_content if thinking_content else None, "finish_reason": finish_reason, "model_version": response_data.get("modelVersion"), "response_id": response_data.get("responseId"), "trace_id": data.get("traceId"), }, ) def _handle_error_response(self, response: httpx.Response, model_name: str) -> None: """Handle error responses from the API.""" try: error_data = response.json() error = error_data.get("error", {}) error_code = error.get("code", response.status_code) error_message = error.get("message", response.text) error_status = error.get("status", "UNKNOWN") # Handle rate limiting if error_code == 429: # Extract retry delay from response retry_delay = 60.0 details = error.get("details", []) for detail in details: if detail.get("@type", "").endswith("RetryInfo"): delay_str = detail.get("retryDelay", "60s") # Parse delay string (e.g., "3.957525076s") match = re.match(r"([\d.]+)s?", delay_str) if match: retry_delay = float(match.group(1)) break # Mark current account as rate limited self._token_manager.mark_rate_limited(model_name, retry_delay) raise RuntimeError(f"Rate limited (429): {error_message}. Retry after {retry_delay}s") # Re-raise with formatted message raise RuntimeError(f"Antigravity API error {error_code} ({error_status}): {error_message}") except json.JSONDecodeError: raise RuntimeError(f"Antigravity API error {response.status_code}: {response.text}") def _process_image(self, image_path: str) -> dict[str, Any] | None: """Process an image for the Antigravity API.""" try: image_bytes, mime_type = validate_image(image_path) if image_path.startswith("data:"): # Extract base64 data from data URL _, data = image_path.split(",", 1) return {"inlineData": {"mimeType": mime_type, "data": data}} else: # Encode file bytes image_data = base64.b64encode(image_bytes).decode() return {"inlineData": {"mimeType": mime_type, "data": image_data}} except ValueError as e: logger.warning("Image validation failed: %s", e) return None except Exception as e: logger.error("Error processing image %s: %s", image_path, e) return None def _is_error_retryable(self, error: Exception) -> bool: """Determine if an error should be retried.""" error_str = str(error).lower() # Rate limits should be retried (we'll rotate accounts) if "429" in error_str or "rate limit" in error_str: return True # Check for retryable indicators retryable_indicators = [ "timeout", "connection", "network", "temporary", "unavailable", "retry", "500", "502", "503", "504", ] return any(indicator in error_str for indicator in retryable_indicators) # ------------------------------------------------------------------ # Preferred model selection # ------------------------------------------------------------------ def get_preferred_model(self, category: ToolModelCategory, allowed_models: list[str]) -> str | None: """Get preferred model for a category from allowed models.""" from tools.models import ToolModelCategory if not allowed_models: return None capability_map = self.get_all_model_capabilities() def find_best(candidates: list[str]) -> str | None: return sorted(candidates, reverse=True)[0] if candidates else None if category == ToolModelCategory.EXTENDED_REASONING: # Prefer thinking-capable models thinking_models = [ m for m in allowed_models if m in capability_map and capability_map[m].supports_extended_thinking ] if thinking_models: # Prefer Claude Opus, then Claude Sonnet, then Gemini opus = [m for m in thinking_models if "opus" in m] if opus: return find_best(opus) sonnet = [m for m in thinking_models if "sonnet" in m] if sonnet: return find_best(sonnet) return find_best(thinking_models) elif category == ToolModelCategory.FAST_RESPONSE: # Prefer Flash models flash_models = [m for m in allowed_models if "flash" in m] if flash_models: return find_best(flash_models) # Default: prefer models by capability rank return find_best(allowed_models) # ------------------------------------------------------------------ # Resource cleanup # ------------------------------------------------------------------ def close(self) -> None: """Clean up resources.""" if self._http_client: self._http_client.close() self._http_client = None if self._token_manager: self._token_manager.close()