@asynccontextmanager
async def stream_http_request(method: str, url: str, **kwargs):
    """Open an upstream HTTP request as a live stream and yield the response.

    The yielded ``httpx.Response`` has not had its body consumed; the
    caller is responsible for draining it via ``aiter_bytes()`` /
    ``aiter_lines()`` or similar before the context exits.

    Rationale for a dedicated client rather than the shared pool:
    ``httpx``'s ``stream()`` keeps its connection checked out until the
    context manager exits, and an SSE subscription may never end.  A
    throwaway ``AsyncClient`` per stream therefore keeps long-lived
    subscriptions from starving the pooled connections; the read timeout
    is disabled for the same reason.
    """
    no_read_timeout = httpx.Timeout(connect=10.0, read=None, write=10.0, pool=5.0)
    client = httpx.AsyncClient(timeout=no_read_timeout, follow_redirects=False)
    async with client:
        async with client.stream(method, url, **kwargs) as upstream:
            yield upstream
async def _proxy_sse(
    request: Request,
    session_id: str,
    path: str,
    url: str,
    headers: dict,
    body: bytes,
    start_time: float,
):
    """Proxy an SSE event stream to the client without buffering.

    Returns a ``StreamingResponse`` that forwards upstream bytes as they
    arrive.  NOTE(review): the client-visible status is always 200 because
    response headers are sent before the upstream connection is attempted;
    an upstream failure surfaces to the client as an early end-of-stream,
    not as an HTTP error status.
    """
    log_session_operation(
        session_id, "proxy_sse_stream", method=request.method, path=path
    )

    # The httpx stream context must outlive this function call: Starlette
    # consumes the generator and finalizes it on client disconnect, so the
    # context manager is entered inside the generator and exits during
    # generator cleanup.
    async def event_generator():
        # Assume success; flipped to 502 if the upstream request fails so
        # the completion log reflects what actually happened (the original
        # code logged 200 unconditionally, even on error).
        logged_status = 200
        try:
            async with stream_http_request(
                method=request.method,
                url=url,
                headers=headers,
                content=body,
            ) as upstream:
                async for chunk in upstream.aiter_bytes():
                    yield chunk
        except httpx.RequestError as e:
            logged_status = 502
            log_security_event(
                "proxy_sse_error",
                "error",
                session_id=session_id,
                method=request.method,
                path=path,
                error=str(e),
            )
        finally:
            duration_ms = (time.time() - start_time) * 1000
            log_request(
                request.method,
                f"/session/{session_id}/{path}",
                logged_status,
                duration_ms,
                session_id=session_id,
                operation="proxy_sse_complete",
            )

    resp = StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Disable reverse-proxy buffering (nginx) so events flush live.
            "X-Accel-Buffering": "no",
        },
    )
    # Refresh the session cookie alongside the stream, mirroring the
    # buffered proxy path.
    resp.set_cookie(
        key="lovdata_session",
        value=session_id,
        httponly=True,
        samesite="lax",
        max_age=86400,
    )
    return resp