diff --git a/src/cloudcode/streaming-handler.js b/src/cloudcode/streaming-handler.js index 6729c80..5db796f 100644 --- a/src/cloudcode/streaming-handler.js +++ b/src/cloudcode/streaming-handler.js @@ -145,63 +145,79 @@ export async function* sendMessageStream(anthropicRequest, accountManager, fallb } // Stream the response with retry logic for empty responses - let emptyRetries = 0; + // Uses a for-loop for clearer retry semantics let currentResponse = response; - while (emptyRetries <= MAX_EMPTY_RESPONSE_RETRIES) { + for (let emptyRetries = 0; emptyRetries <= MAX_EMPTY_RESPONSE_RETRIES; emptyRetries++) { try { yield* streamSSEResponse(currentResponse, anthropicRequest.model); logger.debug('[CloudCode] Stream completed'); return; } catch (streamError) { - if (isEmptyResponseError(streamError) && emptyRetries < MAX_EMPTY_RESPONSE_RETRIES) { - emptyRetries++; - logger.warn(`[CloudCode] Empty response, retry ${emptyRetries}/${MAX_EMPTY_RESPONSE_RETRIES}...`); - - // Refetch the response - currentResponse = await fetch(url, { - method: 'POST', - headers: buildHeaders(token, model, 'text/event-stream'), - body: JSON.stringify(payload) - }); - - // Handle specific error codes on retry - if (!currentResponse.ok) { - const retryErrorText = await currentResponse.text(); - - // Re-throw rate limit errors to trigger account switch - if (currentResponse.status === 429) { - const resetMs = parseResetTime(currentResponse, retryErrorText); - throw new Error(`Rate limited during retry: ${retryErrorText}`); - } - - // Re-throw auth errors for proper handling - if (currentResponse.status === 401) { - accountManager.clearTokenCache(account.email); - accountManager.clearProjectCache(account.email); - throw new Error(`Auth error during retry: ${retryErrorText}`); - } - - // For 5xx errors, continue to next retry attempt - if (currentResponse.status >= 500) { - logger.warn(`[CloudCode] Retry got ${currentResponse.status}, continuing...`); - await sleep(1000); - continue; - } - - throw new Error(`Empty response retry failed: ${currentResponse.status} - ${retryErrorText}`); - } - continue; + // Only retry on EmptyResponseError + if (!isEmptyResponseError(streamError)) { + throw streamError; } - // After max retries, emit fallback message - if (isEmptyResponseError(streamError)) { + // Check if we have retries left + if (emptyRetries >= MAX_EMPTY_RESPONSE_RETRIES) { logger.error(`[CloudCode] Empty response after ${MAX_EMPTY_RESPONSE_RETRIES} retries`); yield* emitEmptyResponseFallback(anthropicRequest.model); return; } - throw streamError; + // Exponential backoff: 500ms, 1000ms, 2000ms + const backoffMs = 500 * Math.pow(2, emptyRetries); + logger.warn(`[CloudCode] Empty response, retry ${emptyRetries + 1}/${MAX_EMPTY_RESPONSE_RETRIES} after ${backoffMs}ms...`); + await sleep(backoffMs); + + // Refetch the response + currentResponse = await fetch(url, { + method: 'POST', + headers: buildHeaders(token, model, 'text/event-stream'), + body: JSON.stringify(payload) + }); + + // Handle specific error codes on retry + if (!currentResponse.ok) { + const retryErrorText = await currentResponse.text(); + + // Rate limit error - mark account and throw to trigger account switch + if (currentResponse.status === 429) { + const resetMs = parseResetTime(currentResponse, retryErrorText); + accountManager.markRateLimited(account.email, resetMs, model); + throw new Error(`429 RESOURCE_EXHAUSTED during retry: ${retryErrorText}`); + } + + // Auth error - clear caches and throw with recognizable message + if (currentResponse.status === 401) { + accountManager.clearTokenCache(account.email); + accountManager.clearProjectCache(account.email); + throw new Error(`401 AUTH_INVALID during retry: ${retryErrorText}`); + } + + // For 5xx errors, don't pass to streamer - just continue to next retry + if (currentResponse.status >= 500) { + logger.warn(`[CloudCode] Retry got ${currentResponse.status}, will retry...`); + // Don't continue here - let the loop increment and refetch + // Set currentResponse to null to force refetch at loop start + emptyRetries--; // Compensate for loop increment since we didn't actually try + await sleep(1000); + // Refetch immediately for 5xx + currentResponse = await fetch(url, { + method: 'POST', + headers: buildHeaders(token, model, 'text/event-stream'), + body: JSON.stringify(payload) + }); + if (currentResponse.ok) { + continue; // Try streaming with new response + } + // If still failing, let it fall through to throw + } + + throw new Error(`Empty response retry failed: ${currentResponse.status} - ${retryErrorText}`); + } + // Response is OK, loop will continue to try streamSSEResponse } }