- Fix o3-pro response parsing to use output_text convenience field
- Replace respx with custom httpx transport solution for better reliability
- Implement comprehensive PII sanitization to prevent secret exposure
- Add HTTP request/response recording with cassette format for testing
- Sanitize all existing cassettes to remove exposed API keys
- Update documentation to reflect new HTTP transport recorder
- Add test suite for PII sanitization and HTTP recording

This change:
1. Fixes timeout issues with o3-pro API calls (was 2+ minutes, now ~15-22 seconds)
2. Properly captures response content without httpx.ResponseNotRead exceptions
3. Preserves original HTTP response format including gzip compression
4. Prevents future secret exposure with automatic PII sanitization
5. Enables reliable replay testing for o3-pro interactions

Co-Authored-By: Claude <noreply@anthropic.com>
374 lines
14 KiB
Python
374 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
PII (Personally Identifiable Information) Sanitizer for HTTP recordings.
|
|
|
|
This module provides comprehensive sanitization of sensitive data in HTTP
|
|
request/response recordings to prevent accidental exposure of API keys,
|
|
tokens, personal information, and other sensitive data.
|
|
"""
|
|
|
|
import re
|
|
import base64
|
|
import json
|
|
from typing import Any, Dict, List, Optional, Pattern, Tuple
|
|
from dataclasses import dataclass
|
|
from copy import deepcopy
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class PIIPattern:
    """A named regex rule describing one kind of PII and how to replace it."""

    # Short identifier for the rule.
    name: str
    # Compiled regular expression that matches the sensitive text.
    pattern: Pattern[str]
    # Text substituted for every match of ``pattern``.
    replacement: str
    # Human-readable explanation of what the rule targets.
    description: str

    @classmethod
    def create(
        cls,
        name: str,
        pattern: str,
        replacement: str,
        description: str,
        flags: int = 0,
    ) -> 'PIIPattern':
        """Create a PIIPattern from an uncompiled regex string.

        Args:
            name: Short identifier for the rule.
            pattern: Regular expression source; compiled here.
            replacement: Text substituted for each match.
            description: Human-readable explanation of the rule.
            flags: Optional ``re`` flags (e.g. ``re.IGNORECASE``) passed to
                ``re.compile``. Defaults to 0, i.e. no flags.

        Returns:
            A PIIPattern whose ``pattern`` field is the compiled regex.

        Raises:
            re.error: If ``pattern`` is not a valid regular expression.
        """
        return cls(
            name=name,
            pattern=re.compile(pattern, flags),
            replacement=replacement,
            description=description,
        )
|
|
|
|
|
|
class PIISanitizer:
    """Sanitizes PII from various data structures while preserving format.

    The sanitizer holds an ordered list of PIIPattern rules and applies them,
    in order, to strings, headers, URLs and nested containers. Setting
    ``sanitize_enabled`` to False turns the string/header/value/URL methods
    into pass-throughs.
    """

    # Lower-cased header names whose values are treated as secrets.
    _SENSITIVE_HEADERS = frozenset({
        'authorization', 'api-key', 'x-api-key', 'cookie',
        'set-cookie', 'x-auth-token', 'x-access-token',
    })

    # Lower-cased query parameter names whose values are always redacted.
    _SENSITIVE_PARAMS = frozenset({'key', 'token', 'api_key', 'secret', 'password'})

    def __init__(self, patterns: Optional[List['PIIPattern']] = None):
        """Initialize with optional custom patterns.

        Args:
            patterns: Rules to use. When omitted or empty, the built-in
                default rules are installed instead.
        """
        # Copy the list so add_pattern() never mutates the caller's object.
        self.patterns: List['PIIPattern'] = list(patterns) if patterns else []
        self.sanitize_enabled = True

        # Add default patterns if none provided
        if not self.patterns:
            self._add_default_patterns()

    def _add_default_patterns(self):
        """Append the built-in rules to ``self.patterns``.

        Order matters: rules are applied sequentially, so more specific key
        formats are listed before more general ones.
        """
        # Each entry: (name, regex, replacement, description)
        specs = [
            # API Keys and Tokens
            ("openai_api_key_proj", r'sk-proj-[A-Za-z0-9\-_]{48,}',
             "sk-proj-SANITIZED", "OpenAI project API keys"),
            ("openai_api_key", r'sk-[A-Za-z0-9]{48,}',
             "sk-SANITIZED", "OpenAI API keys"),
            ("anthropic_api_key", r'sk-ant-[A-Za-z0-9\-_]{48,}',
             "sk-ant-SANITIZED", "Anthropic API keys"),
            ("google_api_key", r'AIza[A-Za-z0-9\-_]{35,}',
             "AIza-SANITIZED", "Google API keys"),
            ("github_token_personal", r'ghp_[A-Za-z0-9]{36}',
             "ghp_SANITIZED", "GitHub personal access tokens"),
            ("github_token_server", r'ghs_[A-Za-z0-9]{36}',
             "ghs_SANITIZED", "GitHub server tokens"),
            ("github_token_refresh", r'ghr_[A-Za-z0-9]{36}',
             "ghr_SANITIZED", "GitHub refresh tokens"),

            # Bearer-prefixed variants of the key rules above. With this
            # ordering the bare-key rules have already rewritten the key by
            # the time these run, so they are kept only for completeness.
            ("bearer_openai_proj", r'Bearer\s+sk-proj-[A-Za-z0-9\-_]{48,}',
             "Bearer sk-proj-SANITIZED", "Bearer with OpenAI project key"),
            ("bearer_openai", r'Bearer\s+sk-[A-Za-z0-9]{48,}',
             "Bearer sk-SANITIZED", "Bearer with OpenAI key"),
            ("bearer_anthropic", r'Bearer\s+sk-ant-[A-Za-z0-9\-_]{48,}',
             "Bearer sk-ant-SANITIZED", "Bearer with Anthropic key"),

            # JWT tokens
            ("jwt_token", r'eyJ[A-Za-z0-9\-_]+\.eyJ[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+',
             "eyJ-SANITIZED.eyJ-SANITIZED.SANITIZED", "JSON Web Tokens"),

            # Personal Information
            ("email_address", r'[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}',
             "user@example.com", "Email addresses"),
            ("ipv4_address",
             r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b',
             "0.0.0.0", "IPv4 addresses"),
            ("ipv6_address", r'(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}',
             "::1", "IPv6 addresses"),
            ("ssn", r'\b\d{3}-\d{2}-\d{4}\b',
             "XXX-XX-XXXX", "Social Security Numbers"),
            ("credit_card", r'\b\d{4}[\s\-]?\d{4}[\s\-]?\d{4}[\s\-]?\d{4}\b',
             "XXXX-XXXX-XXXX-XXXX", "Credit card numbers"),
            # Phone patterns - international first to avoid partial matches
            ("phone_intl", r'\+\d{1,3}[\s\-]?\d{3}[\s\-]?\d{3}[\s\-]?\d{4}',
             "+X-XXX-XXX-XXXX", "International phone numbers"),
            ("phone_us", r'\(?\d{3}\)?[\s\-]?\d{3}[\s\-]?\d{4}',
             "(XXX) XXX-XXXX", "US phone numbers"),

            # AWS
            ("aws_access_key", r'AKIA[0-9A-Z]{16}',
             "AKIA-SANITIZED", "AWS access keys"),
            ("aws_secret_key",
             r'(?i)aws[_\s]*secret[_\s]*access[_\s]*key["\s]*[:=]["\s]*[A-Za-z0-9/+=]{40}',
             "aws_secret_access_key=SANITIZED", "AWS secret keys"),

            # Other common patterns
            ("slack_token", r'xox[baprs]-[0-9]{10,13}-[0-9]{10,13}-[a-zA-Z0-9]{24,34}',
             "xox-SANITIZED", "Slack tokens"),
            ("stripe_key", r'(?:sk|pk)_(?:test|live)_[0-9a-zA-Z]{24,99}',
             "sk_SANITIZED", "Stripe API keys"),
        ]

        self.patterns.extend(
            PIIPattern.create(
                name=name,
                pattern=regex,
                replacement=replacement,
                description=description,
            )
            for name, regex, replacement, description in specs
        )

    def add_pattern(self, pattern: 'PIIPattern'):
        """Add a custom PII pattern; it runs after all existing patterns."""
        self.patterns.append(pattern)
        logger.info("Added PII pattern: %s", pattern.name)

    def sanitize_string(self, text: str) -> str:
        """Apply all patterns, in order, to sanitize a string.

        Non-string input is returned unchanged, as is any input while
        ``sanitize_enabled`` is False.
        """
        if not self.sanitize_enabled or not isinstance(text, str):
            return text

        sanitized = text
        for rule in self.patterns:
            # subn() replaces and counts in a single scan of the text.
            sanitized, hits = rule.pattern.subn(rule.replacement, sanitized)
            if hits:
                logger.debug("Applied %s sanitization", rule.name)

        return sanitized

    @staticmethod
    def _redact_authorization(value: Any) -> str:
        """Replace an Authorization value, keeping a Bearer/Basic scheme prefix."""
        if isinstance(value, str):
            for scheme in ('Bearer', 'Basic'):
                if value.startswith(scheme + ' '):
                    return f'{scheme} SANITIZED'
        return 'SANITIZED'

    def _redact_sensitive_header(self, value: Any) -> Any:
        """Redact the value of a sensitive (non-Authorization) header.

        A value matching a known pattern keeps its pattern-specific
        placeholder. Any other non-empty value is replaced wholesale, because
        an opaque secret in a sensitive header must not be recorded.
        """
        if isinstance(value, str):
            # Empty, or already carrying a placeholder from an earlier pass.
            if not value or 'SANITIZED' in value:
                return value
            if self._contains_pii(value):
                return self.sanitize_string(value)
        return 'SANITIZED'

    def sanitize_headers(self, headers: Dict[str, str]) -> Dict[str, str]:
        """Return a sanitized copy of an HTTP header mapping.

        Authorization values are replaced entirely (scheme prefix kept for
        Bearer/Basic). Other sensitive headers are pattern-sanitized, or fully
        redacted when no pattern recognizes their value. Non-sensitive headers
        are still scanned for PII patterns. ``None`` is returned unchanged.
        """
        if not self.sanitize_enabled or headers is None:
            return headers

        sanitized_headers = {}
        for key, value in headers.items():
            lower_key = key.lower()
            if lower_key == 'authorization':
                sanitized_headers[key] = self._redact_authorization(value)
            elif lower_key in self._SENSITIVE_HEADERS:
                sanitized_headers[key] = self._redact_sensitive_header(value)
            else:
                # For non-sensitive headers, still check for PII patterns
                sanitized_headers[key] = self.sanitize_string(value)

        return sanitized_headers

    def sanitize_value(self, value: Any) -> Any:
        """Recursively sanitize any value (string, dict, list, tuple).

        Strings that look like base64 are decoded; if the decoded text contains
        PII it is sanitized and re-encoded. Dict keys are left untouched. Other
        types (int, float, bool, None, ...) are returned as-is.
        """
        if not self.sanitize_enabled:
            return value

        if isinstance(value, str):
            # Check if it might be base64 encoded
            if len(value) > 20 and self._is_base64(value):
                try:
                    decoded = base64.b64decode(value).decode('utf-8')
                except ValueError:
                    # binascii.Error and UnicodeDecodeError both derive from
                    # ValueError: not valid base64, or not UTF-8 text.
                    decoded = None
                if decoded is not None and self._contains_pii(decoded):
                    cleaned = self.sanitize_string(decoded)
                    return base64.b64encode(cleaned.encode()).decode()
            return self.sanitize_string(value)

        if isinstance(value, dict):
            return {k: self.sanitize_value(v) for k, v in value.items()}

        if isinstance(value, list):
            return [self.sanitize_value(item) for item in value]

        if isinstance(value, tuple):
            return tuple(self.sanitize_value(item) for item in value)

        # For other types (int, float, bool, None), return as-is
        return value

    def sanitize_url(self, url: str) -> str:
        """Sanitize sensitive data from URLs (query params, etc).

        The whole URL is pattern-sanitized first; then every query parameter
        with a sensitive name has its value replaced by ``SANITIZED``.
        """
        if not self.sanitize_enabled:
            return url

        # First apply general string sanitization
        url = self.sanitize_string(url)
        if not isinstance(url, str) or '?' not in url:
            return url

        # Parse and sanitize query parameters
        base, query = url.split('?', 1)
        params = []
        for param in query.split('&'):
            key, sep, value = param.partition('=')
            if not sep:
                # Bare flag without '=': keep verbatim.
                params.append(param)
            elif key.lower() in self._SENSITIVE_PARAMS:
                params.append(f"{key}=SANITIZED")
            else:
                # Still sanitize the value for PII
                params.append(f"{key}={self.sanitize_string(value)}")

        return f"{base}?{'&'.join(params)}"

    def _is_base64(self, s: str) -> bool:
        """Check if a string might be base64 encoded (alphabet and length only)."""
        if not isinstance(s, str) or len(s) % 4 != 0:
            return False
        return re.fullmatch(r'[A-Za-z0-9+/]*={0,2}', s) is not None

    def _contains_pii(self, text: str) -> bool:
        """Quick check if text matches any configured PII pattern."""
        return any(rule.pattern.search(text) for rule in self.patterns)

    def sanitize_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a sanitized deep copy of a request dictionary.

        Only the ``headers``, ``url`` and ``content`` keys are processed, each
        with its dedicated method; the input is never modified.
        """
        sanitized = deepcopy(request_data)

        if 'headers' in sanitized:
            sanitized['headers'] = self.sanitize_headers(sanitized['headers'])

        if 'url' in sanitized:
            sanitized['url'] = self.sanitize_url(sanitized['url'])

        if 'content' in sanitized:
            sanitized['content'] = self.sanitize_value(sanitized['content'])

        return sanitized

    def sanitize_response(self, response_data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a sanitized deep copy of a response dictionary.

        When ``content`` is a dict marked ``encoding == 'base64'``, its
        ``data`` field is kept as-is so the recorded body is not altered;
        every other field of that dict is sanitized. Any other content is
        sanitized recursively.
        """
        sanitized = deepcopy(response_data)

        if 'headers' in sanitized:
            sanitized['headers'] = self.sanitize_headers(sanitized['headers'])

        if 'content' in sanitized:
            content = sanitized['content']
            if isinstance(content, dict) and content.get('encoding') == 'base64':
                # Don't decode/re-encode the actual response body,
                # but sanitize any metadata alongside it.
                sanitized['content'] = {
                    key: value if key == 'data' else self.sanitize_value(value)
                    for key, value in content.items()
                }
            else:
                sanitized['content'] = self.sanitize_value(content)

        return sanitized
|
|
|
|
|
|
# Global instance for convenience: a shared sanitizer constructed with no
# custom patterns, so it carries the built-in default rule set.
default_sanitizer = PIISanitizer()