You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

265 lines
6.9 KiB

# API Client Patterns
## Base API Client Pattern
Using requests.Session for connection pooling:
```python
# api_client.py
import requests
from typing import Dict, List, Optional
from config import config
class TweedeKamerAPI:
def __init__(self):
self.odata_base_url = "https://gegevensmagazijn.tweedekamer.nl/OData/v4/2.0"
self.session = requests.Session()
self.session.headers.update({
"Accept": "application/json",
"User-Agent": "Dutch-Political-Compass-Tool/1.0",
})
def get_motions(
self,
start_date: datetime = None,
end_date: datetime = None,
limit: int = 500,
) -> List[Dict]:
"""Get motions with voting results using OData API."""
if not start_date:
start_date = datetime.now() - timedelta(days=730)
try:
voting_records, besluit_meta = self._get_voting_records(
start_date, end_date, limit
)
return self._process_voting_records(voting_records, besluit_meta)
except Exception as e:
print(f"Error fetching motions from API: {e}")
return []
```
## OData Pagination Pattern
Handle server-side pagination with $skip:
```python
def _get_voting_records(
self,
start_date: datetime,
end_date: datetime = None,
limit: int = 50000
) -> tuple:
"""Fetch with automatic pagination."""
filter_query = (
f"GewijzigdOp ge {start_date.strftime('%Y-%m-%d')}T00:00:00Z"
" and StemmingsSoort ne null"
" and Verwijderd eq false"
)
page_size = 250 # API caps $top at 250
base_url = f"{self.odata_base_url}/Besluit"
base_params = {
"$filter": filter_query,
"$top": page_size,
"$expand": "Stemming",
"$orderby": "GewijzigdOp desc",
}
all_records = []
skip = 0
while len(all_records) < limit:
params = {**base_params, "$skip": skip}
response = self.session.get(
base_url,
params=params,
timeout=config.API_TIMEOUT
)
response.raise_for_status()
data = response.json()
besluit_page = data.get("value", [])
if not besluit_page:
break
# Process page
for besluit in besluit_page:
all_records.extend(self._extract_votes(besluit))
skip += page_size
return all_records
```
## Retry with Backoff Pattern
For transient failures:
```python
# ai_provider.py
import time
import random
from requests.exceptions import ConnectionError
def _post_with_retries(
path: str,
json: dict,
retries: int = 3
) -> requests.Response:
"""POST with exponential backoff retry."""
backoff = 0.5
for attempt in range(1, retries + 1):
try:
resp = requests.post(url, json=json, headers=headers, timeout=10)
# Handle rate limiting
if resp.status_code == 429:
if attempt == retries:
raise ProviderError("Rate limited")
retry_after = resp.headers.get("Retry-After")
if retry_after:
time.sleep(int(retry_after))
else:
sleep = backoff * (2 ** (attempt - 1))
sleep += random.uniform(0, sleep * 0.1)
time.sleep(sleep)
continue
# Handle server errors
if 500 <= resp.status_code < 600:
if attempt == retries:
raise ProviderError(f"Server error: {resp.status_code}")
time.sleep(backoff * (2 ** (attempt - 1)))
continue
return resp
except ConnectionError as exc:
if attempt == retries:
raise ProviderError(f"Connection error: {exc}")
time.sleep(backoff * (2 ** (attempt - 1)))
raise ProviderError("Failed after retries")
```
## Batch Processing Pattern
Process items in batches to manage API limits:
```python
def get_embeddings_with_retry(
texts: List[str],
batch_size: int = 50,
retries: int = 3,
) -> List[Optional[List[float]]]:
"""Process embeddings in batches with fallback to single items."""
results = [None] * len(texts)
i = 0
while i < len(texts):
end = min(len(texts), i + batch_size)
chunk = texts[i:end]
# Try batch first
try:
emb_chunk = get_embeddings_batch(chunk)
for j, emb in enumerate(emb_chunk):
results[i + j] = emb
i = end
continue
except Exception:
pass
# Fallback: single items
for j, text in enumerate(chunk):
try:
results[i + j] = get_embedding(text)
except Exception:
results[i + j] = None
i = end
return results
```
## Response Validation Pattern
Validate API responses before processing:
```python
def _process_response(self, response: requests.Response) -> Dict:
"""Validate and parse API response."""
response.raise_for_status()
data = response.json()
if "value" not in data:
raise ValueError("Unexpected response format: missing 'value' key")
return data
def _validate_besluit(self, besluit: Dict) -> bool:
"""Check required fields exist."""
required = ["Id", "GewijzigdOp"]
return all(field in besluit for field in required)
```
## Error Handling Patterns
Always provide safe fallbacks:
```python
def safe_api_call(self, endpoint: str, params: Dict = None) -> List[Dict]:
"""Call API with error handling and fallback."""
try:
response = self.session.get(
endpoint,
params=params,
timeout=config.API_TIMEOUT
)
response.raise_for_status()
data = response.json()
return data.get("value", [])
except requests.Timeout:
_logger.warning(f"API timeout for {endpoint}")
return []
except requests.HTTPError as e:
_logger.error(f"HTTP error: {e}")
return []
except Exception as e:
_logger.error(f"API call failed: {e}")
return []
```
## Session Management
Reuse session for connection pooling:
```python
class TweedeKamerAPI:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
"Accept": "application/json",
"User-Agent": "Dutch-Political-Compass-Tool/1.0",
})
def close(self):
"""Clean up session when done."""
self.session.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
# Usage
with TweedeKamerAPI() as api:
motions = api.get_motions(start_date)
```