From f02364d4eff077a966fa2e2d460848eb72c7b785 Mon Sep 17 00:00:00 2001 From: Badri Narayanan S Date: Thu, 1 Jan 2026 15:13:43 +0530 Subject: [PATCH] refactor: Reorganize src/ into modular folder structure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split large monolithic files into focused modules: - cloudcode-client.js (1,107 lines) → src/cloudcode/ (9 files) - account-manager.js (639 lines) → src/account-manager/ (5 files) - Move auth files to src/auth/ (oauth, token-extractor, database) - Move CLI to src/cli/accounts.js Update all import paths and documentation. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- CLAUDE.md | 78 +- package.json | 10 +- src/account-manager.js | 638 ------------- src/account-manager/credentials.js | 171 ++++ src/account-manager/index.js | 293 ++++++ src/account-manager/rate-limits.js | 157 +++ src/account-manager/selection.js | 169 ++++ src/account-manager/storage.js | 128 +++ src/{db => auth}/database.js | 0 src/{ => auth}/oauth.js | 4 +- src/{ => auth}/token-extractor.js | 6 +- src/{accounts-cli.js => cli/accounts.js} | 22 +- src/cloudcode-client.js | 1107 ---------------------- src/cloudcode/index.js | 28 + src/cloudcode/message-handler.js | 209 ++++ src/cloudcode/model-api.js | 97 ++ src/cloudcode/rate-limit-parser.js | 181 ++++ src/cloudcode/request-builder.js | 68 ++ src/cloudcode/session-manager.js | 47 + src/cloudcode/sse-parser.js | 116 +++ src/cloudcode/sse-streamer.js | 285 ++++++ src/cloudcode/streaming-handler.js | 199 ++++ src/server.js | 6 +- 23 files changed, 2235 insertions(+), 1784 deletions(-) delete mode 100644 src/account-manager.js create mode 100644 src/account-manager/credentials.js create mode 100644 src/account-manager/index.js create mode 100644 src/account-manager/rate-limits.js create mode 100644 src/account-manager/selection.js create mode 100644 src/account-manager/storage.js rename src/{db => auth}/database.js (100%) 
rename src/{ => auth}/oauth.js (99%) rename src/{ => auth}/token-extractor.js (96%) rename src/{accounts-cli.js => cli/accounts.js} (95%) delete mode 100644 src/cloudcode-client.js create mode 100644 src/cloudcode/index.js create mode 100644 src/cloudcode/message-handler.js create mode 100644 src/cloudcode/model-api.js create mode 100644 src/cloudcode/rate-limit-parser.js create mode 100644 src/cloudcode/request-builder.js create mode 100644 src/cloudcode/session-manager.js create mode 100644 src/cloudcode/sse-parser.js create mode 100644 src/cloudcode/sse-streamer.js create mode 100644 src/cloudcode/streaming-handler.js diff --git a/CLAUDE.md b/CLAUDE.md index 6342f11..ceee3cd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -45,25 +45,64 @@ npm run test:caching # Prompt caching Claude Code CLI → Express Server (server.js) → CloudCode Client → Antigravity Cloud Code API ``` +**Directory Structure:** + +``` +src/ +├── index.js # Entry point +├── server.js # Express server +├── constants.js # Configuration values +├── errors.js # Custom error classes +│ +├── cloudcode/ # Cloud Code API client +│ ├── index.js # Public API exports +│ ├── session-manager.js # Session ID derivation for caching +│ ├── rate-limit-parser.js # Parse reset times from headers/errors +│ ├── request-builder.js # Build API request payloads +│ ├── sse-parser.js # Parse SSE for non-streaming +│ ├── sse-streamer.js # Stream SSE events in real-time +│ ├── message-handler.js # Non-streaming message handling +│ ├── streaming-handler.js # Streaming message handling +│ └── model-api.js # Model listing and quota APIs +│ +├── account-manager/ # Multi-account pool management +│ ├── index.js # AccountManager class facade +│ ├── storage.js # Config file I/O and persistence +│ ├── selection.js # Account picking (round-robin, sticky) +│ ├── rate-limits.js # Rate limit tracking and state +│ └── credentials.js # OAuth token and project handling +│ +├── auth/ # Authentication +│ ├── oauth.js # Google OAuth with PKCE +│ 
├── token-extractor.js # Legacy token extraction from DB +│ └── database.js # SQLite database access +│ +├── cli/ # CLI tools +│ └── accounts.js # Account management CLI +│ +├── format/ # Format conversion (Anthropic ↔ Google) +│ ├── index.js # Re-exports all converters +│ ├── request-converter.js # Anthropic → Google conversion +│ ├── response-converter.js # Google → Anthropic conversion +│ ├── content-converter.js # Message content conversion +│ ├── schema-sanitizer.js # JSON Schema cleaning for Gemini +│ ├── thinking-utils.js # Thinking block validation/recovery +│ └── signature-cache.js # In-memory signature cache +│ +└── utils/ # Utilities + ├── helpers.js # formatDuration, sleep + └── logger.js # Structured logging +``` + **Key Modules:** - **src/server.js**: Express server exposing Anthropic-compatible endpoints (`/v1/messages`, `/v1/models`, `/health`, `/account-limits`) -- **src/cloudcode-client.js**: Makes requests to Antigravity Cloud Code API with retry/failover logic, handles both streaming and non-streaming -- **src/format/**: Format conversion module (Anthropic ↔ Google Generative AI) - - `index.js` - Re-exports all converters - - `request-converter.js` - Anthropic → Google request conversion - - `response-converter.js` - Google → Anthropic response conversion - - `content-converter.js` - Message content and role conversion - - `schema-sanitizer.js` - JSON Schema cleaning for Gemini API compatibility (preserves constraints/enums as hints) - - `thinking-utils.js` - Thinking block validation, filtering, reordering, and recovery logic - - `signature-cache.js` - In-memory cache for Gemini thoughtSignatures -- **src/account-manager.js**: Multi-account pool with sticky selection, rate limit handling, and automatic cooldown -- **src/db/database.js**: Cross-platform SQLite database access using better-sqlite3 (Windows/Mac/Linux compatible) -- **src/oauth.js**: Google OAuth implementation for adding accounts -- **src/token-extractor.js**: Extracts tokens from 
local Antigravity app installation (legacy single-account mode) +- **src/cloudcode/**: Cloud Code API client with retry/failover logic, streaming and non-streaming support +- **src/account-manager/**: Multi-account pool with sticky selection, rate limit handling, and automatic cooldown +- **src/auth/**: Authentication including Google OAuth, token extraction, and database access +- **src/format/**: Format conversion between Anthropic and Google Generative AI formats - **src/constants.js**: API endpoints, model mappings, OAuth config, and all configuration values -- **src/errors.js**: Custom error classes (`RateLimitError`, `AuthError`, `ApiError`, etc.) for structured error handling -- **src/utils/helpers.js**: Shared utility functions (`formatDuration`, `sleep`) +- **src/errors.js**: Custom error classes (`RateLimitError`, `AuthError`, `ApiError`, etc.) **Multi-Account Load Balancing:** - Sticky account selection for prompt caching (stays on same account across turns) @@ -109,6 +148,15 @@ Claude Code CLI → Express Server (server.js) → CloudCode Client → Antigrav - `formatDuration(ms)` - Format milliseconds as "1h23m45s" - `sleep(ms)` - Promise-based delay +**Logger:** Structured logging via `src/utils/logger.js`: +- `logger.info(msg)` - Standard info (blue) +- `logger.success(msg)` - Success messages (green) +- `logger.warn(msg)` - Warnings (yellow) +- `logger.error(msg)` - Errors (red) +- `logger.debug(msg)` - Debug output (magenta, only when enabled) +- `logger.setDebug(true)` - Enable debug mode +- `logger.isDebugEnabled` - Check if debug mode is on + ## Maintenance When making significant changes to the codebase (new modules, refactoring, architectural changes), update this CLAUDE.md and the README.md file to keep documentation in sync. 
diff --git a/package.json b/package.json index 9045c63..815c9a7 100644 --- a/package.json +++ b/package.json @@ -14,11 +14,11 @@ "scripts": { "start": "node src/index.js", "dev": "node --watch src/index.js", - "accounts": "node src/accounts-cli.js", - "accounts:add": "node src/accounts-cli.js add", - "accounts:list": "node src/accounts-cli.js list", - "accounts:remove": "node src/accounts-cli.js remove", - "accounts:verify": "node src/accounts-cli.js verify", + "accounts": "node src/cli/accounts.js", + "accounts:add": "node src/cli/accounts.js add", + "accounts:list": "node src/cli/accounts.js list", + "accounts:remove": "node src/cli/accounts.js remove", + "accounts:verify": "node src/cli/accounts.js verify", "test": "node tests/run-all.cjs", "test:signatures": "node tests/test-thinking-signatures.cjs", "test:multiturn": "node tests/test-multiturn-thinking-tools.cjs", diff --git a/src/account-manager.js b/src/account-manager.js deleted file mode 100644 index 5ce2cf7..0000000 --- a/src/account-manager.js +++ /dev/null @@ -1,638 +0,0 @@ -/** - * Account Manager - * Manages multiple Antigravity accounts with sticky selection, - * automatic failover, and smart cooldown for rate-limited accounts. 
- */ - -import { readFile, writeFile, mkdir, access } from 'fs/promises'; -import { constants as fsConstants } from 'fs'; -import { dirname } from 'path'; -import { - ACCOUNT_CONFIG_PATH, - ANTIGRAVITY_DB_PATH, - DEFAULT_COOLDOWN_MS, - TOKEN_REFRESH_INTERVAL_MS, - ANTIGRAVITY_ENDPOINT_FALLBACKS, - ANTIGRAVITY_HEADERS, - DEFAULT_PROJECT_ID, - MAX_WAIT_BEFORE_ERROR_MS -} from './constants.js'; -import { refreshAccessToken } from './oauth.js'; -import { formatDuration } from './utils/helpers.js'; -import { getAuthStatus } from './db/database.js'; -import { logger } from './utils/logger.js'; - -export class AccountManager { - #accounts = []; - #currentIndex = 0; - #configPath; - #settings = {}; - #initialized = false; - - // Per-account caches - #tokenCache = new Map(); // email -> { token, extractedAt } - #projectCache = new Map(); // email -> projectId - - constructor(configPath = ACCOUNT_CONFIG_PATH) { - this.#configPath = configPath; - } - - /** - * Initialize the account manager by loading config - */ - async initialize() { - if (this.#initialized) return; - - try { - // Check if config file exists using async access - await access(this.#configPath, fsConstants.F_OK); - const configData = await readFile(this.#configPath, 'utf-8'); - const config = JSON.parse(configData); - - this.#accounts = (config.accounts || []).map(acc => ({ - ...acc, - isRateLimited: acc.isRateLimited || false, - rateLimitResetTime: acc.rateLimitResetTime || null, - lastUsed: acc.lastUsed || null - })); - - this.#settings = config.settings || {}; - this.#currentIndex = config.activeIndex || 0; - - // Clamp currentIndex to valid range - if (this.#currentIndex >= this.#accounts.length) { - this.#currentIndex = 0; - } - - logger.info(`[AccountManager] Loaded ${this.#accounts.length} account(s) from config`); - - // If config exists but has no accounts, fall back to Antigravity database - if (this.#accounts.length === 0) { - logger.warn('[AccountManager] No accounts in config. 
Falling back to Antigravity database'); - await this.#loadDefaultAccount(); - } - } catch (error) { - if (error.code === 'ENOENT') { - // No config file - use single account from Antigravity database - logger.info('[AccountManager] No config file found. Using Antigravity database (single account mode)'); - } else { - logger.error('[AccountManager] Failed to load config:', error.message); - } - // Fall back to default account - await this.#loadDefaultAccount(); - } - - // Clear any expired rate limits - this.clearExpiredLimits(); - - this.#initialized = true; - } - - /** - * Load the default account from Antigravity's database - */ - async #loadDefaultAccount() { - try { - const authData = getAuthStatus(); - if (authData?.apiKey) { - this.#accounts = [{ - email: authData.email || 'default@antigravity', - source: 'database', - isRateLimited: false, - rateLimitResetTime: null, - lastUsed: null - }]; - // Pre-cache the token - this.#tokenCache.set(this.#accounts[0].email, { - token: authData.apiKey, - extractedAt: Date.now() - }); - logger.info(`[AccountManager] Loaded default account: ${this.#accounts[0].email}`); - } - } catch (error) { - logger.error('[AccountManager] Failed to load default account:', error.message); - // Create empty account list - will fail on first request - this.#accounts = []; - } - } - - /** - * Get the number of accounts - * @returns {number} Number of configured accounts - */ - getAccountCount() { - return this.#accounts.length; - } - - /** - * Check if all accounts are rate-limited - * @returns {boolean} True if all accounts are rate-limited - */ - isAllRateLimited() { - if (this.#accounts.length === 0) return true; - return this.#accounts.every(acc => acc.isRateLimited); - } - - /** - * Get list of available (non-rate-limited, non-invalid) accounts - * @returns {Array} Array of available account objects - */ - getAvailableAccounts() { - return this.#accounts.filter(acc => !acc.isRateLimited && !acc.isInvalid); - } - - /** - * Get list of 
invalid accounts - * @returns {Array} Array of invalid account objects - */ - getInvalidAccounts() { - return this.#accounts.filter(acc => acc.isInvalid); - } - - /** - * Clear expired rate limits - * @returns {number} Number of rate limits cleared - */ - clearExpiredLimits() { - const now = Date.now(); - let cleared = 0; - - for (const account of this.#accounts) { - if (account.isRateLimited && account.rateLimitResetTime && account.rateLimitResetTime <= now) { - account.isRateLimited = false; - account.rateLimitResetTime = null; - cleared++; - logger.success(`[AccountManager] Rate limit expired for: ${account.email}`); - } - } - - if (cleared > 0) { - this.saveToDisk(); - } - - return cleared; - } - - /** - * Clear all rate limits to force a fresh check - * (Optimistic retry strategy) - * @returns {void} - */ - resetAllRateLimits() { - for (const account of this.#accounts) { - account.isRateLimited = false; - // distinct from "clearing" expired limits, we blindly reset here - // we keep the time? User said "clear isRateLimited value, and rateLimitResetTime" - // So we clear both. - account.rateLimitResetTime = null; - } - logger.warn('[AccountManager] Reset all rate limits for optimistic retry'); - } - - /** - * Pick the next available account (fallback when current is unavailable). - * Sets activeIndex to the selected account's index. 
- * @returns {Object|null} The next available account or null if none available - */ - pickNext() { - this.clearExpiredLimits(); - - const available = this.getAvailableAccounts(); - if (available.length === 0) { - return null; - } - - // Clamp index to valid range - if (this.#currentIndex >= this.#accounts.length) { - this.#currentIndex = 0; - } - - // Find next available account starting from index AFTER current - for (let i = 1; i <= this.#accounts.length; i++) { - const idx = (this.#currentIndex + i) % this.#accounts.length; - const account = this.#accounts[idx]; - - if (!account.isRateLimited && !account.isInvalid) { - // Set activeIndex to this account (not +1) - this.#currentIndex = idx; - account.lastUsed = Date.now(); - - const position = idx + 1; - const total = this.#accounts.length; - logger.info(`[AccountManager] Using account: ${account.email} (${position}/${total})`); - - // Persist the change (don't await to avoid blocking) - this.saveToDisk(); - - return account; - } - } - - return null; - } - - /** - * Get the current account without advancing the index (sticky selection). - * Used for cache continuity - sticks to the same account until rate-limited. - * @returns {Object|null} The current account or null if unavailable/rate-limited - */ - getCurrentStickyAccount() { - this.clearExpiredLimits(); - - if (this.#accounts.length === 0) { - return null; - } - - // Clamp index to valid range - if (this.#currentIndex >= this.#accounts.length) { - this.#currentIndex = 0; - } - - // Get current account directly (activeIndex = current account) - const account = this.#accounts[this.#currentIndex]; - - // Return if available - if (account && !account.isRateLimited && !account.isInvalid) { - account.lastUsed = Date.now(); - // Persist the change (don't await to avoid blocking) - this.saveToDisk(); - return account; - } - - return null; - } - - /** - * Check if we should wait for the current account's rate limit to reset. 
- * Used for sticky account selection - wait if rate limit is short (≤ threshold). - * @returns {{shouldWait: boolean, waitMs: number, account: Object|null}} - */ - shouldWaitForCurrentAccount() { - if (this.#accounts.length === 0) { - return { shouldWait: false, waitMs: 0, account: null }; - } - - // Clamp index to valid range - if (this.#currentIndex >= this.#accounts.length) { - this.#currentIndex = 0; - } - - // Get current account directly (activeIndex = current account) - const account = this.#accounts[this.#currentIndex]; - - if (!account || account.isInvalid) { - return { shouldWait: false, waitMs: 0, account: null }; - } - - if (account.isRateLimited && account.rateLimitResetTime) { - const waitMs = account.rateLimitResetTime - Date.now(); - - // If wait time is within threshold, recommend waiting - if (waitMs > 0 && waitMs <= MAX_WAIT_BEFORE_ERROR_MS) { - return { shouldWait: true, waitMs, account }; - } - } - - return { shouldWait: false, waitMs: 0, account }; - } - - /** - * Pick an account with sticky selection preference. - * Prefers the current account for cache continuity, only switches when: - * - Current account is rate-limited for > 2 minutes - * - Current account is invalid - * @returns {{account: Object|null, waitMs: number}} Account to use and optional wait time - */ - pickStickyAccount() { - // First try to get the current sticky account - const stickyAccount = this.getCurrentStickyAccount(); - if (stickyAccount) { - return { account: stickyAccount, waitMs: 0 }; - } - - // Current account is rate-limited or invalid. - // CHECK IF OTHERS ARE AVAILABLE before deciding to wait. - // We prefer switching to an available neighbor over waiting for the sticky one, - // to avoid "erroring forever" / tight retry loops on short rate limits. - const available = this.getAvailableAccounts(); - if (available.length > 0) { - // Found a free account! Switch immediately. 
- const nextAccount = this.pickNext(); - if (nextAccount) { - logger.info(`[AccountManager] Switched to new account (failover): ${nextAccount.email}`); - return { account: nextAccount, waitMs: 0 }; - } - } - - // No other accounts available. Now checking if we should wait for current account. - const waitInfo = this.shouldWaitForCurrentAccount(); - if (waitInfo.shouldWait) { - logger.info(`[AccountManager] Waiting ${formatDuration(waitInfo.waitMs)} for sticky account: ${waitInfo.account.email}`); - return { account: null, waitMs: waitInfo.waitMs }; - } - - // Current account unavailable for too long/invalid, and no others available? - // pickNext will likely return null or loop, but we defer to standard logic. - const nextAccount = this.pickNext(); - if (nextAccount) { - logger.info(`[AccountManager] Switched to new account for cache: ${nextAccount.email}`); - } - return { account: nextAccount, waitMs: 0 }; - } - - /** - * Mark an account as rate-limited - * @param {string} email - Email of the account to mark - * @param {number|null} resetMs - Time in ms until rate limit resets (optional) - */ - markRateLimited(email, resetMs = null) { - const account = this.#accounts.find(a => a.email === email); - if (!account) return; - - account.isRateLimited = true; - const cooldownMs = resetMs || this.#settings.cooldownDurationMs || DEFAULT_COOLDOWN_MS; - account.rateLimitResetTime = Date.now() + cooldownMs; - - logger.warn( - `[AccountManager] Rate limited: ${email}. 
Available in ${formatDuration(cooldownMs)}` - ); - - this.saveToDisk(); - } - - /** - * Mark an account as invalid (credentials need re-authentication) - * @param {string} email - Email of the account to mark - * @param {string} reason - Reason for marking as invalid - */ - markInvalid(email, reason = 'Unknown error') { - const account = this.#accounts.find(a => a.email === email); - if (!account) return; - - account.isInvalid = true; - account.invalidReason = reason; - account.invalidAt = Date.now(); - - logger.error( - `[AccountManager] ⚠ Account INVALID: ${email}` - ); - logger.error( - `[AccountManager] Reason: ${reason}` - ); - logger.error( - `[AccountManager] Run 'npm run accounts' to re-authenticate this account` - ); - - this.saveToDisk(); - } - - /** - * Get the minimum wait time until any account becomes available - * @returns {number} Wait time in milliseconds - */ - getMinWaitTimeMs() { - if (!this.isAllRateLimited()) return 0; - - const now = Date.now(); - let minWait = Infinity; - let soonestAccount = null; - - for (const account of this.#accounts) { - if (account.rateLimitResetTime) { - const wait = account.rateLimitResetTime - now; - if (wait > 0 && wait < minWait) { - minWait = wait; - soonestAccount = account; - } - } - } - - if (soonestAccount) { - logger.info(`[AccountManager] Shortest wait: ${formatDuration(minWait)} (account: ${soonestAccount.email})`); - } - - return minWait === Infinity ? 
DEFAULT_COOLDOWN_MS : minWait; - } - - /** - * Get OAuth token for an account - * @param {Object} account - Account object with email and credentials - * @returns {Promise} OAuth access token - * @throws {Error} If token refresh fails - */ - async getTokenForAccount(account) { - // Check cache first - const cached = this.#tokenCache.get(account.email); - if (cached && (Date.now() - cached.extractedAt) < TOKEN_REFRESH_INTERVAL_MS) { - return cached.token; - } - - // Get fresh token based on source - let token; - - if (account.source === 'oauth' && account.refreshToken) { - // OAuth account - use refresh token to get new access token - try { - const tokens = await refreshAccessToken(account.refreshToken); - token = tokens.accessToken; - // Clear invalid flag on success - if (account.isInvalid) { - account.isInvalid = false; - account.invalidReason = null; - await this.saveToDisk(); - } - logger.success(`[AccountManager] Refreshed OAuth token for: ${account.email}`); - } catch (error) { - logger.error(`[AccountManager] Failed to refresh token for ${account.email}:`, error.message); - // Mark account as invalid (credentials need re-auth) - this.markInvalid(account.email, error.message); - throw new Error(`AUTH_INVALID: ${account.email}: ${error.message}`); - } - } else if (account.source === 'manual' && account.apiKey) { - token = account.apiKey; - } else { - // Extract from database - const dbPath = account.dbPath || ANTIGRAVITY_DB_PATH; - const authData = getAuthStatus(dbPath); - token = authData.apiKey; - } - - // Cache the token - this.#tokenCache.set(account.email, { - token, - extractedAt: Date.now() - }); - - return token; - } - - /** - * Get project ID for an account - * @param {Object} account - Account object - * @param {string} token - OAuth access token - * @returns {Promise} Project ID - */ - async getProjectForAccount(account, token) { - // Check cache first - const cached = this.#projectCache.get(account.email); - if (cached) { - return cached; - } - - 
// OAuth or manual accounts may have projectId specified - if (account.projectId) { - this.#projectCache.set(account.email, account.projectId); - return account.projectId; - } - - // Discover project via loadCodeAssist API - const project = await this.#discoverProject(token); - this.#projectCache.set(account.email, project); - return project; - } - - /** - * Discover project ID via Cloud Code API - */ - async #discoverProject(token) { - for (const endpoint of ANTIGRAVITY_ENDPOINT_FALLBACKS) { - try { - const response = await fetch(`${endpoint}/v1internal:loadCodeAssist`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${token}`, - 'Content-Type': 'application/json', - ...ANTIGRAVITY_HEADERS - }, - body: JSON.stringify({ - metadata: { - ideType: 'IDE_UNSPECIFIED', - platform: 'PLATFORM_UNSPECIFIED', - pluginType: 'GEMINI' - } - }) - }); - - if (!response.ok) continue; - - const data = await response.json(); - - if (typeof data.cloudaicompanionProject === 'string') { - return data.cloudaicompanionProject; - } - if (data.cloudaicompanionProject?.id) { - return data.cloudaicompanionProject.id; - } - } catch (error) { - logger.warn(`[AccountManager] Project discovery failed at ${endpoint}:`, error.message); - } - } - - logger.info(`[AccountManager] Using default project: ${DEFAULT_PROJECT_ID}`); - return DEFAULT_PROJECT_ID; - } - - /** - * Clear project cache for an account (useful on auth errors) - * @param {string|null} email - Email to clear cache for, or null to clear all - */ - clearProjectCache(email = null) { - if (email) { - this.#projectCache.delete(email); - } else { - this.#projectCache.clear(); - } - } - - /** - * Clear token cache for an account (useful on auth errors) - * @param {string|null} email - Email to clear cache for, or null to clear all - */ - clearTokenCache(email = null) { - if (email) { - this.#tokenCache.delete(email); - } else { - this.#tokenCache.clear(); - } - } - - /** - * Save current state to disk (async) - * @returns 
{Promise} - */ - async saveToDisk() { - try { - // Ensure directory exists - const dir = dirname(this.#configPath); - await mkdir(dir, { recursive: true }); - - const config = { - accounts: this.#accounts.map(acc => ({ - email: acc.email, - source: acc.source, - dbPath: acc.dbPath || null, - refreshToken: acc.source === 'oauth' ? acc.refreshToken : undefined, - apiKey: acc.source === 'manual' ? acc.apiKey : undefined, - projectId: acc.projectId || undefined, - addedAt: acc.addedAt || undefined, - isRateLimited: acc.isRateLimited, - rateLimitResetTime: acc.rateLimitResetTime, - isInvalid: acc.isInvalid || false, - invalidReason: acc.invalidReason || null, - lastUsed: acc.lastUsed - })), - settings: this.#settings, - activeIndex: this.#currentIndex - }; - - await writeFile(this.#configPath, JSON.stringify(config, null, 2)); - } catch (error) { - logger.error('[AccountManager] Failed to save config:', error.message); - } - } - - /** - * Get status object for logging/API - * @returns {{accounts: Array, settings: Object}} Status object with accounts and settings - */ - getStatus() { - const available = this.getAvailableAccounts(); - const rateLimited = this.#accounts.filter(a => a.isRateLimited); - const invalid = this.getInvalidAccounts(); - - return { - total: this.#accounts.length, - available: available.length, - rateLimited: rateLimited.length, - invalid: invalid.length, - summary: `${this.#accounts.length} total, ${available.length} available, ${rateLimited.length} rate-limited, ${invalid.length} invalid`, - accounts: this.#accounts.map(a => ({ - email: a.email, - source: a.source, - isRateLimited: a.isRateLimited, - rateLimitResetTime: a.rateLimitResetTime, - isInvalid: a.isInvalid || false, - invalidReason: a.invalidReason || null, - lastUsed: a.lastUsed - })) - }; - } - - /** - * Get settings - * @returns {Object} Current settings object - */ - getSettings() { - return { ...this.#settings }; - } - - /** - * Get all accounts (internal use for quota fetching) - * 
Returns the full account objects including credentials - * @returns {Array} Array of account objects - */ - getAllAccounts() { - return this.#accounts; - } -} - -export default AccountManager; diff --git a/src/account-manager/credentials.js b/src/account-manager/credentials.js new file mode 100644 index 0000000..4aa4cf3 --- /dev/null +++ b/src/account-manager/credentials.js @@ -0,0 +1,171 @@ +/** + * Credentials Management + * + * Handles OAuth token handling and project discovery. + */ + +import { + ANTIGRAVITY_DB_PATH, + TOKEN_REFRESH_INTERVAL_MS, + ANTIGRAVITY_ENDPOINT_FALLBACKS, + ANTIGRAVITY_HEADERS, + DEFAULT_PROJECT_ID +} from '../constants.js'; +import { refreshAccessToken } from '../auth/oauth.js'; +import { getAuthStatus } from '../auth/database.js'; +import { logger } from '../utils/logger.js'; + +/** + * Get OAuth token for an account + * + * @param {Object} account - Account object with email and credentials + * @param {Map} tokenCache - Token cache map + * @param {Function} onInvalid - Callback when account is invalid (email, reason) + * @param {Function} onSave - Callback to save changes + * @returns {Promise} OAuth access token + * @throws {Error} If token refresh fails + */ +export async function getTokenForAccount(account, tokenCache, onInvalid, onSave) { + // Check cache first + const cached = tokenCache.get(account.email); + if (cached && (Date.now() - cached.extractedAt) < TOKEN_REFRESH_INTERVAL_MS) { + return cached.token; + } + + // Get fresh token based on source + let token; + + if (account.source === 'oauth' && account.refreshToken) { + // OAuth account - use refresh token to get new access token + try { + const tokens = await refreshAccessToken(account.refreshToken); + token = tokens.accessToken; + // Clear invalid flag on success + if (account.isInvalid) { + account.isInvalid = false; + account.invalidReason = null; + if (onSave) await onSave(); + } + logger.success(`[AccountManager] Refreshed OAuth token for: ${account.email}`); + } 
catch (error) { + logger.error(`[AccountManager] Failed to refresh token for ${account.email}:`, error.message); + // Mark account as invalid (credentials need re-auth) + if (onInvalid) onInvalid(account.email, error.message); + throw new Error(`AUTH_INVALID: ${account.email}: ${error.message}`); + } + } else if (account.source === 'manual' && account.apiKey) { + token = account.apiKey; + } else { + // Extract from database + const dbPath = account.dbPath || ANTIGRAVITY_DB_PATH; + const authData = getAuthStatus(dbPath); + token = authData.apiKey; + } + + // Cache the token + tokenCache.set(account.email, { + token, + extractedAt: Date.now() + }); + + return token; +} + +/** + * Get project ID for an account + * + * @param {Object} account - Account object + * @param {string} token - OAuth access token + * @param {Map} projectCache - Project cache map + * @returns {Promise} Project ID + */ +export async function getProjectForAccount(account, token, projectCache) { + // Check cache first + const cached = projectCache.get(account.email); + if (cached) { + return cached; + } + + // OAuth or manual accounts may have projectId specified + if (account.projectId) { + projectCache.set(account.email, account.projectId); + return account.projectId; + } + + // Discover project via loadCodeAssist API + const project = await discoverProject(token); + projectCache.set(account.email, project); + return project; +} + +/** + * Discover project ID via Cloud Code API + * + * @param {string} token - OAuth access token + * @returns {Promise} Project ID + */ +export async function discoverProject(token) { + for (const endpoint of ANTIGRAVITY_ENDPOINT_FALLBACKS) { + try { + const response = await fetch(`${endpoint}/v1internal:loadCodeAssist`, { + method: 'POST', + headers: { + 'Authorization': `Bearer ${token}`, + 'Content-Type': 'application/json', + ...ANTIGRAVITY_HEADERS + }, + body: JSON.stringify({ + metadata: { + ideType: 'IDE_UNSPECIFIED', + platform: 'PLATFORM_UNSPECIFIED', + 
pluginType: 'GEMINI' + } + }) + }); + + if (!response.ok) continue; + + const data = await response.json(); + + if (typeof data.cloudaicompanionProject === 'string') { + return data.cloudaicompanionProject; + } + if (data.cloudaicompanionProject?.id) { + return data.cloudaicompanionProject.id; + } + } catch (error) { + logger.warn(`[AccountManager] Project discovery failed at ${endpoint}:`, error.message); + } + } + + logger.info(`[AccountManager] Using default project: ${DEFAULT_PROJECT_ID}`); + return DEFAULT_PROJECT_ID; +} + +/** + * Clear project cache for an account + * + * @param {Map} projectCache - Project cache map + * @param {string|null} email - Email to clear cache for, or null to clear all + */ +export function clearProjectCache(projectCache, email = null) { + if (email) { + projectCache.delete(email); + } else { + projectCache.clear(); + } +} + +/** + * Clear token cache for an account + * + * @param {Map} tokenCache - Token cache map + * @param {string|null} email - Email to clear cache for, or null to clear all + */ +export function clearTokenCache(tokenCache, email = null) { + if (email) { + tokenCache.delete(email); + } else { + tokenCache.clear(); + } +} diff --git a/src/account-manager/index.js b/src/account-manager/index.js new file mode 100644 index 0000000..6cc1023 --- /dev/null +++ b/src/account-manager/index.js @@ -0,0 +1,293 @@ +/** + * Account Manager + * Manages multiple Antigravity accounts with sticky selection, + * automatic failover, and smart cooldown for rate-limited accounts. 
/**
 * Facade over the account-manager modules (storage, rate limits,
 * credentials, selection). It owns the account list, the index of the
 * active account, the settings object and the per-account caches, and
 * delegates the actual logic to the imported helpers.
 */
export class AccountManager {
    #accounts = [];
    #currentIndex = 0;
    #configPath;
    #settings = {};
    #initialized = false;

    // Per-account caches
    #tokenCache = new Map(); // email -> { token, extractedAt }
    #projectCache = new Map(); // email -> projectId

    /**
     * @param {string} configPath - Path of the account config file
     */
    constructor(configPath = ACCOUNT_CONFIG_PATH) {
        this.#configPath = configPath;
    }

    /**
     * Initialize the account manager by loading config.
     * Subsequent calls after a completed initialization are no-ops.
     */
    async initialize() {
        if (this.#initialized) return;

        const { accounts, settings, activeIndex } = await loadAccounts(this.#configPath);

        this.#accounts = accounts;
        this.#settings = settings;
        this.#currentIndex = activeIndex;

        // If config exists but has no accounts, fall back to Antigravity database
        if (this.#accounts.length === 0) {
            logger.warn('[AccountManager] No accounts in config. Falling back to Antigravity database');
            const { accounts: defaultAccounts, tokenCache } = loadDefaultAccount();
            this.#accounts = defaultAccounts;
            this.#tokenCache = tokenCache;
        }

        // Clear any expired rate limits
        this.clearExpiredLimits();

        this.#initialized = true;
    }

    /**
     * Get the number of accounts
     * @returns {number} Number of configured accounts
     */
    getAccountCount() {
        return this.#accounts.length;
    }

    /**
     * Check if all accounts are rate-limited
     * @returns {boolean} True if all accounts are rate-limited
     */
    isAllRateLimited() {
        return checkAllRateLimited(this.#accounts);
    }

    /**
     * Get list of available (non-rate-limited, non-invalid) accounts
     * @returns {Array} Array of available account objects
     */
    getAvailableAccounts() {
        return getAvailable(this.#accounts);
    }

    /**
     * Get list of invalid accounts
     * @returns {Array} Array of invalid account objects
     */
    getInvalidAccounts() {
        return getInvalid(this.#accounts);
    }

    /**
     * Clear expired rate limits and persist the change when any were cleared
     * @returns {number} Number of rate limits cleared
     */
    clearExpiredLimits() {
        const cleared = clearLimits(this.#accounts);
        if (cleared > 0) {
            this.saveToDisk();
        }
        return cleared;
    }

    /**
     * Clear all rate limits to force a fresh check
     * (Optimistic retry strategy)
     * @returns {void}
     */
    resetAllRateLimits() {
        resetLimits(this.#accounts);
    }

    /**
     * Pick the next available account (fallback when current is unavailable).
     * Sets activeIndex to the selected account's index.
     * @returns {Object|null} The next available account or null if none available
     */
    pickNext() {
        // No save callback is handed to the helper: it would fire before the
        // new index is stored here, and saveToDisk() reads #currentIndex
        // synchronously, so the previous index would be persisted.
        const { account, newIndex } = selectNext(this.#accounts, this.#currentIndex);
        this.#currentIndex = newIndex;
        if (account) this.saveToDisk();
        return account;
    }

    /**
     * Get the current account without advancing the index (sticky selection).
     * Used for cache continuity - sticks to the same account until rate-limited.
     * @returns {Object|null} The current account or null if unavailable/rate-limited
     */
    getCurrentStickyAccount() {
        // Persist only after the (possibly clamped) index has been stored.
        const { account, newIndex } = getSticky(this.#accounts, this.#currentIndex);
        this.#currentIndex = newIndex;
        if (account) this.saveToDisk();
        return account;
    }

    /**
     * Check if we should wait for the current account's rate limit to reset.
     * Used for sticky account selection - wait if rate limit is short (≤ threshold).
     * @returns {{shouldWait: boolean, waitMs: number, account: Object|null}}
     */
    shouldWaitForCurrentAccount() {
        return shouldWait(this.#accounts, this.#currentIndex);
    }

    /**
     * Pick an account with sticky selection preference.
     * Prefers the current account for cache continuity, only switches when:
     * - Current account is rate-limited for > 2 minutes
     * - Current account is invalid
     * @returns {{account: Object|null, waitMs: number}} Account to use and optional wait time
     */
    pickStickyAccount() {
        // Persist only after the selected index has been stored (see pickNext).
        const { account, waitMs, newIndex } = selectSticky(this.#accounts, this.#currentIndex);
        this.#currentIndex = newIndex;
        if (account) this.saveToDisk();
        return { account, waitMs };
    }

    /**
     * Mark an account as rate-limited
     * @param {string} email - Email of the account to mark
     * @param {number|null} resetMs - Time in ms until rate limit resets (optional)
     */
    markRateLimited(email, resetMs = null) {
        markLimited(this.#accounts, email, resetMs, this.#settings);
        this.saveToDisk();
    }

    /**
     * Mark an account as invalid (credentials need re-authentication)
     * @param {string} email - Email of the account to mark
     * @param {string} reason - Reason for marking as invalid
     */
    markInvalid(email, reason = 'Unknown error') {
        markAccountInvalid(this.#accounts, email, reason);
        this.saveToDisk();
    }

    /**
     * Get the minimum wait time until any account becomes available
     * @returns {number} Wait time in milliseconds
     */
    getMinWaitTimeMs() {
        return getMinWait(this.#accounts);
    }

    /**
     * Get OAuth token for an account
     * @param {Object} account - Account object with email and credentials
     * @returns {Promise} OAuth access token
     * @throws {Error} If token refresh fails
     */
    async getTokenForAccount(account) {
        return fetchToken(
            account,
            this.#tokenCache,
            (email, reason) => this.markInvalid(email, reason),
            () => this.saveToDisk()
        );
    }

    /**
     * Get project ID for an account
     * @param {Object} account - Account object
     * @param {string} token - OAuth access token
     * @returns {Promise} Project ID
     */
    async getProjectForAccount(account, token) {
        return fetchProject(account, token, this.#projectCache);
    }

    /**
     * Clear project cache for an account (useful on auth errors)
     * @param {string|null} email - Email to clear cache for, or null to clear all
     */
    clearProjectCache(email = null) {
        clearProject(this.#projectCache, email);
    }

    /**
     * Clear token cache for an account (useful on auth errors)
     * @param {string|null} email - Email to clear cache for, or null to clear all
     */
    clearTokenCache(email = null) {
        clearToken(this.#tokenCache, email);
    }

    /**
     * Save current state to disk (async).
     * The active index is read at call time, so callers must update it first.
     * @returns {Promise}
     */
    async saveToDisk() {
        await saveAccounts(this.#configPath, this.#accounts, this.#settings, this.#currentIndex);
    }

    /**
     * Get status object for logging/API
     * @returns {{total: number, available: number, rateLimited: number, invalid: number, summary: string, accounts: Array}}
     *   Counts, a one-line summary and a per-account view without credentials
     */
    getStatus() {
        const available = this.getAvailableAccounts();
        const rateLimited = this.#accounts.filter(a => a.isRateLimited);
        const invalid = this.getInvalidAccounts();

        return {
            total: this.#accounts.length,
            available: available.length,
            rateLimited: rateLimited.length,
            invalid: invalid.length,
            summary: `${this.#accounts.length} total, ${available.length} available, ${rateLimited.length} rate-limited, ${invalid.length} invalid`,
            accounts: this.#accounts.map(a => ({
                email: a.email,
                source: a.source,
                isRateLimited: a.isRateLimited,
                rateLimitResetTime: a.rateLimitResetTime,
                isInvalid: a.isInvalid || false,
                invalidReason: a.invalidReason || null,
                lastUsed: a.lastUsed
            }))
        };
    }

    /**
     * Get settings
     * @returns {Object} Shallow copy of the current settings object
     */
    getSettings() {
        return { ...this.#settings };
    }

    /**
     * Get all accounts (internal use for quota fetching)
     * Returns the full account objects including credentials
     * @returns {Array} Array of account objects (the live internal array, not a copy)
     */
    getAllAccounts() {
        return this.#accounts;
    }
}

export default AccountManager;

// ---------------------------------------------------------------------------
// src/account-manager/rate-limits.js — read-only predicates
// ---------------------------------------------------------------------------

/**
 * Check if all accounts are rate-limited
 *
 * @param {Array} accounts - Array of account objects
 * @returns {boolean} True if all accounts are rate-limited; an empty list
 *   also counts as "all rate-limited"
 */
export function isAllRateLimited(accounts) {
    if (accounts.length === 0) return true;
    return accounts.every(acc => acc.isRateLimited);
}

/**
 * Get list of available (non-rate-limited, non-invalid) accounts
 *
 * @param {Array} accounts - Array of account objects
 * @returns {Array} Array of available account objects
 */
export function getAvailableAccounts(accounts) {
    return accounts.filter(acc => !acc.isRateLimited && !acc.isInvalid);
}

/**
 * Get list of invalid accounts
 *
 * @param {Array} accounts - Array of account objects
 * @returns {Array} Array of invalid account objects
 */
export function getInvalidAccounts(accounts) {
    return accounts.filter(acc => acc.isInvalid);
}
/**
 * Clear expired rate limits
 *
 * An account whose `rateLimitResetTime` has passed is made selectable
 * again: both the `isRateLimited` flag and the reset timestamp are cleared.
 *
 * @param {Array} accounts - Array of account objects (mutated in place)
 * @returns {number} Number of rate limits cleared
 */
export function clearExpiredLimits(accounts) {
    const now = Date.now();
    let cleared = 0;

    for (const account of accounts) {
        if (account.isRateLimited && account.rateLimitResetTime && account.rateLimitResetTime <= now) {
            // The flag must be reset together with the timestamp; otherwise the
            // account stays excluded and, with no timestamp left, can never
            // expire again.
            account.isRateLimited = false;
            account.rateLimitResetTime = null;
            cleared++;
            logger.success(`[AccountManager] Rate limit expired for: ${account.email}`);
        }
    }

    return cleared;
}

/**
 * Clear all rate limits to force a fresh check (optimistic retry strategy)
 *
 * @param {Array} accounts - Array of account objects (mutated in place)
 */
export function resetAllRateLimits(accounts) {
    for (const account of accounts) {
        account.isRateLimited = false;
        account.rateLimitResetTime = null;
    }
    logger.warn('[AccountManager] Reset all rate limits for optimistic retry');
}

/**
 * Mark an account as rate-limited
 *
 * The cooldown is the first truthy value of `resetMs`,
 * `settings.cooldownDurationMs` and DEFAULT_COOLDOWN_MS.
 *
 * @param {Array} accounts - Array of account objects
 * @param {string} email - Email of the account to mark
 * @param {number|null} resetMs - Time in ms until rate limit resets (optional)
 * @param {Object} settings - Settings object with cooldownDurationMs
 * @returns {boolean} True if account was found and marked
 */
export function markRateLimited(accounts, email, resetMs = null, settings = {}) {
    const account = accounts.find(a => a.email === email);
    if (!account) return false;

    account.isRateLimited = true;
    const cooldownMs = resetMs || settings.cooldownDurationMs || DEFAULT_COOLDOWN_MS;
    account.rateLimitResetTime = Date.now() + cooldownMs;

    logger.warn(
        `[AccountManager] Rate limited: ${email}. Available in ${formatDuration(cooldownMs)}`
    );

    return true;
}

/**
 * Mark an account as invalid (credentials need re-authentication)
 *
 * @param {Array} accounts - Array of account objects
 * @param {string} email - Email of the account to mark
 * @param {string} reason - Reason for marking as invalid
 * @returns {boolean} True if account was found and marked
 */
export function markInvalid(accounts, email, reason = 'Unknown error') {
    const account = accounts.find(a => a.email === email);
    if (!account) return false;

    account.isInvalid = true;
    account.invalidReason = reason;
    account.invalidAt = Date.now();

    logger.error(
        `[AccountManager] ⚠ Account INVALID: ${email}`
    );
    logger.error(
        `[AccountManager] Reason: ${reason}`
    );
    logger.error(
        `[AccountManager] Run 'npm run accounts' to re-authenticate this account`
    );

    return true;
}

/**
 * Get the minimum wait time until any account becomes available
 *
 * Returns 0 unless every account is rate-limited. Otherwise returns the
 * smallest positive time remaining among accounts that have a reset time,
 * or DEFAULT_COOLDOWN_MS when no account has a reset time in the future.
 *
 * @param {Array} accounts - Array of account objects
 * @returns {number} Wait time in milliseconds
 */
export function getMinWaitTimeMs(accounts) {
    if (!isAllRateLimited(accounts)) return 0;

    const now = Date.now();
    let minWait = Infinity;
    let soonestAccount = null;

    for (const account of accounts) {
        if (account.rateLimitResetTime) {
            const wait = account.rateLimitResetTime - now;
            if (wait > 0 && wait < minWait) {
                minWait = wait;
                soonestAccount = account;
            }
        }
    }

    if (soonestAccount) {
        logger.info(`[AccountManager] Shortest wait: ${formatDuration(minWait)} (account: ${soonestAccount.email})`);
    }

    return minWait === Infinity ? DEFAULT_COOLDOWN_MS : minWait;
}
/**
 * Pick the next available account (fallback when current is unavailable).
 *
 * Expired rate limits are cleared first. The search starts at the slot
 * after the current one and wraps around, so the current account is the
 * last candidate considered. The chosen account gets a fresh `lastUsed`
 * timestamp and `onSave` is invoked (not awaited).
 *
 * @param {Array} accounts - Array of account objects
 * @param {number} currentIndex - Current account index
 * @param {Function} onSave - Callback to save changes
 * @returns {{account: Object|null, newIndex: number}} The next available account and new index
 */
export function pickNext(accounts, currentIndex, onSave) {
    clearExpiredLimits(accounts);

    if (getAvailableAccounts(accounts).length === 0) {
        return { account: null, newIndex: currentIndex };
    }

    const total = accounts.length;
    // An index past the end is treated as the first slot.
    const start = currentIndex >= total ? 0 : currentIndex;

    for (let offset = 1; offset <= total; offset++) {
        const candidateIndex = (start + offset) % total;
        const candidate = accounts[candidateIndex];
        if (candidate.isRateLimited || candidate.isInvalid) continue;

        candidate.lastUsed = Date.now();
        logger.info(`[AccountManager] Using account: ${candidate.email} (${candidateIndex + 1}/${total})`);

        // Fire-and-forget: the save is deliberately not awaited.
        if (onSave) onSave();

        return { account: candidate, newIndex: candidateIndex };
    }

    return { account: null, newIndex: currentIndex };
}
/**
 * Get the current account without advancing the index (sticky selection).
 *
 * Expired rate limits are cleared first. When the account at the current
 * index is usable it gets a fresh `lastUsed` timestamp and `onSave` is
 * invoked (not awaited); otherwise `account` is null.
 *
 * @param {Array} accounts - Array of account objects
 * @param {number} currentIndex - Current account index
 * @param {Function} onSave - Callback to save changes
 * @returns {{account: Object|null, newIndex: number}} The current account and index
 */
export function getCurrentStickyAccount(accounts, currentIndex, onSave) {
    clearExpiredLimits(accounts);

    const total = accounts.length;
    if (total === 0) {
        return { account: null, newIndex: currentIndex };
    }

    // An index past the end is treated as the first slot.
    const index = currentIndex >= total ? 0 : currentIndex;
    const current = accounts[index];

    const usable = Boolean(current) && !current.isRateLimited && !current.isInvalid;
    if (!usable) {
        return { account: null, newIndex: index };
    }

    current.lastUsed = Date.now();
    // Fire-and-forget: the save is deliberately not awaited.
    if (onSave) onSave();
    return { account: current, newIndex: index };
}
/**
 * Check if we should wait for the current account's rate limit to reset.
 *
 * Waiting is recommended only when the current account is rate-limited
 * and its remaining time is positive and at most MAX_WAIT_BEFORE_ERROR_MS.
 *
 * @param {Array} accounts - Array of account objects
 * @param {number} currentIndex - Current account index
 * @returns {{shouldWait: boolean, waitMs: number, account: Object|null}}
 */
export function shouldWaitForCurrentAccount(accounts, currentIndex) {
    const noWait = (account) => ({ shouldWait: false, waitMs: 0, account });

    if (accounts.length === 0) return noWait(null);

    // An index past the end is treated as the first slot.
    const index = currentIndex >= accounts.length ? 0 : currentIndex;
    const current = accounts[index];

    if (!current || current.isInvalid) return noWait(null);

    if (current.isRateLimited && current.rateLimitResetTime) {
        const remainingMs = current.rateLimitResetTime - Date.now();
        const worthWaiting = remainingMs > 0 && remainingMs <= MAX_WAIT_BEFORE_ERROR_MS;
        if (worthWaiting) {
            return { shouldWait: true, waitMs: remainingMs, account: current };
        }
    }

    return noWait(current);
}

/**
 * Pick an account with sticky selection preference.
 * Prefers the current account for cache continuity.
 *
 * Order of preference:
 *   1. the current account, when it is usable;
 *   2. any other available account (immediate failover);
 *   3. waiting for the current account, when its rate limit is short;
 *   4. a final attempt to pick another account.
 *
 * @param {Array} accounts - Array of account objects
 * @param {number} currentIndex - Current account index
 * @param {Function} onSave - Callback to save changes
 * @returns {{account: Object|null, waitMs: number, newIndex: number}}
 */
export function pickStickyAccount(accounts, currentIndex, onSave) {
    const sticky = getCurrentStickyAccount(accounts, currentIndex, onSave);
    if (sticky.account) {
        return { account: sticky.account, waitMs: 0, newIndex: sticky.newIndex };
    }

    // The current account is unusable: prefer switching over waiting.
    if (getAvailableAccounts(accounts).length > 0) {
        const failover = pickNext(accounts, currentIndex, onSave);
        if (failover.account) {
            logger.info(`[AccountManager] Switched to new account (failover): ${failover.account.email}`);
            return { account: failover.account, waitMs: 0, newIndex: failover.newIndex };
        }
    }

    // Nothing else is free: wait for the current account if the limit is short.
    const waitInfo = shouldWaitForCurrentAccount(accounts, currentIndex);
    if (waitInfo.shouldWait) {
        logger.info(`[AccountManager] Waiting ${formatDuration(waitInfo.waitMs)} for sticky account: ${waitInfo.account.email}`);
        return { account: null, waitMs: waitInfo.waitMs, newIndex: currentIndex };
    }

    // Current account is out for too long or invalid: last attempt to switch.
    const lastResort = pickNext(accounts, currentIndex, onSave);
    if (lastResort.account) {
        logger.info(`[AccountManager] Switched to new account for cache: ${lastResort.account.email}`);
    }
    return { account: lastResort.account, waitMs: 0, newIndex: lastResort.newIndex };
}
/**
 * Fill in the runtime fields a stored account entry may be missing.
 *
 * @param {Object} stored - Account entry as read from the config file
 * @returns {Object} Copy of the entry with rate-limit fields defaulted
 */
function withRuntimeDefaults(stored) {
    return {
        ...stored,
        isRateLimited: stored.isRateLimited || false,
        rateLimitResetTime: stored.rateLimitResetTime || null,
        lastUsed: stored.lastUsed || null
    };
}

/**
 * Load accounts from the config file
 *
 * Any failure (missing file, unreadable file, malformed JSON, unexpected
 * shape) is logged and answered with an empty result; the function does
 * not throw.
 *
 * @param {string} configPath - Path to the config file
 * @returns {Promise<{accounts: Array, settings: Object, activeIndex: number}>}
 */
export async function loadAccounts(configPath = ACCOUNT_CONFIG_PATH) {
    try {
        // Probe for the file first so a missing config surfaces as ENOENT.
        await access(configPath, fsConstants.F_OK);
        const raw = await readFile(configPath, 'utf-8');
        const parsed = JSON.parse(raw);

        const accounts = (parsed.accounts || []).map(acc => withRuntimeDefaults(acc));
        const settings = parsed.settings || {};

        // A stored index past the end of the list falls back to the first account.
        const storedIndex = parsed.activeIndex || 0;
        const activeIndex = storedIndex >= accounts.length ? 0 : storedIndex;

        logger.info(`[AccountManager] Loaded ${accounts.length} account(s) from config`);

        return { accounts, settings, activeIndex };
    } catch (error) {
        const configMissing = error.code === 'ENOENT';
        if (configMissing) {
            logger.info('[AccountManager] No config file found. Using Antigravity database (single account mode)');
        } else {
            logger.error('[AccountManager] Failed to load config:', error.message);
        }
        return { accounts: [], settings: {}, activeIndex: 0 };
    }
}

/**
 * Load the default account from Antigravity's database
 *
 * When the database yields an API key, a single `database`-sourced account
 * is returned together with a token cache already holding that key.
 * Otherwise (no key, or the lookup throws) both results are empty.
 *
 * @param {string} dbPath - Optional path to the database
 * @returns {{accounts: Array, tokenCache: Map}}
 */
export function loadDefaultAccount(dbPath) {
    try {
        const authData = getAuthStatus(dbPath);
        const hasKey = Boolean(authData?.apiKey);

        if (hasKey) {
            const email = authData.email || 'default@antigravity';
            const account = {
                email,
                source: 'database',
                isRateLimited: false,
                rateLimitResetTime: null,
                lastUsed: null
            };

            const tokenCache = new Map([
                [email, { token: authData.apiKey, extractedAt: Date.now() }]
            ]);

            logger.info(`[AccountManager] Loaded default account: ${account.email}`);

            return { accounts: [account], tokenCache };
        }
    } catch (error) {
        logger.error('[AccountManager] Failed to load default account:', error.message);
    }

    return { accounts: [], tokenCache: new Map() };
}

/**
 * Reduce an in-memory account to the fields that are persisted.
 * Credentials are written only for the source they belong to
 * (`refreshToken` for 'oauth', `apiKey` for 'manual').
 *
 * @param {Object} acc - In-memory account object
 * @returns {Object} Plain object ready for JSON serialization
 */
function toStoredAccount(acc) {
    const fromOauth = acc.source === 'oauth';
    const fromManual = acc.source === 'manual';

    return {
        email: acc.email,
        source: acc.source,
        dbPath: acc.dbPath || null,
        refreshToken: fromOauth ? acc.refreshToken : undefined,
        apiKey: fromManual ? acc.apiKey : undefined,
        projectId: acc.projectId || undefined,
        addedAt: acc.addedAt || undefined,
        isRateLimited: acc.isRateLimited,
        rateLimitResetTime: acc.rateLimitResetTime,
        isInvalid: acc.isInvalid || false,
        invalidReason: acc.invalidReason || null,
        lastUsed: acc.lastUsed
    };
}

/**
 * Save account configuration to disk
 *
 * Creates the parent directory if needed. Failures are logged, not thrown.
 *
 * @param {string} configPath - Path to the config file
 * @param {Array} accounts - Array of account objects
 * @param {Object} settings - Settings object
 * @param {number} activeIndex - Current active account index
 */
export async function saveAccounts(configPath, accounts, settings, activeIndex) {
    try {
        await mkdir(dirname(configPath), { recursive: true });

        const snapshot = {
            accounts: accounts.map(acc => toStoredAccount(acc)),
            settings,
            activeIndex
        };

        await writeFile(configPath, JSON.stringify(snapshot, null, 2));
    } catch (error) {
        logger.error('[AccountManager] Failed to save config:', error.message);
    }
}
Antigravity Claude Proxy. * * Usage: - * node src/accounts-cli.js # Interactive mode - * node src/accounts-cli.js add # Add new account(s) - * node src/accounts-cli.js list # List all accounts - * node src/accounts-cli.js clear # Remove all accounts + * node src/cli/accounts.js # Interactive mode + * node src/cli/accounts.js add # Add new account(s) + * node src/cli/accounts.js list # List all accounts + * node src/cli/accounts.js clear # Remove all accounts */ import { createInterface } from 'readline/promises'; @@ -19,14 +19,14 @@ import { existsSync, readFileSync, writeFileSync, mkdirSync } from 'fs'; import { dirname } from 'path'; import { exec } from 'child_process'; import net from 'net'; -import { ACCOUNT_CONFIG_PATH, DEFAULT_PORT, MAX_ACCOUNTS } from './constants.js'; +import { ACCOUNT_CONFIG_PATH, DEFAULT_PORT, MAX_ACCOUNTS } from '../constants.js'; import { getAuthorizationUrl, startCallbackServer, completeOAuthFlow, refreshAccessToken, getUserEmail -} from './oauth.js'; +} from '../auth/oauth.js'; const SERVER_PORT = process.env.PORT || DEFAULT_PORT; @@ -415,11 +415,11 @@ async function main() { break; case 'help': console.log('\nUsage:'); - console.log(' node src/accounts-cli.js add Add new account(s)'); - console.log(' node src/accounts-cli.js list List all accounts'); - console.log(' node src/accounts-cli.js verify Verify account tokens'); - console.log(' node src/accounts-cli.js clear Remove all accounts'); - console.log(' node src/accounts-cli.js help Show this help'); + console.log(' node src/cli/accounts.js add Add new account(s)'); + console.log(' node src/cli/accounts.js list List all accounts'); + console.log(' node src/cli/accounts.js verify Verify account tokens'); + console.log(' node src/cli/accounts.js clear Remove all accounts'); + console.log(' node src/cli/accounts.js help Show this help'); break; case 'remove': await ensureServerStopped(); diff --git a/src/cloudcode-client.js b/src/cloudcode-client.js deleted file mode 100644 index 
88e2e53..0000000 --- a/src/cloudcode-client.js +++ /dev/null @@ -1,1107 +0,0 @@ -/** - * Cloud Code Client for Antigravity - * - * Communicates with Google's Cloud Code internal API using the - * v1internal:streamGenerateContent endpoint with proper request wrapping. - * - * Supports multi-account load balancing with automatic failover. - * - * Based on: https://github.com/NoeFabris/opencode-antigravity-auth - */ - -import crypto from 'crypto'; -import { - ANTIGRAVITY_ENDPOINT_FALLBACKS, - ANTIGRAVITY_HEADERS, - MAX_RETRIES, - MAX_WAIT_BEFORE_ERROR_MS, - MIN_SIGNATURE_LENGTH, - getModelFamily, - isThinkingModel -} from './constants.js'; -import { - convertAnthropicToGoogle, - convertGoogleToAnthropic -} from './format/index.js'; -import { cacheSignature } from './format/signature-cache.js'; -import { formatDuration, sleep } from './utils/helpers.js'; -import { isRateLimitError, isAuthError } from './errors.js'; -import { logger } from './utils/logger.js'; - -/** - * Check if an error is a rate limit error (429 or RESOURCE_EXHAUSTED) - * @deprecated Use isRateLimitError from errors.js instead - */ -function is429Error(error) { - return isRateLimitError(error); -} - -/** - * Check if an error is an auth-invalid error (credentials need re-authentication) - * @deprecated Use isAuthError from errors.js instead - */ -function isAuthInvalidError(error) { - return isAuthError(error); -} - -/** - * Derive a stable session ID from the first user message in the conversation. - * This ensures the same conversation uses the same session ID across turns, - * enabling prompt caching (cache is scoped to session + organization). 
- * - * @param {Object} anthropicRequest - The Anthropic-format request - * @returns {string} A stable session ID (32 hex characters) or random UUID if no user message - */ -function deriveSessionId(anthropicRequest) { - const messages = anthropicRequest.messages || []; - - // Find the first user message - for (const msg of messages) { - if (msg.role === 'user') { - let content = ''; - - if (typeof msg.content === 'string') { - content = msg.content; - } else if (Array.isArray(msg.content)) { - // Extract text from content blocks - content = msg.content - .filter(block => block.type === 'text' && block.text) - .map(block => block.text) - .join('\n'); - } - - if (content) { - // Hash the content with SHA256, return first 32 hex chars - const hash = crypto.createHash('sha256').update(content).digest('hex'); - return hash.substring(0, 32); - } - } - } - - // Fallback to random UUID if no user message found - return crypto.randomUUID(); -} - -/** - * Parse reset time from HTTP response or error - * Checks headers first, then error message body - * Returns milliseconds or null if not found - * - * @param {Response|Error} responseOrError - HTTP Response object or Error - * @param {string} errorText - Optional error body text - */ -function parseResetTime(responseOrError, errorText = '') { - let resetMs = null; - - // If it's a Response object, check headers first - if (responseOrError && typeof responseOrError.headers?.get === 'function') { - const headers = responseOrError.headers; - - // Standard Retry-After header (seconds or HTTP date) - const retryAfter = headers.get('retry-after'); - if (retryAfter) { - const seconds = parseInt(retryAfter, 10); - if (!isNaN(seconds)) { - resetMs = seconds * 1000; - logger.debug(`[CloudCode] Retry-After header: ${seconds}s`); - } else { - // Try parsing as HTTP date - const date = new Date(retryAfter); - if (!isNaN(date.getTime())) { - resetMs = date.getTime() - Date.now(); - if (resetMs > 0) { - logger.debug(`[CloudCode] 
Retry-After date: ${retryAfter}`); - } else { - resetMs = null; - } - } - } - } - - // x-ratelimit-reset (Unix timestamp in seconds) - if (!resetMs) { - const ratelimitReset = headers.get('x-ratelimit-reset'); - if (ratelimitReset) { - const resetTimestamp = parseInt(ratelimitReset, 10) * 1000; - resetMs = resetTimestamp - Date.now(); - if (resetMs > 0) { - logger.debug(`[CloudCode] x-ratelimit-reset: ${new Date(resetTimestamp).toISOString()}`); - } else { - resetMs = null; - } - } - } - - // x-ratelimit-reset-after (seconds) - if (!resetMs) { - const resetAfter = headers.get('x-ratelimit-reset-after'); - if (resetAfter) { - const seconds = parseInt(resetAfter, 10); - if (!isNaN(seconds) && seconds > 0) { - resetMs = seconds * 1000; - logger.debug(`[CloudCode] x-ratelimit-reset-after: ${seconds}s`); - } - } - } - } - - // If no header found, try parsing from error message/body - if (!resetMs) { - const msg = (responseOrError instanceof Error ? responseOrError.message : errorText) || ''; - - // Try to extract "quotaResetDelay" first (e.g. "754.431528ms" or "1.5s") - // This is Google's preferred format for rate limit reset delay - const quotaDelayMatch = msg.match(/quotaResetDelay[:\s"]+(\d+(?:\.\d+)?)(ms|s)/i); - if (quotaDelayMatch) { - const value = parseFloat(quotaDelayMatch[1]); - const unit = quotaDelayMatch[2].toLowerCase(); - resetMs = unit === 's' ? Math.ceil(value * 1000) : Math.ceil(value); - logger.debug(`[CloudCode] Parsed quotaResetDelay from body: ${resetMs}ms`); - } - - // Try to extract "quotaResetTimeStamp" (ISO format like "2025-12-31T07:00:47Z") - if (!resetMs) { - const quotaTimestampMatch = msg.match(/quotaResetTimeStamp[:\s"]+(\d{4}-\d{2}-\d{2}T[\d:.]+Z?)/i); - if (quotaTimestampMatch) { - const resetTime = new Date(quotaTimestampMatch[1]).getTime(); - if (!isNaN(resetTime)) { - resetMs = resetTime - Date.now(); - // Even if expired or 0, we found a timestamp, so rely on it. - // But if it's negative, it means "now", so treat as small wait. 
- logger.debug(`[CloudCode] Parsed quotaResetTimeStamp: ${quotaTimestampMatch[1]} (Delta: ${resetMs}ms)`); - } - } - } - - // Try to extract "retry-after-ms" or "retryDelay" - check seconds format first (e.g. "7739.23s") - // Added stricter regex to avoid partial matches - if (!resetMs) { - const secMatch = msg.match(/(?:retry[-_]?after[-_]?ms|retryDelay)[:\s"]+([\d\.]+)(?:s\b|s")/i); - if (secMatch) { - resetMs = Math.ceil(parseFloat(secMatch[1]) * 1000); - logger.debug(`[CloudCode] Parsed retry seconds from body (precise): ${resetMs}ms`); - } - } - - if (!resetMs) { - // Check for ms (explicit "ms" suffix or implicit if no suffix) - const msMatch = msg.match(/(?:retry[-_]?after[-_]?ms|retryDelay)[:\s"]+(\d+)(?:\s*ms)?(?![\w.])/i); - if (msMatch) { - resetMs = parseInt(msMatch[1], 10); - logger.debug(`[CloudCode] Parsed retry-after-ms from body: ${resetMs}ms`); - } - } - - // Try to extract seconds value like "retry after 60 seconds" - if (!resetMs) { - const secMatch = msg.match(/retry\s+(?:after\s+)?(\d+)\s*(?:sec|s\b)/i); - if (secMatch) { - resetMs = parseInt(secMatch[1], 10) * 1000; - logger.debug(`[CloudCode] Parsed retry seconds from body: ${secMatch[1]}s`); - } - } - - // Try to extract duration like "1h23m45s" or "23m45s" or "45s" - if (!resetMs) { - const durationMatch = msg.match(/(\d+)h(\d+)m(\d+)s|(\d+)m(\d+)s|(\d+)s/i); - if (durationMatch) { - if (durationMatch[1]) { - const hours = parseInt(durationMatch[1], 10); - const minutes = parseInt(durationMatch[2], 10); - const seconds = parseInt(durationMatch[3], 10); - resetMs = (hours * 3600 + minutes * 60 + seconds) * 1000; - } else if (durationMatch[4]) { - const minutes = parseInt(durationMatch[4], 10); - const seconds = parseInt(durationMatch[5], 10); - resetMs = (minutes * 60 + seconds) * 1000; - } else if (durationMatch[6]) { - resetMs = parseInt(durationMatch[6], 10) * 1000; - } - if (resetMs) { - logger.debug(`[CloudCode] Parsed duration from body: ${formatDuration(resetMs)}`); - } - } - } - - 
// Try to extract ISO timestamp or Unix timestamp - if (!resetMs) { - const isoMatch = msg.match(/reset[:\s"]+(\d{4}-\d{2}-\d{2}T[\d:.]+Z?)/i); - if (isoMatch) { - const resetTime = new Date(isoMatch[1]).getTime(); - if (!isNaN(resetTime)) { - resetMs = resetTime - Date.now(); - if (resetMs > 0) { - logger.debug(`[CloudCode] Parsed ISO reset time: ${isoMatch[1]}`); - } else { - resetMs = null; - } - } - } - } - } - - // SANITY CHECK: Enforce strict minimums for found rate limits - // If we found a reset time, but it's very small (e.g. < 1s) or negative, - // explicitly bump it up to avoid "Available in 0s" loops. - if (resetMs !== null) { - if (resetMs < 1000) { - logger.debug(`[CloudCode] Reset time too small (${resetMs}ms), enforcing 2s buffer`); - resetMs = 2000; - } - } - - return resetMs; -} - -/** - * Build the wrapped request body for Cloud Code API - */ -function buildCloudCodeRequest(anthropicRequest, projectId) { - const model = anthropicRequest.model; - const googleRequest = convertAnthropicToGoogle(anthropicRequest); - - // Use stable session ID derived from first user message for cache continuity - googleRequest.sessionId = deriveSessionId(anthropicRequest); - - const payload = { - project: projectId, - model: model, - request: googleRequest, - userAgent: 'antigravity', - requestId: 'agent-' + crypto.randomUUID() - }; - - return payload; -} - -/** - * Build headers for Cloud Code API requests - */ -function buildHeaders(token, model, accept = 'application/json') { - const headers = { - 'Authorization': `Bearer ${token}`, - 'Content-Type': 'application/json', - ...ANTIGRAVITY_HEADERS - }; - - const modelFamily = getModelFamily(model); - - // Add interleaved thinking header only for Claude thinking models - if (modelFamily === 'claude' && isThinkingModel(model)) { - headers['anthropic-beta'] = 'interleaved-thinking-2025-05-14'; - } - - if (accept !== 'application/json') { - headers['Accept'] = accept; - } - - return headers; -} - -/** - * Send a 
non-streaming request to Cloud Code with multi-account support - * Uses SSE endpoint for thinking models (non-streaming doesn't return thinking blocks) - * - * @param {Object} anthropicRequest - The Anthropic-format request - * @param {Object} anthropicRequest.model - Model name to use - * @param {Array} anthropicRequest.messages - Array of message objects - * @param {number} [anthropicRequest.max_tokens] - Maximum tokens to generate - * @param {Object} [anthropicRequest.thinking] - Thinking configuration - * @param {import('./account-manager.js').default} accountManager - The account manager instance - * @returns {Promise} Anthropic-format response object - * @throws {Error} If max retries exceeded or no accounts available - */ -export async function sendMessage(anthropicRequest, accountManager) { - const model = anthropicRequest.model; - const isThinking = isThinkingModel(model); - - // Retry loop with account failover - // Ensure we try at least as many times as there are accounts to cycle through everyone - // +1 to ensure we hit the "all accounts rate-limited" check at the start of the next loop - const maxAttempts = Math.max(MAX_RETRIES, accountManager.getAccountCount() + 1); - - for (let attempt = 0; attempt < maxAttempts; attempt++) { - // Use sticky account selection for cache continuity - const { account: stickyAccount, waitMs } = accountManager.pickStickyAccount(); - let account = stickyAccount; - - // Handle waiting for sticky account - if (!account && waitMs > 0) { - logger.info(`[CloudCode] Waiting ${formatDuration(waitMs)} for sticky account...`); - await sleep(waitMs); - accountManager.clearExpiredLimits(); - account = accountManager.getCurrentStickyAccount(); - } - - // Handle all accounts rate-limited - if (!account) { - if (accountManager.isAllRateLimited()) { - const allWaitMs = accountManager.getMinWaitTimeMs(); - const resetTime = new Date(Date.now() + allWaitMs).toISOString(); - - // If wait time is too long (> 2 minutes), throw error 
immediately - if (allWaitMs > MAX_WAIT_BEFORE_ERROR_MS) { - throw new Error( - `RESOURCE_EXHAUSTED: Rate limited. Quota will reset after ${formatDuration(allWaitMs)}. Next available: ${resetTime}` - ); - } - - // Wait for reset (applies to both single and multi-account modes) - const accountCount = accountManager.getAccountCount(); - logger.warn(`[CloudCode] All ${accountCount} account(s) rate-limited. Waiting ${formatDuration(allWaitMs)}...`); - await sleep(allWaitMs); - accountManager.clearExpiredLimits(); - account = accountManager.pickNext(); - } - - if (!account) { - throw new Error('No accounts available'); - } - } - - try { - // Get token and project for this account - const token = await accountManager.getTokenForAccount(account); - const project = await accountManager.getProjectForAccount(account, token); - const payload = buildCloudCodeRequest(anthropicRequest, project); - - logger.debug(`[CloudCode] Sending request for model: ${model}`); - - // Try each endpoint - let lastError = null; - for (const endpoint of ANTIGRAVITY_ENDPOINT_FALLBACKS) { - try { - const url = isThinking - ? `${endpoint}/v1internal:streamGenerateContent?alt=sse` - : `${endpoint}/v1internal:generateContent`; - - const response = await fetch(url, { - method: 'POST', - headers: buildHeaders(token, model, isThinking ? 
'text/event-stream' : 'application/json'), - body: JSON.stringify(payload) - }); - - if (!response.ok) { - const errorText = await response.text(); - logger.warn(`[CloudCode] Error at ${endpoint}: ${response.status} - ${errorText}`); - - if (response.status === 401) { - // Auth error - clear caches and retry with fresh token - logger.warn('[CloudCode] Auth error, refreshing token...'); - accountManager.clearTokenCache(account.email); - accountManager.clearProjectCache(account.email); - continue; - } - - if (response.status === 429) { - // Rate limited on this endpoint - try next endpoint first (DAILY → PROD) - logger.debug(`[CloudCode] Rate limited at ${endpoint}, trying next endpoint...`); - const resetMs = parseResetTime(response, errorText); - // Keep minimum reset time across all 429 responses - if (!lastError?.is429 || (resetMs && (!lastError.resetMs || resetMs < lastError.resetMs))) { - lastError = { is429: true, response, errorText, resetMs }; - } - continue; - } - - if (response.status >= 400) { - lastError = new Error(`API error ${response.status}: ${errorText}`); - // If it's a 5xx error, wait a bit before trying the next endpoint - if (response.status >= 500) { - logger.warn(`[CloudCode] ${response.status} error, waiting 1s before retry...`); - await sleep(1000); - } - continue; - } - } - - // For thinking models, parse SSE and accumulate all parts - if (isThinking) { - return await parseThinkingSSEResponse(response, anthropicRequest.model); - } - - // Non-thinking models use regular JSON - const data = await response.json(); - logger.debug('[CloudCode] Response received'); - return convertGoogleToAnthropic(data, anthropicRequest.model); - - } catch (endpointError) { - if (is429Error(endpointError)) { - throw endpointError; // Re-throw to trigger account switch - } - logger.warn(`[CloudCode] Error at ${endpoint}:`, endpointError.message); - lastError = endpointError; - } - } - - // If all endpoints failed for this account - if (lastError) { - // If all 
endpoints returned 429, mark account as rate-limited - if (lastError.is429) { - logger.warn(`[CloudCode] All endpoints rate-limited for ${account.email}`); - accountManager.markRateLimited(account.email, lastError.resetMs); - throw new Error(`Rate limited: ${lastError.errorText}`); - } - throw lastError; - } - - } catch (error) { - if (is429Error(error)) { - // Rate limited - already marked, continue to next account - logger.info(`[CloudCode] Account ${account.email} rate-limited, trying next...`); - continue; - } - if (isAuthInvalidError(error)) { - // Auth invalid - already marked, continue to next account - logger.warn(`[CloudCode] Account ${account.email} has invalid credentials, trying next...`); - continue; - } - // Non-rate-limit error: throw immediately - // UNLESS it's a 500 error, then we treat it as a "soft" failure for this account and try the next one - if (error.message.includes('API error 5') || error.message.includes('500') || error.message.includes('503')) { - logger.warn(`[CloudCode] Account ${account.email} failed with 5xx error, trying next...`); - accountManager.pickNext(); // Force advance to next account - continue; - } - - throw error; - } - } - - throw new Error('Max retries exceeded'); -} - -/** - * Parse SSE response for thinking models and accumulate all parts - */ -async function parseThinkingSSEResponse(response, originalModel) { - let accumulatedThinkingText = ''; - let accumulatedThinkingSignature = ''; - let accumulatedText = ''; - const finalParts = []; - let usageMetadata = {}; - let finishReason = 'STOP'; - - const flushThinking = () => { - if (accumulatedThinkingText) { - finalParts.push({ - thought: true, - text: accumulatedThinkingText, - thoughtSignature: accumulatedThinkingSignature - }); - accumulatedThinkingText = ''; - accumulatedThinkingSignature = ''; - } - }; - - const flushText = () => { - if (accumulatedText) { - finalParts.push({ text: accumulatedText }); - accumulatedText = ''; - } - }; - - const reader = 
response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ''; - - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split('\n'); - buffer = lines.pop() || ''; - - for (const line of lines) { - if (!line.startsWith('data:')) continue; - const jsonText = line.slice(5).trim(); - if (!jsonText) continue; - - try { - const data = JSON.parse(jsonText); - const innerResponse = data.response || data; - - if (innerResponse.usageMetadata) { - usageMetadata = innerResponse.usageMetadata; - } - - const candidates = innerResponse.candidates || []; - const firstCandidate = candidates[0] || {}; - if (firstCandidate.finishReason) { - finishReason = firstCandidate.finishReason; - } - - const parts = firstCandidate.content?.parts || []; - for (const part of parts) { - if (part.thought === true) { - flushText(); - accumulatedThinkingText += (part.text || ''); - if (part.thoughtSignature) { - accumulatedThinkingSignature = part.thoughtSignature; - } - } else if (part.functionCall) { - flushThinking(); - flushText(); - finalParts.push(part); - } else if (part.text !== undefined) { - if (!part.text) continue; - flushThinking(); - accumulatedText += part.text; - } - } - } catch (e) { - logger.debug('[CloudCode] SSE parse warning:', e.message, 'Raw:', jsonText.slice(0, 100)); - } - } - } - - flushThinking(); - flushText(); - - const accumulatedResponse = { - candidates: [{ content: { parts: finalParts }, finishReason }], - usageMetadata - }; - - const partTypes = finalParts.map(p => p.thought ? 'thought' : (p.functionCall ? 
'functionCall' : 'text')); - logger.debug('[CloudCode] Response received (SSE), part types:', partTypes); - if (finalParts.some(p => p.thought)) { - const thinkingPart = finalParts.find(p => p.thought); - logger.debug('[CloudCode] Thinking signature length:', thinkingPart?.thoughtSignature?.length || 0); - } - - return convertGoogleToAnthropic(accumulatedResponse, originalModel); -} - -/** - * Send a streaming request to Cloud Code with multi-account support - * Streams events in real-time as they arrive from the server - * - * @param {Object} anthropicRequest - The Anthropic-format request - * @param {string} anthropicRequest.model - Model name to use - * @param {Array} anthropicRequest.messages - Array of message objects - * @param {number} [anthropicRequest.max_tokens] - Maximum tokens to generate - * @param {Object} [anthropicRequest.thinking] - Thinking configuration - * @param {import('./account-manager.js').default} accountManager - The account manager instance - * @yields {Object} Anthropic-format SSE events (message_start, content_block_start, content_block_delta, etc.) 
- * @throws {Error} If max retries exceeded or no accounts available - */ -export async function* sendMessageStream(anthropicRequest, accountManager) { - const model = anthropicRequest.model; - - // Retry loop with account failover - // Ensure we try at least as many times as there are accounts to cycle through everyone - // +1 to ensure we hit the "all accounts rate-limited" check at the start of the next loop - const maxAttempts = Math.max(MAX_RETRIES, accountManager.getAccountCount() + 1); - - for (let attempt = 0; attempt < maxAttempts; attempt++) { - // Use sticky account selection for cache continuity - const { account: stickyAccount, waitMs } = accountManager.pickStickyAccount(); - let account = stickyAccount; - - // Handle waiting for sticky account - if (!account && waitMs > 0) { - logger.info(`[CloudCode] Waiting ${formatDuration(waitMs)} for sticky account...`); - await sleep(waitMs); - accountManager.clearExpiredLimits(); - account = accountManager.getCurrentStickyAccount(); - } - - // Handle all accounts rate-limited - if (!account) { - if (accountManager.isAllRateLimited()) { - const allWaitMs = accountManager.getMinWaitTimeMs(); - const resetTime = new Date(Date.now() + allWaitMs).toISOString(); - - // If wait time is too long (> 2 minutes), throw error immediately - if (allWaitMs > MAX_WAIT_BEFORE_ERROR_MS) { - throw new Error( - `RESOURCE_EXHAUSTED: Rate limited. Quota will reset after ${formatDuration(allWaitMs)}. Next available: ${resetTime}` - ); - } - - // Wait for reset (applies to both single and multi-account modes) - const accountCount = accountManager.getAccountCount(); - logger.warn(`[CloudCode] All ${accountCount} account(s) rate-limited. 
Waiting ${formatDuration(allWaitMs)}...`); - await sleep(allWaitMs); - accountManager.clearExpiredLimits(); - account = accountManager.pickNext(); - } - - if (!account) { - throw new Error('No accounts available'); - } - } - - try { - // Get token and project for this account - const token = await accountManager.getTokenForAccount(account); - const project = await accountManager.getProjectForAccount(account, token); - const payload = buildCloudCodeRequest(anthropicRequest, project); - - logger.debug(`[CloudCode] Starting stream for model: ${model}`); - - // Try each endpoint for streaming - let lastError = null; - for (const endpoint of ANTIGRAVITY_ENDPOINT_FALLBACKS) { - try { - const url = `${endpoint}/v1internal:streamGenerateContent?alt=sse`; - - const response = await fetch(url, { - method: 'POST', - headers: buildHeaders(token, model, 'text/event-stream'), - body: JSON.stringify(payload) - }); - - if (!response.ok) { - const errorText = await response.text(); - logger.warn(`[CloudCode] Stream error at ${endpoint}: ${response.status} - ${errorText}`); - - if (response.status === 401) { - // Auth error - clear caches and retry - accountManager.clearTokenCache(account.email); - accountManager.clearProjectCache(account.email); - continue; - } - - if (response.status === 429) { - // Rate limited on this endpoint - try next endpoint first (DAILY → PROD) - logger.debug(`[CloudCode] Stream rate limited at ${endpoint}, trying next endpoint...`); - const resetMs = parseResetTime(response, errorText); - // Keep minimum reset time across all 429 responses - if (!lastError?.is429 || (resetMs && (!lastError.resetMs || resetMs < lastError.resetMs))) { - lastError = { is429: true, response, errorText, resetMs }; - } - continue; - } - - lastError = new Error(`API error ${response.status}: ${errorText}`); - - // If it's a 5xx error, wait a bit before trying the next endpoint - if (response.status >= 500) { - logger.warn(`[CloudCode] ${response.status} stream error, waiting 1s 
before retry...`); - await sleep(1000); - } - - continue; - } - - // Stream the response - yield events as they arrive - yield* streamSSEResponse(response, anthropicRequest.model); - - logger.debug('[CloudCode] Stream completed'); - return; - - } catch (endpointError) { - if (is429Error(endpointError)) { - throw endpointError; // Re-throw to trigger account switch - } - logger.warn(`[CloudCode] Stream error at ${endpoint}:`, endpointError.message); - lastError = endpointError; - } - } - - // If all endpoints failed for this account - if (lastError) { - // If all endpoints returned 429, mark account as rate-limited - if (lastError.is429) { - logger.warn(`[CloudCode] All endpoints rate-limited for ${account.email}`); - accountManager.markRateLimited(account.email, lastError.resetMs); - throw new Error(`Rate limited: ${lastError.errorText}`); - } - throw lastError; - } - - } catch (error) { - if (is429Error(error)) { - // Rate limited - already marked, continue to next account - logger.info(`[CloudCode] Account ${account.email} rate-limited, trying next...`); - continue; - } - if (isAuthInvalidError(error)) { - // Auth invalid - already marked, continue to next account - logger.warn(`[CloudCode] Account ${account.email} has invalid credentials, trying next...`); - continue; - } - // Non-rate-limit error: throw immediately - // UNLESS it's a 500 error, then we treat it as a "soft" failure for this account and try the next one - if (error.message.includes('API error 5') || error.message.includes('500') || error.message.includes('503')) { - logger.warn(`[CloudCode] Account ${account.email} failed with 5xx stream error, trying next...`); - accountManager.pickNext(); // Force advance to next account - continue; - } - - throw error; - } - } - - throw new Error('Max retries exceeded'); -} - -/** - * Stream SSE response and yield Anthropic-format events - */ -async function* streamSSEResponse(response, originalModel) { - const messageId = 
`msg_${crypto.randomBytes(16).toString('hex')}`; - let hasEmittedStart = false; - let blockIndex = 0; - let currentBlockType = null; - let currentThinkingSignature = ''; - let inputTokens = 0; - let outputTokens = 0; - let cacheReadTokens = 0; - let stopReason = 'end_turn'; - - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ''; - - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split('\n'); - buffer = lines.pop() || ''; - - for (const line of lines) { - if (!line.startsWith('data:')) continue; - - const jsonText = line.slice(5).trim(); - if (!jsonText) continue; - - try { - const data = JSON.parse(jsonText); - const innerResponse = data.response || data; - - // Extract usage metadata (including cache tokens) - const usage = innerResponse.usageMetadata; - if (usage) { - inputTokens = usage.promptTokenCount || inputTokens; - outputTokens = usage.candidatesTokenCount || outputTokens; - cacheReadTokens = usage.cachedContentTokenCount || cacheReadTokens; - } - - const candidates = innerResponse.candidates || []; - const firstCandidate = candidates[0] || {}; - const content = firstCandidate.content || {}; - const parts = content.parts || []; - - // Emit message_start on first data - // Note: input_tokens = promptTokenCount - cachedContentTokenCount (Antigravity includes cached in total) - if (!hasEmittedStart && parts.length > 0) { - hasEmittedStart = true; - yield { - type: 'message_start', - message: { - id: messageId, - type: 'message', - role: 'assistant', - content: [], - model: originalModel, - stop_reason: null, - stop_sequence: null, - usage: { - input_tokens: inputTokens - cacheReadTokens, - output_tokens: 0, - cache_read_input_tokens: cacheReadTokens, - cache_creation_input_tokens: 0 - } - } - }; - } - - // Process each part - for (const part of parts) { - if (part.thought === true) { - // Handle thinking 
block - const text = part.text || ''; - const signature = part.thoughtSignature || ''; - - if (currentBlockType !== 'thinking') { - if (currentBlockType !== null) { - yield { type: 'content_block_stop', index: blockIndex }; - blockIndex++; - } - currentBlockType = 'thinking'; - currentThinkingSignature = ''; - yield { - type: 'content_block_start', - index: blockIndex, - content_block: { type: 'thinking', thinking: '' } - }; - } - - if (signature && signature.length >= MIN_SIGNATURE_LENGTH) { - currentThinkingSignature = signature; - } - - yield { - type: 'content_block_delta', - index: blockIndex, - delta: { type: 'thinking_delta', thinking: text } - }; - - } else if (part.text !== undefined) { - // Skip empty text parts - if (!part.text || part.text.trim().length === 0) { - continue; - } - - // Handle regular text - if (currentBlockType !== 'text') { - if (currentBlockType === 'thinking' && currentThinkingSignature) { - yield { - type: 'content_block_delta', - index: blockIndex, - delta: { type: 'signature_delta', signature: currentThinkingSignature } - }; - currentThinkingSignature = ''; - } - if (currentBlockType !== null) { - yield { type: 'content_block_stop', index: blockIndex }; - blockIndex++; - } - currentBlockType = 'text'; - yield { - type: 'content_block_start', - index: blockIndex, - content_block: { type: 'text', text: '' } - }; - } - - yield { - type: 'content_block_delta', - index: blockIndex, - delta: { type: 'text_delta', text: part.text } - }; - - } else if (part.functionCall) { - // Handle tool use - // For Gemini 3+, capture thoughtSignature from the functionCall part - // The signature is a sibling to functionCall, not inside it - const functionCallSignature = part.thoughtSignature || ''; - - if (currentBlockType === 'thinking' && currentThinkingSignature) { - yield { - type: 'content_block_delta', - index: blockIndex, - delta: { type: 'signature_delta', signature: currentThinkingSignature } - }; - currentThinkingSignature = ''; - } - if 
(currentBlockType !== null) { - yield { type: 'content_block_stop', index: blockIndex }; - blockIndex++; - } - currentBlockType = 'tool_use'; - stopReason = 'tool_use'; - - const toolId = part.functionCall.id || `toolu_${crypto.randomBytes(12).toString('hex')}`; - - // For Gemini, include the thoughtSignature in the tool_use block - // so it can be sent back in subsequent requests - const toolUseBlock = { - type: 'tool_use', - id: toolId, - name: part.functionCall.name, - input: {} - }; - - // Store the signature in the tool_use block for later retrieval - if (functionCallSignature && functionCallSignature.length >= MIN_SIGNATURE_LENGTH) { - toolUseBlock.thoughtSignature = functionCallSignature; - // Cache for future requests (Claude Code may strip this field) - cacheSignature(toolId, functionCallSignature); - } - - yield { - type: 'content_block_start', - index: blockIndex, - content_block: toolUseBlock - }; - - yield { - type: 'content_block_delta', - index: blockIndex, - delta: { - type: 'input_json_delta', - partial_json: JSON.stringify(part.functionCall.args || {}) - } - }; - } - } - - // Check finish reason - if (firstCandidate.finishReason) { - if (firstCandidate.finishReason === 'MAX_TOKENS') { - stopReason = 'max_tokens'; - } else if (firstCandidate.finishReason === 'STOP') { - stopReason = 'end_turn'; - } - } - - } catch (parseError) { - logger.warn('[CloudCode] SSE parse error:', parseError.message); - } - } - } - - // Handle no content received - if (!hasEmittedStart) { - logger.warn('[CloudCode] No content parts received, emitting empty message'); - yield { - type: 'message_start', - message: { - id: messageId, - type: 'message', - role: 'assistant', - content: [], - model: originalModel, - stop_reason: null, - stop_sequence: null, - usage: { - input_tokens: inputTokens - cacheReadTokens, - output_tokens: 0, - cache_read_input_tokens: cacheReadTokens, - cache_creation_input_tokens: 0 - } - } - }; - - yield { - type: 'content_block_start', - index: 0, - 
content_block: { type: 'text', text: '' } - }; - yield { - type: 'content_block_delta', - index: 0, - delta: { type: 'text_delta', text: '[No response received from API]' } - }; - yield { type: 'content_block_stop', index: 0 }; - } else { - // Close any open block - if (currentBlockType !== null) { - if (currentBlockType === 'thinking' && currentThinkingSignature) { - yield { - type: 'content_block_delta', - index: blockIndex, - delta: { type: 'signature_delta', signature: currentThinkingSignature } - }; - } - yield { type: 'content_block_stop', index: blockIndex }; - } - } - - // Emit message_delta and message_stop - yield { - type: 'message_delta', - delta: { stop_reason: stopReason, stop_sequence: null }, - usage: { - output_tokens: outputTokens, - cache_read_input_tokens: cacheReadTokens, - cache_creation_input_tokens: 0 - } - }; - - yield { type: 'message_stop' }; -} - -/** - * List available models in Anthropic API format - * Fetches models dynamically from the Cloud Code API - * - * @param {string} token - OAuth access token - * @returns {Promise<{object: string, data: Array<{id: string, object: string, created: number, owned_by: string, description: string}>}>} List of available models - */ -export async function listModels(token) { - const data = await fetchAvailableModels(token); - if (!data || !data.models) { - return { object: 'list', data: [] }; - } - - const modelList = Object.entries(data.models).map(([modelId, modelData]) => ({ - id: modelId, - object: 'model', - created: Math.floor(Date.now() / 1000), - owned_by: 'anthropic', - description: modelData.displayName || modelId - })); - - return { - object: 'list', - data: modelList - }; -} - -/** - * Fetch available models with quota info from Cloud Code API - * Returns model quotas including remaining fraction and reset time - * - * @param {string} token - OAuth access token - * @returns {Promise} Raw response from fetchAvailableModels API - */ -export async function fetchAvailableModels(token) { - 
const headers = { - 'Authorization': `Bearer ${token}`, - 'Content-Type': 'application/json', - ...ANTIGRAVITY_HEADERS - }; - - for (const endpoint of ANTIGRAVITY_ENDPOINT_FALLBACKS) { - try { - const url = `${endpoint}/v1internal:fetchAvailableModels`; - const response = await fetch(url, { - method: 'POST', - headers, - body: JSON.stringify({}) - }); - - if (!response.ok) { - const errorText = await response.text(); - logger.warn(`[CloudCode] fetchAvailableModels error at ${endpoint}: ${response.status}`); - continue; - } - - return await response.json(); - } catch (error) { - logger.warn(`[CloudCode] fetchAvailableModels failed at ${endpoint}:`, error.message); - } - } - - throw new Error('Failed to fetch available models from all endpoints'); -} - -/** - * Get model quotas for an account - * Extracts quota info (remaining fraction and reset time) for each model - * - * @param {string} token - OAuth access token - * @returns {Promise} Map of modelId -> { remainingFraction, resetTime } - */ -export async function getModelQuotas(token) { - const data = await fetchAvailableModels(token); - if (!data || !data.models) return {}; - - const quotas = {}; - for (const [modelId, modelData] of Object.entries(data.models)) { - if (modelData.quotaInfo) { - quotas[modelId] = { - remainingFraction: modelData.quotaInfo.remainingFraction ?? null, - resetTime: modelData.quotaInfo.resetTime ?? null - }; - } - } - - return quotas; -} - -export default { - sendMessage, - sendMessageStream, - listModels, - fetchAvailableModels, - getModelQuotas -}; diff --git a/src/cloudcode/index.js b/src/cloudcode/index.js new file mode 100644 index 0000000..57898fb --- /dev/null +++ b/src/cloudcode/index.js @@ -0,0 +1,28 @@ +/** + * Cloud Code Client for Antigravity + * + * Communicates with Google's Cloud Code internal API using the + * v1internal:streamGenerateContent endpoint with proper request wrapping. + * + * Supports multi-account load balancing with automatic failover. 
+ * + * Based on: https://github.com/NoeFabris/opencode-antigravity-auth + */ + +// Re-export public API +export { sendMessage } from './message-handler.js'; +export { sendMessageStream } from './streaming-handler.js'; +export { listModels, fetchAvailableModels, getModelQuotas } from './model-api.js'; + +// Default export for backwards compatibility +import { sendMessage } from './message-handler.js'; +import { sendMessageStream } from './streaming-handler.js'; +import { listModels, fetchAvailableModels, getModelQuotas } from './model-api.js'; + +export default { + sendMessage, + sendMessageStream, + listModels, + fetchAvailableModels, + getModelQuotas +}; diff --git a/src/cloudcode/message-handler.js b/src/cloudcode/message-handler.js new file mode 100644 index 0000000..9d64c17 --- /dev/null +++ b/src/cloudcode/message-handler.js @@ -0,0 +1,209 @@ +/** + * Message Handler for Cloud Code + * + * Handles non-streaming message requests with multi-account support, + * retry logic, and endpoint failover. 
/**
 * Check if an error is a rate limit error (429 or RESOURCE_EXHAUSTED)
 * Thin alias kept for the call sites in this module; delegates to isRateLimitError.
 * @deprecated Use isRateLimitError from errors.js instead
 * @param {Error} error - The error to classify
 * @returns {*} Whatever isRateLimitError returns for the error
 */
function is429Error(error) {
    return isRateLimitError(error);
}

/**
 * Check if an error is an auth-invalid error (credentials need re-authentication)
 * Thin alias kept for the call sites in this module; delegates to isAuthError.
 * @deprecated Use isAuthError from errors.js instead
 * @param {Error} error - The error to classify
 * @returns {*} Whatever isAuthError returns for the error
 */
function isAuthInvalidError(error) {
    return isAuthError(error);
}

/**
 * Send a non-streaming request to Cloud Code with multi-account support
 * Uses SSE endpoint for thinking models (non-streaming doesn't return thinking blocks)
 *
 * Control flow, outermost first:
 *   1. attempt loop  - picks an account (waiting for rate limits to expire if needed)
 *   2. endpoint loop - tries every entry of ANTIGRAVITY_ENDPOINT_FALLBACKS for that account
 *   3. outer catch   - decides whether a failure moves on to the next attempt or is fatal
 *
 * @param {Object} anthropicRequest - The Anthropic-format request
 * @param {string} anthropicRequest.model - Model name to use
 * @param {Array} anthropicRequest.messages - Array of message objects
 * @param {number} [anthropicRequest.max_tokens] - Maximum tokens to generate
 * @param {Object} [anthropicRequest.thinking] - Thinking configuration
 * @param {import('../account-manager/index.js').default} accountManager - The account manager instance
 * @returns {Promise} Anthropic-format response object
 * @throws {Error} If max retries exceeded or no accounts available
 * @throws {Error} 'RESOURCE_EXHAUSTED: ...' when every account is rate-limited for longer
 *   than MAX_WAIT_BEFORE_ERROR_MS
 */
export async function sendMessage(anthropicRequest, accountManager) {
    const model = anthropicRequest.model;
    const isThinking = isThinkingModel(model);

    // Retry loop with account failover
    // Ensure we try at least as many times as there are accounts to cycle through everyone
    // +1 to ensure we hit the "all accounts rate-limited" check at the start of the next loop
    const maxAttempts = Math.max(MAX_RETRIES, accountManager.getAccountCount() + 1);

    for (let attempt = 0; attempt < maxAttempts; attempt++) {
        // Use sticky account selection for cache continuity
        // pickStickyAccount() yields { account, waitMs }; account may be falsy.
        const { account: stickyAccount, waitMs } = accountManager.pickStickyAccount();
        let account = stickyAccount;

        // Handle waiting for sticky account
        // No account but a positive wait: sleep, drop expired limits, then re-read the sticky account.
        if (!account && waitMs > 0) {
            logger.info(`[CloudCode] Waiting ${formatDuration(waitMs)} for sticky account...`);
            await sleep(waitMs);
            accountManager.clearExpiredLimits();
            account = accountManager.getCurrentStickyAccount();
        }

        // Handle all accounts rate-limited
        if (!account) {
            if (accountManager.isAllRateLimited()) {
                const allWaitMs = accountManager.getMinWaitTimeMs();
                const resetTime = new Date(Date.now() + allWaitMs).toISOString();

                // If wait time is too long, throw error immediately.
                // The threshold is MAX_WAIT_BEFORE_ERROR_MS from constants.js
                // (the original note says 2 minutes - value not visible here, confirm there).
                if (allWaitMs > MAX_WAIT_BEFORE_ERROR_MS) {
                    throw new Error(
                        `RESOURCE_EXHAUSTED: Rate limited. Quota will reset after ${formatDuration(allWaitMs)}. Next available: ${resetTime}`
                    );
                }

                // Wait for reset (applies to both single and multi-account modes)
                const accountCount = accountManager.getAccountCount();
                logger.warn(`[CloudCode] All ${accountCount} account(s) rate-limited. Waiting ${formatDuration(allWaitMs)}...`);
                await sleep(allWaitMs);
                accountManager.clearExpiredLimits();
                account = accountManager.pickNext();
            }

            // Still nothing usable (either not all rate-limited, or pickNext() returned nothing).
            if (!account) {
                throw new Error('No accounts available');
            }
        }

        try {
            // Get token and project for this account
            const token = await accountManager.getTokenForAccount(account);
            const project = await accountManager.getProjectForAccount(account, token);
            const payload = buildCloudCodeRequest(anthropicRequest, project);

            logger.debug(`[CloudCode] Sending request for model: ${model}`);

            // Try each endpoint
            // lastError holds either a real Error or a plain { is429, response, errorText, resetMs }
            // record describing the most useful 429 seen so far.
            let lastError = null;
            for (const endpoint of ANTIGRAVITY_ENDPOINT_FALLBACKS) {
                try {
                    // Thinking models always go through the SSE endpoint, even for a non-streaming call.
                    const url = isThinking
                        ? `${endpoint}/v1internal:streamGenerateContent?alt=sse`
                        : `${endpoint}/v1internal:generateContent`;

                    const response = await fetch(url, {
                        method: 'POST',
                        headers: buildHeaders(token, model, isThinking ? 'text/event-stream' : 'application/json'),
                        body: JSON.stringify(payload)
                    });

                    if (!response.ok) {
                        const errorText = await response.text();
                        logger.warn(`[CloudCode] Error at ${endpoint}: ${response.status} - ${errorText}`);

                        if (response.status === 401) {
                            // Auth error - clear the cached token and project for this account.
                            // NOTE: `token` is not re-fetched here, so the remaining endpoints are
                            // still called with the same token. lastError is left untouched, so if
                            // every endpoint answers 401 the endpoint loop ends without throwing and
                            // the next outer attempt obtains the token again.
                            logger.warn('[CloudCode] Auth error, refreshing token...');
                            accountManager.clearTokenCache(account.email);
                            accountManager.clearProjectCache(account.email);
                            continue;
                        }

                        if (response.status === 429) {
                            // Rate limited on this endpoint - try next endpoint first (DAILY → PROD)
                            logger.debug(`[CloudCode] Rate limited at ${endpoint}, trying next endpoint...`);
                            const resetMs = parseResetTime(response, errorText);
                            // Keep minimum reset time across all 429 responses
                            // (a 429 record also replaces any non-429 error remembered earlier)
                            if (!lastError?.is429 || (resetMs && (!lastError.resetMs || resetMs < lastError.resetMs))) {
                                lastError = { is429: true, response, errorText, resetMs };
                            }
                            continue;
                        }

                        // NOTE(review): a non-ok status below 400 skips this branch and falls
                        // through to the success path below - confirm that is intended.
                        if (response.status >= 400) {
                            lastError = new Error(`API error ${response.status}: ${errorText}`);
                            // If it's a 5xx error, wait a bit before trying the next endpoint
                            if (response.status >= 500) {
                                logger.warn(`[CloudCode] ${response.status} error, waiting 1s before retry...`);
                                await sleep(1000);
                            }
                            continue;
                        }
                    }

                    // For thinking models, parse SSE and accumulate all parts
                    if (isThinking) {
                        return await parseThinkingSSEResponse(response, anthropicRequest.model);
                    }

                    // Non-thinking models use regular JSON
                    const data = await response.json();
                    logger.debug('[CloudCode] Response received');
                    return convertGoogleToAnthropic(data, anthropicRequest.model);

                } catch (endpointError) {
                    // Covers fetch() failures and errors thrown while reading/parsing the response.
                    if (is429Error(endpointError)) {
                        throw endpointError; // Re-throw to trigger account switch
                    }
                    logger.warn(`[CloudCode] Error at ${endpoint}:`, endpointError.message);
                    lastError = endpointError;
                }
            }

            // If all endpoints failed for this account
            if (lastError) {
                // If all endpoints returned 429, mark account as rate-limited
                // (more precisely: if the remembered error is a 429 record)
                if (lastError.is429) {
                    logger.warn(`[CloudCode] All endpoints rate-limited for ${account.email}`);
                    accountManager.markRateLimited(account.email, lastError.resetMs);
                    // Caught by the outer catch below; presumably isRateLimitError recognises
                    // this message - verify against errors.js.
                    throw new Error(`Rate limited: ${lastError.errorText}`);
                }
                throw lastError;
            }
            // Reaching this point without a return or throw (e.g. only 401s) simply
            // lets the attempt loop run its next iteration.

        } catch (error) {
            if (is429Error(error)) {
                // Rate limited - continue to next account.
                // The account was marked above when the 429 came from the endpoint loop.
                logger.info(`[CloudCode] Account ${account.email} rate-limited, trying next...`);
                continue;
            }
            if (isAuthInvalidError(error)) {
                // Auth invalid - continue to next account.
                // Presumably the account is flagged wherever the error originates - not visible here.
                logger.warn(`[CloudCode] Account ${account.email} has invalid credentials, trying next...`);
                continue;
            }
            // Non-rate-limit error: throw immediately
            // UNLESS it's a 500 error, then we treat it as a "soft" failure for this account and try the next one
            // NOTE(review): detection is a substring match on the message, so any error text
            // that merely contains "500" or "503" is also treated as a 5xx failure.
            if (error.message.includes('API error 5') || error.message.includes('500') || error.message.includes('503')) {
                logger.warn(`[CloudCode] Account ${account.email} failed with 5xx error, trying next...`);
                accountManager.pickNext(); // Force advance to next account
                continue;
            }

            throw error;
        }
    }

    throw new Error('Max retries exceeded');
}
/**
 * Collect per-model quota information for an account.
 *
 * Calls fetchAvailableModels and keeps only the models that carry a
 * `quotaInfo` entry. Fields missing from `quotaInfo` are reported as null.
 *
 * @param {string} token - OAuth access token
 * @returns {Promise<Object>} Map of modelId -> { remainingFraction, resetTime };
 *   empty when the API response has no `models`
 */
export async function getModelQuotas(token) {
    const response = await fetchAvailableModels(token);
    const models = response?.models;
    if (!models) {
        return {};
    }

    return Object.entries(models).reduce((quotaByModel, [id, { quotaInfo }]) => {
        if (quotaInfo) {
            const { remainingFraction, resetTime } = quotaInfo;
            quotaByModel[id] = {
                remainingFraction: remainingFraction ?? null,
                resetTime: resetTime ?? null
            };
        }
        return quotaByModel;
    }, {});
}
/**
 * Work out how long to wait before a rate limit resets.
 *
 * Sources are consulted in this order and the first one that yields a
 * non-zero value wins:
 *   1. Response headers: `retry-after` (seconds or HTTP date),
 *      `x-ratelimit-reset` (Unix seconds), `x-ratelimit-reset-after` (seconds)
 *   2. Message text (Error message, or `errorText` for anything else):
 *      `quotaResetDelay`, `quotaResetTimeStamp`, `retry-after-ms` / `retryDelay`
 *      (seconds form, then milliseconds form), "retry after N seconds",
 *      a duration such as "1h23m45s", and finally `reset: <ISO timestamp>`
 *
 * Any value that was found but is below one second (including negative
 * deltas from timestamps already in the past) is replaced by 2000 ms.
 *
 * @param {Response|Error} responseOrError - HTTP Response object or Error
 * @param {string} [errorText=''] - Error body text, used when the first argument is not an Error
 * @returns {number|null} Milliseconds until reset, or null if nothing could be parsed
 */
export function parseResetTime(responseOrError, errorText = '') {
    let resetMs = null;

    // If it's a Response-like object, check headers first
    if (responseOrError && typeof responseOrError.headers?.get === 'function') {
        const headers = responseOrError.headers;

        // Standard Retry-After header (seconds or HTTP date)
        const retryAfter = headers.get('retry-after');
        if (retryAfter) {
            const seconds = Number.parseInt(retryAfter, 10);
            if (!Number.isNaN(seconds)) {
                resetMs = seconds * 1000;
                logger.debug(`[CloudCode] Retry-After header: ${seconds}s`);
            } else {
                // Not numeric: try parsing as HTTP date
                const date = new Date(retryAfter);
                if (!Number.isNaN(date.getTime())) {
                    resetMs = date.getTime() - Date.now();
                    if (resetMs > 0) {
                        logger.debug(`[CloudCode] Retry-After date: ${retryAfter}`);
                    } else {
                        // Date already passed: treat as "not found"
                        resetMs = null;
                    }
                }
            }
        }

        // x-ratelimit-reset (Unix timestamp in seconds)
        if (!resetMs) {
            const ratelimitReset = headers.get('x-ratelimit-reset');
            if (ratelimitReset) {
                const resetTimestamp = Number.parseInt(ratelimitReset, 10) * 1000;
                resetMs = resetTimestamp - Date.now();
                // An unparsable header gives NaN here, which also fails `> 0`
                if (resetMs > 0) {
                    logger.debug(`[CloudCode] x-ratelimit-reset: ${new Date(resetTimestamp).toISOString()}`);
                } else {
                    resetMs = null;
                }
            }
        }

        // x-ratelimit-reset-after (seconds)
        if (!resetMs) {
            const resetAfter = headers.get('x-ratelimit-reset-after');
            if (resetAfter) {
                const seconds = Number.parseInt(resetAfter, 10);
                if (!Number.isNaN(seconds) && seconds > 0) {
                    resetMs = seconds * 1000;
                    logger.debug(`[CloudCode] x-ratelimit-reset-after: ${seconds}s`);
                }
            }
        }
    }

    // If no header found, try parsing from error message/body
    if (!resetMs) {
        const msg = (responseOrError instanceof Error ? responseOrError.message : errorText) || '';

        // Try to extract "quotaResetDelay" first (e.g. "754.431528ms" or "1.5s")
        // FIX: the digits were written as `\\d` / `\\.` inside the regex literal, which
        // matches a literal backslash followed by "d" and therefore never matched a number.
        const quotaDelayMatch = msg.match(/quotaResetDelay[:\s"]+(\d+(?:\.\d+)?)(ms|s)/i);
        if (quotaDelayMatch) {
            const value = Number.parseFloat(quotaDelayMatch[1]);
            const unit = quotaDelayMatch[2].toLowerCase();
            resetMs = unit === 's' ? Math.ceil(value * 1000) : Math.ceil(value);
            logger.debug(`[CloudCode] Parsed quotaResetDelay from body: ${resetMs}ms`);
        }

        // Try to extract "quotaResetTimeStamp" (ISO format like "2025-12-31T07:00:47Z")
        if (!resetMs) {
            const quotaTimestampMatch = msg.match(/quotaResetTimeStamp[:\s"]+(\d{4}-\d{2}-\d{2}T[\d:.]+Z?)/i);
            if (quotaTimestampMatch) {
                const resetTime = new Date(quotaTimestampMatch[1]).getTime();
                if (!Number.isNaN(resetTime)) {
                    // A timestamp in the past yields a negative delta; it is kept on purpose
                    // and raised to the minimum wait by the sanity check at the end.
                    resetMs = resetTime - Date.now();
                    logger.debug(`[CloudCode] Parsed quotaResetTimeStamp: ${quotaTimestampMatch[1]} (Delta: ${resetMs}ms)`);
                }
            }
        }

        // "retry-after-ms" or "retryDelay" - check the seconds format first (e.g. "7739.23s")
        // FIX: the character class was `[\\d\\.]` (backslash, "d", "."), so digits never matched.
        if (!resetMs) {
            const secMatch = msg.match(/(?:retry[-_]?after[-_]?ms|retryDelay)[:\s"]+([\d.]+)(?:s\b|s")/i);
            if (secMatch) {
                resetMs = Math.ceil(Number.parseFloat(secMatch[1]) * 1000);
                logger.debug(`[CloudCode] Parsed retry seconds from body (precise): ${resetMs}ms`);
            }
        }

        if (!resetMs) {
            // Check for ms (explicit "ms" suffix or implicit if no suffix)
            const msMatch = msg.match(/(?:retry[-_]?after[-_]?ms|retryDelay)[:\s"]+(\d+)(?:\s*ms)?(?![\w.])/i);
            if (msMatch) {
                resetMs = Number.parseInt(msMatch[1], 10);
                logger.debug(`[CloudCode] Parsed retry-after-ms from body: ${resetMs}ms`);
            }
        }

        // Try to extract seconds value like "retry after 60 seconds"
        if (!resetMs) {
            const secMatch = msg.match(/retry\s+(?:after\s+)?(\d+)\s*(?:sec|s\b)/i);
            if (secMatch) {
                resetMs = Number.parseInt(secMatch[1], 10) * 1000;
                logger.debug(`[CloudCode] Parsed retry seconds from body: ${secMatch[1]}s`);
            }
        }

        // Try to extract duration like "1h23m45s" or "23m45s" or "45s"
        if (!resetMs) {
            const durationMatch = msg.match(/(\d+)h(\d+)m(\d+)s|(\d+)m(\d+)s|(\d+)s/i);
            if (durationMatch) {
                if (durationMatch[1]) {
                    const hours = Number.parseInt(durationMatch[1], 10);
                    const minutes = Number.parseInt(durationMatch[2], 10);
                    const seconds = Number.parseInt(durationMatch[3], 10);
                    resetMs = (hours * 3600 + minutes * 60 + seconds) * 1000;
                } else if (durationMatch[4]) {
                    const minutes = Number.parseInt(durationMatch[4], 10);
                    const seconds = Number.parseInt(durationMatch[5], 10);
                    resetMs = (minutes * 60 + seconds) * 1000;
                } else if (durationMatch[6]) {
                    resetMs = Number.parseInt(durationMatch[6], 10) * 1000;
                }
                if (resetMs) {
                    logger.debug(`[CloudCode] Parsed duration from body: ${formatDuration(resetMs)}`);
                }
            }
        }

        // Try to extract an ISO timestamp following "reset"
        if (!resetMs) {
            const isoMatch = msg.match(/reset[:\s"]+(\d{4}-\d{2}-\d{2}T[\d:.]+Z?)/i);
            if (isoMatch) {
                const resetTime = new Date(isoMatch[1]).getTime();
                if (!Number.isNaN(resetTime)) {
                    resetMs = resetTime - Date.now();
                    if (resetMs > 0) {
                        logger.debug(`[CloudCode] Parsed ISO reset time: ${isoMatch[1]}`);
                    } else {
                        resetMs = null;
                    }
                }
            }
        }
    }

    // SANITY CHECK: a reset time that was found but is under 1s (or negative)
    // is raised to 2s to avoid "Available in 0s" retry loops.
    if (resetMs !== null && resetMs < 1000) {
        logger.debug(`[CloudCode] Reset time too small (${resetMs}ms), enforcing 2s buffer`);
        resetMs = 2000;
    }

    return resetMs;
}
/**
 * Produce a session ID that stays the same for every turn of a conversation.
 *
 * The ID is the first 32 hex characters of the SHA-256 digest of the first
 * user message that contains text. String content is hashed as-is; for
 * block-array content the `text` of every non-empty text block is joined
 * with newlines. User messages without any text are skipped.
 *
 * @param {Object} anthropicRequest - The Anthropic-format request
 * @returns {string} 32 hex characters, or a random UUID when no user message has text
 */
export function deriveSessionId(anthropicRequest) {
    // Flatten one message's content into plain text ('' when there is none).
    const textOf = (content) => {
        if (typeof content === 'string') {
            return content;
        }
        if (!Array.isArray(content)) {
            return '';
        }
        return content
            .filter((block) => block.type === 'text' && block.text)
            .map((block) => block.text)
            .join('\n');
    };

    for (const { role, content } of anthropicRequest.messages || []) {
        if (role !== 'user') continue;

        const text = textOf(content);
        if (!text) continue;

        // SHA-256 of the text, truncated to the first 32 hex characters
        return crypto.createHash('sha256').update(text).digest('hex').slice(0, 32);
    }

    // No user message with text: fall back to a random UUID
    return crypto.randomUUID();
}
/**
 * Read an entire SSE response and collapse it into one Anthropic-format message.
 *
 * Each `data:` line is parsed as JSON (optionally wrapped in `{ response: ... }`).
 * Consecutive thought parts are merged into a single thought part (keeping the
 * most recent `thoughtSignature`), consecutive text parts are merged into a
 * single text part, and function-call parts are passed through unchanged.
 * The last `usageMetadata` and `finishReason` seen are used; `finishReason`
 * defaults to 'STOP'. Lines that fail to parse are logged and skipped.
 *
 * @param {Response} response - The HTTP response with SSE body
 * @param {string} originalModel - The original model name
 * @returns {Promise} Anthropic-format response object (result of convertGoogleToAnthropic)
 */
export async function parseThinkingSSEResponse(response, originalModel) {
    let accumulatedThinkingText = '';
    let accumulatedThinkingSignature = '';
    let accumulatedText = '';
    const finalParts = [];
    let usageMetadata = {};
    let finishReason = 'STOP';

    // Emit the pending thought part, if any, and reset the accumulators.
    const flushThinking = () => {
        if (accumulatedThinkingText) {
            finalParts.push({
                thought: true,
                text: accumulatedThinkingText,
                thoughtSignature: accumulatedThinkingSignature
            });
            accumulatedThinkingText = '';
            accumulatedThinkingSignature = '';
        }
    };

    // Emit the pending text part, if any, and reset the accumulator.
    const flushText = () => {
        if (accumulatedText) {
            finalParts.push({ text: accumulatedText });
            accumulatedText = '';
        }
    };

    // Fold one SSE line into the accumulators; non-`data:` and empty lines are ignored.
    const processLine = (line) => {
        if (!line.startsWith('data:')) return;
        const jsonText = line.slice(5).trim();
        if (!jsonText) return;

        try {
            const data = JSON.parse(jsonText);
            const innerResponse = data.response || data;

            if (innerResponse.usageMetadata) {
                usageMetadata = innerResponse.usageMetadata;
            }

            const candidates = innerResponse.candidates || [];
            const firstCandidate = candidates[0] || {};
            if (firstCandidate.finishReason) {
                finishReason = firstCandidate.finishReason;
            }

            const parts = firstCandidate.content?.parts || [];
            for (const part of parts) {
                if (part.thought === true) {
                    // A thought part ends any text run in progress
                    flushText();
                    accumulatedThinkingText += (part.text || '');
                    if (part.thoughtSignature) {
                        accumulatedThinkingSignature = part.thoughtSignature;
                    }
                } else if (part.functionCall) {
                    flushThinking();
                    flushText();
                    finalParts.push(part);
                } else if (part.text !== undefined) {
                    // Empty text parts neither end a thought run nor add content
                    if (!part.text) continue;
                    flushThinking();
                    accumulatedText += part.text;
                }
            }
        } catch (e) {
            logger.debug('[CloudCode] SSE parse warning:', e.message, 'Raw:', jsonText.slice(0, 100));
        }
    };

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        // The last element is an incomplete line (or '') - keep it for the next chunk
        buffer = lines.pop() || '';

        for (const line of lines) {
            processLine(line);
        }
    }

    // FIX: flush the decoder and handle a final line that was not newline-terminated;
    // previously that line (often the one carrying finishReason/usage) was silently dropped.
    buffer += decoder.decode();
    if (buffer) {
        processLine(buffer);
    }

    flushThinking();
    flushText();

    const accumulatedResponse = {
        candidates: [{ content: { parts: finalParts }, finishReason }],
        usageMetadata
    };

    const partTypes = finalParts.map(p => p.thought ? 'thought' : (p.functionCall ? 'functionCall' : 'text'));
    logger.debug('[CloudCode] Response received (SSE), part types:', partTypes);
    if (finalParts.some(p => p.thought)) {
        const thinkingPart = finalParts.find(p => p.thought);
        logger.debug('[CloudCode] Thinking signature length:', thinkingPart?.thoughtSignature?.length || 0);
    }

    return convertGoogleToAnthropic(accumulatedResponse, originalModel);
}
/**
 * Stream an SSE response and yield Anthropic-format events as data arrives.
 *
 * Event sequence produced:
 *   - `message_start` once, on the first chunk that contains parts
 *   - per content block: `content_block_start`, one or more `content_block_delta`,
 *     `content_block_stop`; a block is closed whenever the part type changes and
 *     for every function call
 *   - a `signature_delta` right before a thinking block is closed, when a
 *     signature of at least MIN_SIGNATURE_LENGTH characters was seen for it
 *   - `message_delta` (stop reason and usage) followed by `message_stop`
 *
 * If the stream ends without any parts, a placeholder text block containing
 * "[No response received from API]" is emitted instead.
 *
 * Stop reason: 'tool_use' once a function call was streamed, 'max_tokens' when
 * the finish reason is MAX_TOKENS, otherwise 'end_turn'.
 *
 * @param {Response} response - The HTTP response with SSE body
 * @param {string} originalModel - The original model name
 * @yields {Object} Anthropic-format SSE events
 */
export async function* streamSSEResponse(response, originalModel) {
    const messageId = `msg_${crypto.randomBytes(16).toString('hex')}`;
    let hasEmittedStart = false;
    let blockIndex = 0;
    let currentBlockType = null;
    let currentThinkingSignature = '';
    let inputTokens = 0;
    let outputTokens = 0;
    let cacheReadTokens = 0;
    let stopReason = 'end_turn';

    // Build the message_start event from the token counts known so far.
    // Note: input_tokens = promptTokenCount - cachedContentTokenCount (Antigravity includes cached in total)
    const buildMessageStart = () => ({
        type: 'message_start',
        message: {
            id: messageId,
            type: 'message',
            role: 'assistant',
            content: [],
            model: originalModel,
            stop_reason: null,
            stop_sequence: null,
            usage: {
                input_tokens: inputTokens - cacheReadTokens,
                output_tokens: 0,
                cache_read_input_tokens: cacheReadTokens,
                cache_creation_input_tokens: 0
            }
        }
    });

    // Close the block that is currently open (no-op when none is open):
    // flush a pending thinking signature, emit content_block_stop, advance the index.
    function* closeCurrentBlock() {
        if (currentBlockType === null) return;

        if (currentBlockType === 'thinking' && currentThinkingSignature) {
            yield {
                type: 'content_block_delta',
                index: blockIndex,
                delta: { type: 'signature_delta', signature: currentThinkingSignature }
            };
            currentThinkingSignature = '';
        }
        yield { type: 'content_block_stop', index: blockIndex };
        blockIndex++;
    }

    // Convert one SSE line into zero or more Anthropic events.
    function* processLine(line) {
        if (!line.startsWith('data:')) return;

        const jsonText = line.slice(5).trim();
        if (!jsonText) return;

        try {
            const data = JSON.parse(jsonText);
            const innerResponse = data.response || data;

            // Extract usage metadata (including cache tokens); a missing or zero
            // count keeps the previously seen value
            const usage = innerResponse.usageMetadata;
            if (usage) {
                inputTokens = usage.promptTokenCount || inputTokens;
                outputTokens = usage.candidatesTokenCount || outputTokens;
                cacheReadTokens = usage.cachedContentTokenCount || cacheReadTokens;
            }

            const candidates = innerResponse.candidates || [];
            const firstCandidate = candidates[0] || {};
            const content = firstCandidate.content || {};
            const parts = content.parts || [];

            // Emit message_start on first data
            if (!hasEmittedStart && parts.length > 0) {
                hasEmittedStart = true;
                yield buildMessageStart();
            }

            // Process each part
            for (const part of parts) {
                if (part.thought === true) {
                    // Handle thinking block
                    const text = part.text || '';
                    const signature = part.thoughtSignature || '';

                    if (currentBlockType !== 'thinking') {
                        yield* closeCurrentBlock();
                        currentBlockType = 'thinking';
                        currentThinkingSignature = '';
                        yield {
                            type: 'content_block_start',
                            index: blockIndex,
                            content_block: { type: 'thinking', thinking: '' }
                        };
                    }

                    // Remember the signature; it is emitted when the block closes
                    if (signature && signature.length >= MIN_SIGNATURE_LENGTH) {
                        currentThinkingSignature = signature;
                    }

                    yield {
                        type: 'content_block_delta',
                        index: blockIndex,
                        delta: { type: 'thinking_delta', thinking: text }
                    };

                } else if (part.text !== undefined) {
                    // Skip empty (or whitespace-only) text parts
                    if (!part.text || part.text.trim().length === 0) {
                        continue;
                    }

                    // Handle regular text
                    if (currentBlockType !== 'text') {
                        yield* closeCurrentBlock();
                        currentBlockType = 'text';
                        yield {
                            type: 'content_block_start',
                            index: blockIndex,
                            content_block: { type: 'text', text: '' }
                        };
                    }

                    yield {
                        type: 'content_block_delta',
                        index: blockIndex,
                        delta: { type: 'text_delta', text: part.text }
                    };

                } else if (part.functionCall) {
                    // Handle tool use
                    // For Gemini 3+, capture thoughtSignature from the functionCall part
                    // The signature is a sibling to functionCall, not inside it
                    const functionCallSignature = part.thoughtSignature || '';

                    // Every function call gets its own block
                    yield* closeCurrentBlock();
                    currentBlockType = 'tool_use';
                    stopReason = 'tool_use';

                    const toolId = part.functionCall.id || `toolu_${crypto.randomBytes(12).toString('hex')}`;

                    const toolUseBlock = {
                        type: 'tool_use',
                        id: toolId,
                        name: part.functionCall.name,
                        input: {}
                    };

                    // Store the signature in the tool_use block so it can be sent back
                    // in subsequent requests
                    if (functionCallSignature && functionCallSignature.length >= MIN_SIGNATURE_LENGTH) {
                        toolUseBlock.thoughtSignature = functionCallSignature;
                        // Cache for future requests (Claude Code may strip this field)
                        cacheSignature(toolId, functionCallSignature);
                    }

                    yield {
                        type: 'content_block_start',
                        index: blockIndex,
                        content_block: toolUseBlock
                    };

                    // The arguments arrive complete, so they go out as a single JSON delta
                    yield {
                        type: 'content_block_delta',
                        index: blockIndex,
                        delta: {
                            type: 'input_json_delta',
                            partial_json: JSON.stringify(part.functionCall.args || {})
                        }
                    };
                }
            }

            // Check finish reason
            if (firstCandidate.finishReason === 'MAX_TOKENS') {
                stopReason = 'max_tokens';
            } else if (firstCandidate.finishReason === 'STOP' && stopReason !== 'tool_use') {
                // FIX: a STOP finish reason used to overwrite 'tool_use' with 'end_turn',
                // so a response ending in a function call was reported as a normal end of turn.
                stopReason = 'end_turn';
            }

        } catch (parseError) {
            logger.warn('[CloudCode] SSE parse error:', parseError.message);
        }
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        // The last element is an incomplete line (or '') - keep it for the next chunk
        buffer = lines.pop() || '';

        for (const line of lines) {
            yield* processLine(line);
        }
    }

    // FIX: flush the decoder and handle a final line without a trailing newline,
    // which was previously dropped.
    buffer += decoder.decode();
    if (buffer) {
        yield* processLine(buffer);
    }

    if (!hasEmittedStart) {
        // Handle no content received
        logger.warn('[CloudCode] No content parts received, emitting empty message');
        yield buildMessageStart();

        yield {
            type: 'content_block_start',
            index: 0,
            content_block: { type: 'text', text: '' }
        };
        yield {
            type: 'content_block_delta',
            index: 0,
            delta: { type: 'text_delta', text: '[No response received from API]' }
        };
        yield { type: 'content_block_stop', index: 0 };
    } else {
        // Close any open block
        yield* closeCurrentBlock();
    }

    // Emit message_delta and message_stop
    yield {
        type: 'message_delta',
        delta: { stop_reason: stopReason, stop_sequence: null },
        usage: {
            output_tokens: outputTokens,
            cache_read_input_tokens: cacheReadTokens,
            cache_creation_input_tokens: 0
        }
    };

    yield { type: 'message_stop' };
}
+ */ + +import { + ANTIGRAVITY_ENDPOINT_FALLBACKS, + MAX_RETRIES, + MAX_WAIT_BEFORE_ERROR_MS +} from '../constants.js'; +import { isRateLimitError, isAuthError } from '../errors.js'; +import { formatDuration, sleep } from '../utils/helpers.js'; +import { logger } from '../utils/logger.js'; +import { parseResetTime } from './rate-limit-parser.js'; +import { buildCloudCodeRequest, buildHeaders } from './request-builder.js'; +import { streamSSEResponse } from './sse-streamer.js'; + +/** + * Check if an error is a rate limit error (429 or RESOURCE_EXHAUSTED) + * @deprecated Use isRateLimitError from errors.js instead + */ +function is429Error(error) { + return isRateLimitError(error); +} + +/** + * Check if an error is an auth-invalid error (credentials need re-authentication) + * @deprecated Use isAuthError from errors.js instead + */ +function isAuthInvalidError(error) { + return isAuthError(error); +} + +/** + * Send a streaming request to Cloud Code with multi-account support + * Streams events in real-time as they arrive from the server + * + * @param {Object} anthropicRequest - The Anthropic-format request + * @param {string} anthropicRequest.model - Model name to use + * @param {Array} anthropicRequest.messages - Array of message objects + * @param {number} [anthropicRequest.max_tokens] - Maximum tokens to generate + * @param {Object} [anthropicRequest.thinking] - Thinking configuration + * @param {import('../account-manager/index.js').default} accountManager - The account manager instance + * @yields {Object} Anthropic-format SSE events (message_start, content_block_start, content_block_delta, etc.) 
+ * @throws {Error} If max retries exceeded or no accounts available
+ * @throws {Error} "RESOURCE_EXHAUSTED: ..." when every account is rate-limited and the
+ *   shortest wait reported by the account manager exceeds MAX_WAIT_BEFORE_ERROR_MS
+ * @throws {*} Any failure raised after at least one event has been yielded. Such
+ *   failures are rethrown as-is and never retried, because replaying the request on
+ *   another endpoint or account would send the consumer a second copy of the stream.
+ */
+export async function* sendMessageStream(anthropicRequest, accountManager) {
+    const model = anthropicRequest.model;
+
+    // Retry loop with account failover.
+    // Try at least as many times as there are accounts so every account gets a turn;
+    // the extra +1 lets the following iteration reach the "all accounts rate-limited" check.
+    const maxAttempts = Math.max(MAX_RETRIES, accountManager.getAccountCount() + 1);
+
+    for (let attempt = 0; attempt < maxAttempts; attempt++) {
+        // Use sticky account selection for cache continuity
+        const { account: stickyAccount, waitMs } = accountManager.pickStickyAccount();
+        let account = stickyAccount;
+
+        // No account was returned but a wait was requested: sleep, then re-check the sticky account
+        if (!account && waitMs > 0) {
+            logger.info(`[CloudCode] Waiting ${formatDuration(waitMs)} for sticky account...`);
+            await sleep(waitMs);
+            accountManager.clearExpiredLimits();
+            account = accountManager.getCurrentStickyAccount();
+        }
+
+        // Handle all accounts rate-limited
+        if (!account) {
+            if (accountManager.isAllRateLimited()) {
+                const allWaitMs = accountManager.getMinWaitTimeMs();
+
+                // Fail fast instead of sleeping when the wait exceeds MAX_WAIT_BEFORE_ERROR_MS
+                if (allWaitMs > MAX_WAIT_BEFORE_ERROR_MS) {
+                    // Computed only on this path: it is not needed when we merely sleep
+                    const resetTime = new Date(Date.now() + allWaitMs).toISOString();
+                    throw new Error(
+                        `RESOURCE_EXHAUSTED: Rate limited. Quota will reset after ${formatDuration(allWaitMs)}. Next available: ${resetTime}`
+                    );
+                }
+
+                // Wait for reset (applies to both single and multi-account modes)
+                const accountCount = accountManager.getAccountCount();
+                logger.warn(`[CloudCode] All ${accountCount} account(s) rate-limited. Waiting ${formatDuration(allWaitMs)}...`);
+                await sleep(allWaitMs);
+                accountManager.clearExpiredLimits();
+                account = accountManager.pickNext();
+            }
+
+            if (!account) {
+                throw new Error('No accounts available');
+            }
+        }
+
+        // Becomes true as soon as one event has been handed to the consumer.
+        // From that point on a failure must surface to the caller: retrying would
+        // restart the response and emit duplicate events into the same stream.
+        let streamStarted = false;
+
+        try {
+            // Get token and project for this account
+            const token = await accountManager.getTokenForAccount(account);
+            const project = await accountManager.getProjectForAccount(account, token);
+            const payload = buildCloudCodeRequest(anthropicRequest, project);
+
+            logger.debug(`[CloudCode] Starting stream for model: ${model}`);
+
+            // Try each endpoint for streaming
+            let lastError = null;
+            for (const endpoint of ANTIGRAVITY_ENDPOINT_FALLBACKS) {
+                try {
+                    const url = `${endpoint}/v1internal:streamGenerateContent?alt=sse`;
+
+                    const response = await fetch(url, {
+                        method: 'POST',
+                        headers: buildHeaders(token, model, 'text/event-stream'),
+                        body: JSON.stringify(payload)
+                    });
+
+                    if (!response.ok) {
+                        const errorText = await response.text();
+                        logger.warn(`[CloudCode] Stream error at ${endpoint}: ${response.status} - ${errorText}`);
+
+                        if (response.status === 401) {
+                            // Auth error - drop this account's cached token and project, then
+                            // move on to the next endpoint. The token fetched above is reused
+                            // there, and lastError is left untouched, so if every endpoint
+                            // answers 401 the outer loop simply starts another attempt.
+                            accountManager.clearTokenCache(account.email);
+                            accountManager.clearProjectCache(account.email);
+                            continue;
+                        }
+
+                        if (response.status === 429) {
+                            // Rate limited on this endpoint - try next endpoint first (DAILY → PROD)
+                            logger.debug(`[CloudCode] Stream rate limited at ${endpoint}, trying next endpoint...`);
+                            const resetMs = parseResetTime(response, errorText);
+                            // Keep minimum reset time across all 429 responses
+                            if (!lastError?.is429 || (resetMs && (!lastError.resetMs || resetMs < lastError.resetMs))) {
+                                lastError = { is429: true, response, errorText, resetMs };
+                            }
+                            continue;
+                        }
+
+                        lastError = new Error(`API error ${response.status}: ${errorText}`);
+
+                        // If it's a 5xx error, wait a bit before trying the next endpoint
+                        if (response.status >= 500) {
+                            logger.warn(`[CloudCode] ${response.status} stream error, waiting 1s before retry...`);
+                            await sleep(1000);
+                        }
+
+                        continue;
+                    }
+
+                    // Stream the response - yield events as they arrive, recording that
+                    // output has begun before each event leaves this generator
+                    for await (const event of streamSSEResponse(response, model)) {
+                        streamStarted = true;
+                        yield event;
+                    }
+
+                    logger.debug('[CloudCode] Stream completed');
+                    return;
+
+                } catch (endpointError) {
+                    // Propagate when output has already begun (no safe retry), or when the
+                    // error is a rate limit (re-throw to trigger account switch)
+                    if (streamStarted || is429Error(endpointError)) {
+                        throw endpointError;
+                    }
+                    logger.warn(`[CloudCode] Stream error at ${endpoint}:`, endpointError.message);
+                    lastError = endpointError;
+                }
+            }
+
+            // If all endpoints failed for this account
+            if (lastError) {
+                // A 429 marker left in lastError means the account is treated as rate-limited
+                if (lastError.is429) {
+                    logger.warn(`[CloudCode] All endpoints rate-limited for ${account.email}`);
+                    accountManager.markRateLimited(account.email, lastError.resetMs);
+                    throw new Error(`Rate limited: ${lastError.errorText}`);
+                }
+                throw lastError;
+            }
+
+        } catch (error) {
+            // Events were already delivered: surface the failure instead of replaying the stream
+            if (streamStarted) {
+                throw error;
+            }
+            if (is429Error(error)) {
+                // Rate limited - already marked, continue to next account
+                logger.info(`[CloudCode] Account ${account.email} rate-limited, trying next...`);
+                continue;
+            }
+            if (isAuthInvalidError(error)) {
+                // Auth invalid - already marked, continue to next account
+                logger.warn(`[CloudCode] Account ${account.email} has invalid credentials, trying next...`);
+                continue;
+            }
+            // Non-rate-limit error: throw immediately
+            // UNLESS it's a 5xx error, then we treat it as a "soft" failure for this account and try the next one.
+            // Read the message defensively so a thrown value without a string message is
+            // rethrown unchanged rather than being masked by a TypeError here.
+            const message = typeof error?.message === 'string' ? error.message : '';
+            if (message.includes('API error 5') || message.includes('500') || message.includes('503')) {
+                logger.warn(`[CloudCode] Account ${account.email} failed with 5xx stream error, trying next...`);
+                accountManager.pickNext(); // Force advance to next account
+                continue;
+            }
+
+            throw error;
+        }
+    }
+
+    throw new Error('Max retries exceeded');
+}
diff --git a/src/server.js b/src/server.js index 4a2f627..305062b 100644 --- a/src/server.js +++ b/src/server.js @@ -6,10 +6,10 @@ import express from 'express'; import cors from 'cors'; 
-import { sendMessage, sendMessageStream, listModels, getModelQuotas } from './cloudcode-client.js'; -import { forceRefresh } from './token-extractor.js'; +import { sendMessage, sendMessageStream, listModels, getModelQuotas } from './cloudcode/index.js'; +import { forceRefresh } from './auth/token-extractor.js'; import { REQUEST_BODY_LIMIT } from './constants.js'; -import { AccountManager } from './account-manager.js'; +import { AccountManager } from './account-manager/index.js'; import { formatDuration } from './utils/helpers.js'; import { logger } from './utils/logger.js';