# API Client Patterns ## Base API Client Pattern Using requests.Session for connection pooling: ```python # api_client.py import requests from typing import Dict, List, Optional from config import config class TweedeKamerAPI: def __init__(self): self.odata_base_url = "https://gegevensmagazijn.tweedekamer.nl/OData/v4/2.0" self.session = requests.Session() self.session.headers.update({ "Accept": "application/json", "User-Agent": "Dutch-Political-Compass-Tool/1.0", }) def get_motions( self, start_date: datetime = None, end_date: datetime = None, limit: int = 500, ) -> List[Dict]: """Get motions with voting results using OData API.""" if not start_date: start_date = datetime.now() - timedelta(days=730) try: voting_records, besluit_meta = self._get_voting_records( start_date, end_date, limit ) return self._process_voting_records(voting_records, besluit_meta) except Exception as e: print(f"Error fetching motions from API: {e}") return [] ``` ## OData Pagination Pattern Handle server-side pagination with $skip: ```python def _get_voting_records( self, start_date: datetime, end_date: datetime = None, limit: int = 50000 ) -> tuple: """Fetch with automatic pagination.""" filter_query = ( f"GewijzigdOp ge {start_date.strftime('%Y-%m-%d')}T00:00:00Z" " and StemmingsSoort ne null" " and Verwijderd eq false" ) page_size = 250 # API caps $top at 250 base_url = f"{self.odata_base_url}/Besluit" base_params = { "$filter": filter_query, "$top": page_size, "$expand": "Stemming", "$orderby": "GewijzigdOp desc", } all_records = [] skip = 0 while len(all_records) < limit: params = {**base_params, "$skip": skip} response = self.session.get( base_url, params=params, timeout=config.API_TIMEOUT ) response.raise_for_status() data = response.json() besluit_page = data.get("value", []) if not besluit_page: break # Process page for besluit in besluit_page: all_records.extend(self._extract_votes(besluit)) skip += page_size return all_records ``` ## Retry with Backoff Pattern For transient failures: ```python # ai_provider.py import time import random from requests.exceptions import ConnectionError def _post_with_retries( path: str, json: dict, retries: int = 3 ) -> requests.Response: """POST with exponential backoff retry.""" backoff = 0.5 for attempt in range(1, retries + 1): try: resp = requests.post(url, json=json, headers=headers, timeout=10) # Handle rate limiting if resp.status_code == 429: if attempt == retries: raise ProviderError("Rate limited") retry_after = resp.headers.get("Retry-After") if retry_after: time.sleep(int(retry_after)) else: sleep = backoff * (2 ** (attempt - 1)) sleep += random.uniform(0, sleep * 0.1) time.sleep(sleep) continue # Handle server errors if 500 <= resp.status_code < 600: if attempt == retries: raise ProviderError(f"Server error: {resp.status_code}") time.sleep(backoff * (2 ** (attempt - 1))) continue return resp except ConnectionError as exc: if attempt == retries: raise ProviderError(f"Connection error: {exc}") time.sleep(backoff * (2 ** (attempt - 1))) raise ProviderError("Failed after retries") ``` ## Batch Processing Pattern Process items in batches to manage API limits: ```python def get_embeddings_with_retry( texts: List[str], batch_size: int = 50, retries: int = 3, ) -> List[Optional[List[float]]]: """Process embeddings in batches with fallback to single items.""" results = [None] * len(texts) i = 0 while i < len(texts): end = min(len(texts), i + batch_size) chunk = texts[i:end] # Try batch first try: emb_chunk = get_embeddings_batch(chunk) for j, emb in enumerate(emb_chunk): results[i + j] = emb i = end continue except Exception: pass # Fallback: single items for j, text in enumerate(chunk): try: results[i + j] = get_embedding(text) except Exception: results[i + j] = None i = end return results ``` ## Response Validation Pattern Validate API responses before processing: ```python def _process_response(self, response: requests.Response) -> Dict: """Validate and parse API response.""" response.raise_for_status() data = response.json() if "value" not in data: raise ValueError("Unexpected response format: missing 'value' key") return data def _validate_besluit(self, besluit: Dict) -> bool: """Check required fields exist.""" required = ["Id", "GewijzigdOp"] return all(field in besluit for field in required) ``` ## Error Handling Patterns Always provide safe fallbacks: ```python def safe_api_call(self, endpoint: str, params: Dict = None) -> List[Dict]: """Call API with error handling and fallback.""" try: response = self.session.get( endpoint, params=params, timeout=config.API_TIMEOUT ) response.raise_for_status() data = response.json() return data.get("value", []) except requests.Timeout: _logger.warning(f"API timeout for {endpoint}") return [] except requests.HTTPError as e: _logger.error(f"HTTP error: {e}") return [] except Exception as e: _logger.error(f"API call failed: {e}") return [] ``` ## Session Management Reuse session for connection pooling: ```python class TweedeKamerAPI: def __init__(self): self.session = requests.Session() self.session.headers.update({ "Accept": "application/json", "User-Agent": "Dutch-Political-Compass-Tool/1.0", }) def close(self): """Clean up session when done.""" self.session.close() def __enter__(self): return self def __exit__(self, *args): self.close() # Usage with TweedeKamerAPI() as api: motions = api.get_motions(start_date) ```