diff --git a/src/cloudcode/sse-streamer.js b/src/cloudcode/sse-streamer.js
index eaf9136..b591279 100644
--- a/src/cloudcode/sse-streamer.js
+++ b/src/cloudcode/sse-streamer.js
@@ -7,6 +7,7 @@
 
 import crypto from 'crypto';
 import { MIN_SIGNATURE_LENGTH, getModelFamily } from '../constants.js';
+import { EmptyResponseError } from '../errors.js';
 import { cacheSignature, cacheThinkingSignature } from '../format/signature-cache.js';
 import { logger } from '../utils/logger.js';
 
@@ -226,39 +227,10 @@ export async function* streamSSEResponse(response, originalModel) {
         }
     }
 
-    // Handle no content received
+    // Handle no content received - throw error to trigger retry in streaming-handler
     if (!hasEmittedStart) {
-        logger.warn('[CloudCode] No content parts received, emitting empty message');
-        yield {
-            type: 'message_start',
-            message: {
-                id: messageId,
-                type: 'message',
-                role: 'assistant',
-                content: [],
-                model: originalModel,
-                stop_reason: null,
-                stop_sequence: null,
-                usage: {
-                    input_tokens: inputTokens - cacheReadTokens,
-                    output_tokens: 0,
-                    cache_read_input_tokens: cacheReadTokens,
-                    cache_creation_input_tokens: 0
-                }
-            }
-        };
-
-        yield {
-            type: 'content_block_start',
-            index: 0,
-            content_block: { type: 'text', text: '' }
-        };
-        yield {
-            type: 'content_block_delta',
-            index: 0,
-            delta: { type: 'text_delta', text: '[No response received from API]' }
-        };
-        yield { type: 'content_block_stop', index: 0 };
+        logger.warn('[CloudCode] No content parts received, throwing for retry');
+        throw new EmptyResponseError('No content parts received from API');
     } else {
         // Close any open block
         if (currentBlockType !== null) {
diff --git a/src/cloudcode/streaming-handler.js b/src/cloudcode/streaming-handler.js
index f33945b..8a6a1de 100644
--- a/src/cloudcode/streaming-handler.js
+++ b/src/cloudcode/streaming-handler.js
@@ -10,7 +10,7 @@ import {
     MAX_RETRIES,
     MAX_WAIT_BEFORE_ERROR_MS
 } from '../constants.js';
-import { isRateLimitError, isAuthError } from '../errors.js';
+import { isRateLimitError, isAuthError, isEmptyResponseError } from '../errors.js';
 import { formatDuration, sleep, isNetworkError } from '../utils/helpers.js';
 import { logger } from '../utils/logger.js';
 import { parseResetTime } from './rate-limit-parser.js';
@@ -18,6 +18,8 @@ import { buildCloudCodeRequest, buildHeaders } from './request-builder.js';
 import { streamSSEResponse } from './sse-streamer.js';
 import { getFallbackModel } from '../fallback-config.js';
 
+// Maximum retries for empty responses before giving up
+const MAX_EMPTY_RETRIES = 2;
 
 /**
  * Send a streaming request to Cloud Code with multi-account support
@@ -143,16 +145,51 @@ export async function* sendMessageStream(anthropicRequest, accountManager, fallb
                         continue;
                     }
 
-                    // Stream the response - yield events as they arrive
-                    yield* streamSSEResponse(response, anthropicRequest.model);
+                    // Stream the response with retry logic for empty responses
+                    let emptyRetries = 0;
+                    let currentResponse = response;
 
-                    logger.debug('[CloudCode] Stream completed');
-                    return;
+                    while (emptyRetries <= MAX_EMPTY_RETRIES) {
+                        try {
+                            yield* streamSSEResponse(currentResponse, anthropicRequest.model);
+                            logger.debug('[CloudCode] Stream completed');
+                            return;
+                        } catch (streamError) {
+                            if (isEmptyResponseError(streamError) && emptyRetries < MAX_EMPTY_RETRIES) {
+                                emptyRetries++;
+                                logger.warn(`[CloudCode] Empty response, retry ${emptyRetries}/${MAX_EMPTY_RETRIES}...`);
+
+                                // Refetch the response
+                                currentResponse = await fetch(url, {
+                                    method: 'POST',
+                                    headers: buildHeaders(token, model, 'text/event-stream'),
+                                    body: JSON.stringify(payload)
+                                });
+
+                                if (!currentResponse.ok) {
+                                    throw new Error(`Empty response retry failed: ${currentResponse.status}`);
+                                }
+                                continue;
+                            }
+
+                            // After max retries, emit fallback message
+                            if (isEmptyResponseError(streamError)) {
+                                logger.error(`[CloudCode] Empty response after ${MAX_EMPTY_RETRIES} retries`);
+                                yield* emitEmptyResponseFallback(anthropicRequest.model);
+                                return;
+                            }
+
+                            throw streamError;
+                        }
+                    }
 
                 } catch (endpointError) {
                     if (isRateLimitError(endpointError)) {
                         throw endpointError; // Re-throw to trigger account switch
                     }
+                    if (isEmptyResponseError(endpointError)) {
+                        throw endpointError; // Re-throw empty response errors to outer handler
+                    }
                     logger.warn(`[CloudCode] Stream error at ${endpoint}:`, endpointError.message);
                     lastError = endpointError;
                 }
@@ -201,3 +238,55 @@ export async function* sendMessageStream(anthropicRequest, accountManager, fallb
 
     throw new Error('Max retries exceeded');
 }
+
+/**
+ * Emit a fallback message when all retry attempts fail with empty response.
+ * The message_start usage reports every token counter as zero, including the
+ * cache read and cache creation counters.
+ * @param {string} model - The model name
+ * @yields {Object} Anthropic-format SSE events for empty response fallback
+ */
+function* emitEmptyResponseFallback(model) {
+    const messageId = `msg_${Date.now()}_empty`;
+
+    yield {
+        type: 'message_start',
+        message: {
+            id: messageId,
+            type: 'message',
+            role: 'assistant',
+            content: [],
+            model: model,
+            stop_reason: null,
+            stop_sequence: null,
+            usage: {
+                input_tokens: 0,
+                output_tokens: 0,
+                cache_read_input_tokens: 0,
+                cache_creation_input_tokens: 0
+            }
+        }
+    };
+
+    yield {
+        type: 'content_block_start',
+        index: 0,
+        content_block: { type: 'text', text: '' }
+    };
+
+    yield {
+        type: 'content_block_delta',
+        index: 0,
+        delta: { type: 'text_delta', text: '[No response after retries - please try again]' }
+    };
+
+    yield { type: 'content_block_stop', index: 0 };
+
+    yield {
+        type: 'message_delta',
+        delta: { stop_reason: 'end_turn', stop_sequence: null },
+        usage: { output_tokens: 0 }
+    };
+
+    yield { type: 'message_stop' };
+}
diff --git a/src/errors.js b/src/errors.js
index eb2e755..fac9410 100644
--- a/src/errors.js
+++ b/src/errors.js
@@ -135,6 +135,20 @@ export class NativeModuleError extends AntigravityError {
     }
 }
 
+/**
+ * Empty response error - thrown when API returns no content
+ * Used to trigger retry logic in streaming handler
+ */
+export class EmptyResponseError extends AntigravityError {
+    /**
+     * @param {string} message - Error message
+     */
+    constructor(message = 'No content received from API') {
+        super(message, 'EMPTY_RESPONSE', true, {});
+        this.name = 'EmptyResponseError';
+    }
+}
+
 /**
  * Check if an error is a rate limit error
  * Works with both custom error classes and legacy string-based errors
@@ -164,6 +178,16 @@ export function isAuthError(error) {
         msg.includes('TOKEN REFRESH FAILED');
 }
 
+/**
+ * Check if an error is an empty response error
+ * @param {Error} error - Error to check
+ * @returns {boolean}
+ */
+export function isEmptyResponseError(error) {
+    return error instanceof EmptyResponseError ||
+        error?.name === 'EmptyResponseError';
+}
+
 export default {
     AntigravityError,
     RateLimitError,
@@ -172,6 +196,8 @@ export default {
     MaxRetriesError,
     ApiError,
     NativeModuleError,
+    EmptyResponseError,
     isRateLimitError,
-    isAuthError
+    isAuthError,
+    isEmptyResponseError
 };