- Fix test isolation by clearing LOCALE env var in o3-pro test - Add restriction service cleanup in test_model_restrictions.py - Fix PII sanitizer phone regex to not match timestamps - Convert all print statements to logging in test files per PR review - Re-record o3-pro cassette with correct environment 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
291 lines
11 KiB
Python
291 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
PII (Personally Identifiable Information) Sanitizer for HTTP recordings.
|
|
|
|
This module provides comprehensive sanitization of sensitive data in HTTP
|
|
request/response recordings to prevent accidental exposure of API keys,
|
|
tokens, personal information, and other sensitive data.
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
from copy import deepcopy
|
|
from dataclasses import dataclass
|
|
from re import Pattern
|
|
from typing import Any, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class PIIPattern:
|
|
"""Defines a pattern for detecting and sanitizing PII."""
|
|
|
|
name: str
|
|
pattern: Pattern[str]
|
|
replacement: str
|
|
description: str
|
|
|
|
@classmethod
|
|
def create(cls, name: str, pattern: str, replacement: str, description: str) -> "PIIPattern":
|
|
"""Create a PIIPattern with compiled regex."""
|
|
return cls(name=name, pattern=re.compile(pattern), replacement=replacement, description=description)
|
|
|
|
|
|
class PIISanitizer:
|
|
"""Sanitizes PII from various data structures while preserving format."""
|
|
|
|
def __init__(self, patterns: Optional[list[PIIPattern]] = None):
|
|
"""Initialize with optional custom patterns."""
|
|
self.patterns: list[PIIPattern] = patterns or []
|
|
self.sanitize_enabled = True
|
|
|
|
# Add default patterns if none provided
|
|
if not patterns:
|
|
self._add_default_patterns()
|
|
|
|
def _add_default_patterns(self):
|
|
"""Add comprehensive default PII patterns."""
|
|
default_patterns = [
|
|
# API Keys - Core patterns (Bearer tokens handled in sanitize_headers)
|
|
PIIPattern.create(
|
|
name="openai_api_key_proj",
|
|
pattern=r"sk-proj-[A-Za-z0-9\-_]{48,}",
|
|
replacement="sk-proj-SANITIZED",
|
|
description="OpenAI project API keys",
|
|
),
|
|
PIIPattern.create(
|
|
name="openai_api_key",
|
|
pattern=r"sk-[A-Za-z0-9]{48,}",
|
|
replacement="sk-SANITIZED",
|
|
description="OpenAI API keys",
|
|
),
|
|
PIIPattern.create(
|
|
name="anthropic_api_key",
|
|
pattern=r"sk-ant-[A-Za-z0-9\-_]{48,}",
|
|
replacement="sk-ant-SANITIZED",
|
|
description="Anthropic API keys",
|
|
),
|
|
PIIPattern.create(
|
|
name="google_api_key",
|
|
pattern=r"AIza[A-Za-z0-9\-_]{35,}",
|
|
replacement="AIza-SANITIZED",
|
|
description="Google API keys",
|
|
),
|
|
PIIPattern.create(
|
|
name="github_tokens",
|
|
pattern=r"gh[psr]_[A-Za-z0-9]{36}",
|
|
replacement="gh_SANITIZED",
|
|
description="GitHub tokens (all types)",
|
|
),
|
|
# JWT tokens
|
|
PIIPattern.create(
|
|
name="jwt_token",
|
|
pattern=r"eyJ[A-Za-z0-9\-_]+\.eyJ[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+",
|
|
replacement="eyJ-SANITIZED",
|
|
description="JSON Web Tokens",
|
|
),
|
|
# Personal Information
|
|
PIIPattern.create(
|
|
name="email_address",
|
|
pattern=r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}",
|
|
replacement="user@example.com",
|
|
description="Email addresses",
|
|
),
|
|
PIIPattern.create(
|
|
name="ipv4_address",
|
|
pattern=r"\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b",
|
|
replacement="0.0.0.0",
|
|
description="IPv4 addresses",
|
|
),
|
|
PIIPattern.create(
|
|
name="ssn",
|
|
pattern=r"\b\d{3}-\d{2}-\d{4}\b",
|
|
replacement="XXX-XX-XXXX",
|
|
description="Social Security Numbers",
|
|
),
|
|
PIIPattern.create(
|
|
name="credit_card",
|
|
pattern=r"\b\d{4}[\s\-]?\d{4}[\s\-]?\d{4}[\s\-]?\d{4}\b",
|
|
replacement="XXXX-XXXX-XXXX-XXXX",
|
|
description="Credit card numbers",
|
|
),
|
|
PIIPattern.create(
|
|
name="phone_number",
|
|
pattern=r"(?:\+\d{1,3}[\s\-]?)?\(?\d{3}\)?[\s\-]?\d{3}[\s\-]?\d{4}\b(?![\d\.\,\]\}])",
|
|
replacement="(XXX) XXX-XXXX",
|
|
description="Phone numbers (all formats)",
|
|
),
|
|
# AWS
|
|
PIIPattern.create(
|
|
name="aws_access_key",
|
|
pattern=r"AKIA[0-9A-Z]{16}",
|
|
replacement="AKIA-SANITIZED",
|
|
description="AWS access keys",
|
|
),
|
|
# Other common patterns
|
|
PIIPattern.create(
|
|
name="slack_token",
|
|
pattern=r"xox[baprs]-[0-9]{10,13}-[0-9]{10,13}-[a-zA-Z0-9]{24,34}",
|
|
replacement="xox-SANITIZED",
|
|
description="Slack tokens",
|
|
),
|
|
PIIPattern.create(
|
|
name="stripe_key",
|
|
pattern=r"(?:sk|pk)_(?:test|live)_[0-9a-zA-Z]{24,99}",
|
|
replacement="sk_SANITIZED",
|
|
description="Stripe API keys",
|
|
),
|
|
]
|
|
|
|
self.patterns.extend(default_patterns)
|
|
|
|
def add_pattern(self, pattern: PIIPattern):
|
|
"""Add a custom PII pattern."""
|
|
self.patterns.append(pattern)
|
|
logger.info(f"Added PII pattern: {pattern.name}")
|
|
|
|
def sanitize_string(self, text: str) -> str:
|
|
"""Apply all patterns to sanitize a string."""
|
|
if not self.sanitize_enabled or not isinstance(text, str):
|
|
return text
|
|
|
|
sanitized = text
|
|
for pattern in self.patterns:
|
|
if pattern.pattern.search(sanitized):
|
|
sanitized = pattern.pattern.sub(pattern.replacement, sanitized)
|
|
logger.debug(f"Applied {pattern.name} sanitization")
|
|
|
|
return sanitized
|
|
|
|
def sanitize_headers(self, headers: dict[str, str]) -> dict[str, str]:
|
|
"""Special handling for HTTP headers."""
|
|
if not self.sanitize_enabled:
|
|
return headers
|
|
|
|
sanitized_headers = {}
|
|
|
|
for key, value in headers.items():
|
|
# Special case for Authorization headers to preserve auth type
|
|
if key.lower() == "authorization" and " " in value:
|
|
auth_type = value.split(" ", 1)[0]
|
|
if auth_type in ("Bearer", "Basic"):
|
|
sanitized_headers[key] = f"{auth_type} SANITIZED"
|
|
else:
|
|
sanitized_headers[key] = self.sanitize_string(value)
|
|
else:
|
|
# Apply standard sanitization to all other headers
|
|
sanitized_headers[key] = self.sanitize_string(value)
|
|
|
|
return sanitized_headers
|
|
|
|
def sanitize_value(self, value: Any) -> Any:
|
|
"""Recursively sanitize any value (string, dict, list, etc)."""
|
|
if not self.sanitize_enabled:
|
|
return value
|
|
|
|
if isinstance(value, str):
|
|
return self.sanitize_string(value)
|
|
elif isinstance(value, dict):
|
|
return {k: self.sanitize_value(v) for k, v in value.items()}
|
|
elif isinstance(value, list):
|
|
return [self.sanitize_value(item) for item in value]
|
|
elif isinstance(value, tuple):
|
|
return tuple(self.sanitize_value(item) for item in value)
|
|
else:
|
|
# For other types (int, float, bool, None), return as-is
|
|
return value
|
|
|
|
def sanitize_url(self, url: str) -> str:
|
|
"""Sanitize sensitive data from URLs (query params, etc)."""
|
|
if not self.sanitize_enabled:
|
|
return url
|
|
|
|
# First apply general string sanitization
|
|
url = self.sanitize_string(url)
|
|
|
|
# Parse and sanitize query parameters
|
|
if "?" in url:
|
|
base, query = url.split("?", 1)
|
|
params = []
|
|
|
|
for param in query.split("&"):
|
|
if "=" in param:
|
|
key, value = param.split("=", 1)
|
|
# Sanitize common sensitive parameter names
|
|
sensitive_params = {"key", "token", "api_key", "secret", "password"}
|
|
if key.lower() in sensitive_params:
|
|
params.append(f"{key}=SANITIZED")
|
|
else:
|
|
# Still sanitize the value for PII
|
|
params.append(f"{key}={self.sanitize_string(value)}")
|
|
else:
|
|
params.append(param)
|
|
|
|
return f"{base}?{'&'.join(params)}"
|
|
|
|
return url
|
|
|
|
def sanitize_request(self, request_data: dict[str, Any]) -> dict[str, Any]:
|
|
"""Sanitize a complete request dictionary."""
|
|
sanitized = deepcopy(request_data)
|
|
|
|
# Sanitize headers
|
|
if "headers" in sanitized:
|
|
sanitized["headers"] = self.sanitize_headers(sanitized["headers"])
|
|
|
|
# Sanitize URL
|
|
if "url" in sanitized:
|
|
sanitized["url"] = self.sanitize_url(sanitized["url"])
|
|
|
|
# Sanitize content
|
|
if "content" in sanitized:
|
|
sanitized["content"] = self.sanitize_value(sanitized["content"])
|
|
|
|
return sanitized
|
|
|
|
def sanitize_response(self, response_data: dict[str, Any]) -> dict[str, Any]:
|
|
"""Sanitize a complete response dictionary."""
|
|
sanitized = deepcopy(response_data)
|
|
|
|
# Sanitize headers
|
|
if "headers" in sanitized:
|
|
sanitized["headers"] = self.sanitize_headers(sanitized["headers"])
|
|
|
|
# Sanitize content
|
|
if "content" in sanitized:
|
|
# Handle base64 encoded content specially
|
|
if isinstance(sanitized["content"], dict) and sanitized["content"].get("encoding") == "base64":
|
|
if "data" in sanitized["content"]:
|
|
import base64
|
|
|
|
try:
|
|
# Decode, sanitize, and re-encode the actual response body
|
|
decoded_bytes = base64.b64decode(sanitized["content"]["data"])
|
|
# Attempt to decode as UTF-8 for sanitization. If it fails, it's likely binary.
|
|
try:
|
|
decoded_str = decoded_bytes.decode("utf-8")
|
|
sanitized_str = self.sanitize_string(decoded_str)
|
|
sanitized["content"]["data"] = base64.b64encode(sanitized_str.encode("utf-8")).decode(
|
|
"utf-8"
|
|
)
|
|
except UnicodeDecodeError:
|
|
# Content is not text, leave as is.
|
|
pass
|
|
except (base64.binascii.Error, TypeError):
|
|
# Handle cases where data is not valid base64
|
|
pass
|
|
|
|
# Sanitize other metadata fields
|
|
for key, value in sanitized["content"].items():
|
|
if key != "data":
|
|
sanitized["content"][key] = self.sanitize_value(value)
|
|
else:
|
|
sanitized["content"] = self.sanitize_value(sanitized["content"])
|
|
|
|
return sanitized
|
|
|
|
|
|
# Global instance for convenience
|
|
default_sanitizer = PIISanitizer()
|