feat: add configurable account selection strategies

Refactor account selection into a strategy pattern with three options:
- Sticky: cache-optimized, stays on same account until rate-limited
- Round-robin: load-balanced, rotates every request
- Hybrid (default): smart distribution using health scores, token buckets, and LRU

The hybrid strategy uses multiple signals for optimal account selection:
health tracking for reliability, client-side token buckets for rate limiting,
and LRU freshness to prefer rested accounts.

Includes WebUI settings for strategy selection and unit tests.

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Badri Narayanan S
2026-01-18 03:48:43 +05:30
parent 973234372b
commit 5ae19a5b72
31 changed files with 2721 additions and 353 deletions

View File

@@ -10,7 +10,12 @@ import {
MAX_RETRIES,
MAX_EMPTY_RESPONSE_RETRIES,
MAX_WAIT_BEFORE_ERROR_MS,
DEFAULT_COOLDOWN_MS
DEFAULT_COOLDOWN_MS,
RATE_LIMIT_DEDUP_WINDOW_MS,
MAX_CONSECUTIVE_FAILURES,
EXTENDED_COOLDOWN_MS,
CAPACITY_RETRY_DELAY_MS,
MAX_CAPACITY_RETRIES
} from '../constants.js';
import { isRateLimitError, isAuthError, isEmptyResponseError } from '../errors.js';
import { formatDuration, sleep, isNetworkError } from '../utils/helpers.js';
@@ -21,6 +26,83 @@ import { streamSSEResponse } from './sse-streamer.js';
import { getFallbackModel } from '../fallback-config.js';
import crypto from 'crypto';
/**
* Gap 1: Rate limit deduplication - prevents thundering herd on concurrent rate limits
* Tracks last rate limit timestamp per model to skip duplicate retries
*/
const lastRateLimitTimestamps = new Map(); // modelId -> timestamp
/**
* Check if we should skip retry due to recent rate limit on this model
* @param {string} model - Model ID
* @returns {boolean} True if retry should be skipped (within dedup window)
*/
function shouldSkipRetryDueToDedup(model) {
const lastTimestamp = lastRateLimitTimestamps.get(model);
if (!lastTimestamp) return false;
const elapsed = Date.now() - lastTimestamp;
if (elapsed < RATE_LIMIT_DEDUP_WINDOW_MS) {
logger.debug(`[CloudCode] Rate limit on ${model} within dedup window (${elapsed}ms ago), skipping retry`);
return true;
}
return false;
}
/**
* Record rate limit timestamp for deduplication
* @param {string} model - Model ID
*/
function recordRateLimitTimestamp(model) {
lastRateLimitTimestamps.set(model, Date.now());
}
/**
* Clear rate limit timestamp after successful retry
* @param {string} model - Model ID
*/
function clearRateLimitTimestamp(model) {
lastRateLimitTimestamps.delete(model);
}
/**
* Gap 3: Detect permanent authentication failures that require re-authentication
* @param {string} errorText - Error message from API
* @returns {boolean} True if permanent auth failure
*/
function isPermanentAuthFailure(errorText) {
const lower = (errorText || '').toLowerCase();
return lower.includes('invalid_grant') ||
lower.includes('token revoked') ||
lower.includes('token has been expired or revoked') ||
lower.includes('token_revoked') ||
lower.includes('invalid_client') ||
lower.includes('credentials are invalid');
}
/**
* Gap 4: Detect if 429 error is due to model capacity (not user quota)
* @param {string} errorText - Error message from API
* @returns {boolean} True if capacity exhausted (not quota)
*/
function isModelCapacityExhausted(errorText) {
const lower = (errorText || '').toLowerCase();
return lower.includes('model_capacity_exhausted') ||
lower.includes('capacity_exhausted') ||
lower.includes('model is currently overloaded') ||
lower.includes('service temporarily unavailable');
}
// Periodically clean up stale dedup timestamps (every 60 seconds)
setInterval(() => {
const cutoff = Date.now() - 60000; // 1 minute
for (const [model, timestamp] of lastRateLimitTimestamps.entries()) {
if (timestamp < cutoff) {
lastRateLimitTimestamps.delete(model);
}
}
}, 60000);
/**
* Send a streaming request to Cloud Code with multi-account support
* Streams events in real-time as they arrive from the server
@@ -83,10 +165,14 @@ export async function* sendMessageStream(anthropicRequest, accountManager, fallb
throw new Error('No accounts available');
}
// Pick sticky account (prefers current for cache continuity)
let account = accountManager.getCurrentStickyAccount(model);
if (!account) {
account = accountManager.pickNext(model);
// Select account using configured strategy
const { account, waitMs } = accountManager.selectAccount(model);
// If strategy returns a wait time, sleep and retry
if (!account && waitMs > 0) {
logger.info(`[CloudCode] Waiting ${formatDuration(waitMs)} for account...`);
await sleep(waitMs + 500);
continue;
}
if (!account) {
@@ -101,11 +187,14 @@ export async function* sendMessageStream(anthropicRequest, accountManager, fallb
logger.debug(`[CloudCode] Starting stream for model: ${model}`);
// Try each endpoint for streaming
// Try each endpoint with index-based loop for capacity retry support
let lastError = null;
let retriedOnce = false; // Track if we've already retried for short rate limit
let capacityRetryCount = 0; // Gap 4: Track capacity exhaustion retries
let endpointIndex = 0;
for (const endpoint of ANTIGRAVITY_ENDPOINT_FALLBACKS) {
while (endpointIndex < ANTIGRAVITY_ENDPOINT_FALLBACKS.length) {
const endpoint = ANTIGRAVITY_ENDPOINT_FALLBACKS[endpointIndex];
try {
const url = `${endpoint}/v1internal:streamGenerateContent?alt=sse`;
@@ -120,15 +209,44 @@ export async function* sendMessageStream(anthropicRequest, accountManager, fallb
logger.warn(`[CloudCode] Stream error at ${endpoint}: ${response.status} - ${errorText}`);
if (response.status === 401) {
// Auth error - clear caches and retry
// Gap 3: Check for permanent auth failures
if (isPermanentAuthFailure(errorText)) {
logger.error(`[CloudCode] Permanent auth failure for ${account.email}: ${errorText.substring(0, 100)}`);
accountManager.markInvalid(account.email, 'Token revoked - re-authentication required');
throw new Error(`AUTH_INVALID_PERMANENT: ${errorText}`);
}
// Transient auth error - clear caches and retry
accountManager.clearTokenCache(account.email);
accountManager.clearProjectCache(account.email);
endpointIndex++;
continue;
}
if (response.status === 429) {
const resetMs = parseResetTime(response, errorText);
// Gap 4: Check if capacity issue (NOT quota) - retry SAME endpoint
if (isModelCapacityExhausted(errorText)) {
if (capacityRetryCount < MAX_CAPACITY_RETRIES) {
capacityRetryCount++;
const waitMs = resetMs || CAPACITY_RETRY_DELAY_MS;
logger.info(`[CloudCode] Model capacity exhausted, retry ${capacityRetryCount}/${MAX_CAPACITY_RETRIES} after ${formatDuration(waitMs)}...`);
await sleep(waitMs);
// Don't increment endpointIndex - retry same endpoint
continue;
}
// Max capacity retries exceeded - treat as quota exhaustion
logger.warn(`[CloudCode] Max capacity retries (${MAX_CAPACITY_RETRIES}) exceeded, switching account`);
}
// Gap 1: Check deduplication window to prevent thundering herd
if (shouldSkipRetryDueToDedup(model)) {
logger.info(`[CloudCode] Skipping retry due to recent rate limit, switching account...`);
accountManager.markRateLimited(account.email, resetMs || DEFAULT_COOLDOWN_MS, model);
throw new Error(`RATE_LIMITED_DEDUP: ${errorText}`);
}
// Decision: wait and retry OR switch account
if (resetMs && resetMs > DEFAULT_COOLDOWN_MS) {
// Long-term quota exhaustion (> 10s) - switch to next account
@@ -141,28 +259,11 @@ export async function* sendMessageStream(anthropicRequest, accountManager, fallb
if (!retriedOnce) {
retriedOnce = true;
recordRateLimitTimestamp(model); // Gap 1: Record before retry
logger.info(`[CloudCode] Short rate limit (${formatDuration(waitMs)}), waiting and retrying...`);
await sleep(waitMs);
// Retry same endpoint
const retryResponse = await fetch(url, {
method: 'POST',
headers: buildHeaders(token, model, 'text/event-stream'),
body: JSON.stringify(payload)
});
if (retryResponse.ok) {
// Stream the retry response
yield* streamSSEResponse(retryResponse, anthropicRequest.model);
logger.debug('[CloudCode] Stream completed after retry');
return;
}
// Retry also failed - parse new reset time
const retryErrorText = await retryResponse.text();
const retryResetMs = parseResetTime(retryResponse, retryErrorText);
logger.warn(`[CloudCode] Retry also failed, marking and switching...`);
accountManager.markRateLimited(account.email, retryResetMs || waitMs, model);
throw new Error(`RATE_LIMITED_AFTER_RETRY: ${retryErrorText}`);
// Don't increment endpointIndex - retry same endpoint
continue;
} else {
// Already retried once, mark and switch
accountManager.markRateLimited(account.email, waitMs, model);
@@ -179,6 +280,7 @@ export async function* sendMessageStream(anthropicRequest, accountManager, fallb
await sleep(1000);
}
endpointIndex++;
continue;
}
@@ -189,6 +291,9 @@ export async function* sendMessageStream(anthropicRequest, accountManager, fallb
try {
yield* streamSSEResponse(currentResponse, anthropicRequest.model);
logger.debug('[CloudCode] Stream completed');
// Gap 1: Clear timestamp on success
clearRateLimitTimestamp(model);
accountManager.notifySuccess(account, model);
return;
} catch (streamError) {
// Only retry on EmptyResponseError
@@ -226,8 +331,13 @@ export async function* sendMessageStream(anthropicRequest, accountManager, fallb
throw new Error(`429 RESOURCE_EXHAUSTED during retry: ${retryErrorText}`);
}
// Auth error - clear caches and throw with recognizable message
// Auth error - check for permanent failure
if (currentResponse.status === 401) {
if (isPermanentAuthFailure(retryErrorText)) {
logger.error(`[CloudCode] Permanent auth failure during retry for ${account.email}`);
accountManager.markInvalid(account.email, 'Token revoked - re-authentication required');
throw new Error(`AUTH_INVALID_PERMANENT: ${retryErrorText}`);
}
accountManager.clearTokenCache(account.email);
accountManager.clearProjectCache(account.email);
throw new Error(`401 AUTH_INVALID during retry: ${retryErrorText}`);
@@ -261,6 +371,7 @@ export async function* sendMessageStream(anthropicRequest, accountManager, fallb
}
logger.warn(`[CloudCode] Stream error at ${endpoint}:`, endpointError.message);
lastError = endpointError;
endpointIndex++;
}
}
@@ -276,7 +387,8 @@ export async function* sendMessageStream(anthropicRequest, accountManager, fallb
} catch (error) {
if (isRateLimitError(error)) {
// Rate limited - already marked, continue to next account
// Rate limited - already marked, notify strategy and continue to next account
accountManager.notifyRateLimit(account, model);
logger.info(`[CloudCode] Account ${account.email} rate-limited, trying next...`);
continue;
}
@@ -287,15 +399,31 @@ export async function* sendMessageStream(anthropicRequest, accountManager, fallb
}
// Handle 5xx errors
if (error.message.includes('API error 5') || error.message.includes('500') || error.message.includes('503')) {
logger.warn(`[CloudCode] Account ${account.email} failed with 5xx stream error, trying next...`);
accountManager.pickNext(model);
accountManager.notifyFailure(account, model);
// Gap 2: Check consecutive failures for extended cooldown
const consecutiveFailures = accountManager.getHealthTracker()?.getConsecutiveFailures(account.email) || 0;
if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) {
logger.warn(`[CloudCode] Account ${account.email} has ${consecutiveFailures} consecutive failures, applying extended cooldown (${formatDuration(EXTENDED_COOLDOWN_MS)})`);
accountManager.markRateLimited(account.email, EXTENDED_COOLDOWN_MS, model);
} else {
logger.warn(`[CloudCode] Account ${account.email} failed with 5xx stream error, trying next...`);
}
continue;
}
if (isNetworkError(error)) {
logger.warn(`[CloudCode] Network error for ${account.email} (stream), trying next account... (${error.message})`);
accountManager.notifyFailure(account, model);
// Gap 2: Check consecutive failures for extended cooldown
const consecutiveFailures = accountManager.getHealthTracker()?.getConsecutiveFailures(account.email) || 0;
if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) {
logger.warn(`[CloudCode] Account ${account.email} has ${consecutiveFailures} consecutive network failures, applying extended cooldown (${formatDuration(EXTENDED_COOLDOWN_MS)})`);
accountManager.markRateLimited(account.email, EXTENDED_COOLDOWN_MS, model);
} else {
logger.warn(`[CloudCode] Network error for ${account.email} (stream), trying next account... (${error.message})`);
}
await sleep(1000);
accountManager.pickNext(model);
continue;
}