Introduction
Economic data APIs provide programmatic access to indicators, financial metrics, and statistical information. This guide presents strategies for integrating with widely used economic data sources and addresses the challenges that typically arise: authentication, rate limiting, resilience, and normalization.
Core API Integration Architecture
A robust economic data integration architecture requires these key components:
- Authentication Layer: Securely manage API keys and OAuth flows
- Request Handler: Construct API calls with error handling and retries
- Response Parser: Process and standardize returned data
- Caching Layer: Minimize redundant requests
- Rate Limiting: Respect API usage constraints
Here’s a foundational implementation:
import requests
import pandas as pd
import time
import logging
import hashlib
import json
from datetime import datetime, timedelta
from functools import lru_cache
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class EconomicDataClient:
"""Base client for accessing economic data APIs"""
def __init__(self, base_url, auth_config=None, cache_ttl=3600, max_retries=3):
self.base_url = base_url
self.auth_config = auth_config or {}
        self.cache_ttl = cache_ttl  # informational only: lru_cache below has no TTL, so call clear_cache() to force refreshes
self.session = self._create_resilient_session(max_retries)
        self.rate_limiter = RateLimiter()
        self.logger = logging.getLogger(__name__)  # create before _setup_auth, which may log errors
        self._setup_auth()
def _create_resilient_session(self, max_retries):
"""Create session with retry logic"""
session = requests.Session()
retry_strategy = Retry(
total=max_retries,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"]  # retrying POST assumes the endpoints are idempotent
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def _setup_auth(self):
"""Configure authentication based on auth_config"""
auth_type = self.auth_config.get('type', 'none')
if auth_type == 'api_key':
self.session.headers.update({
self.auth_config['header_name']: self.auth_config['api_key']
})
elif auth_type == 'oauth':
self._refresh_oauth_token()
elif auth_type == 'basic':
self.session.auth = (
self.auth_config['username'],
self.auth_config['password']
)
def _refresh_oauth_token(self):
"""Get and set OAuth token"""
try:
            response = requests.post(
                self.auth_config['token_url'],
                data={
                    'grant_type': 'client_credentials',
                    'client_id': self.auth_config['client_id'],
                    'client_secret': self.auth_config['client_secret'],
                    'scope': self.auth_config.get('scope', '')
                },
                timeout=30  # avoid hanging indefinitely on a slow token endpoint
            )
response.raise_for_status()
token_data = response.json()
self.auth_config['token'] = token_data['access_token']
self.auth_config['token_expiry'] = datetime.now() + timedelta(
seconds=token_data['expires_in'] - 300 # 5-minute buffer
)
self.session.headers.update({
'Authorization': f"Bearer {token_data['access_token']}"
})
except Exception as e:
self.logger.error(f"Token refresh failed: {e}")
raise
    @lru_cache(maxsize=128)
    def _cached_request(self, cache_key, endpoint, params_json, method="GET"):
        """Execute cached request. lru_cache requires hashable arguments,
        so params travel as a JSON string rather than a dict."""
        params = json.loads(params_json) if params_json else None
        url = f"{self.base_url}/{endpoint}"
        if method.upper() == "GET":
            response = self.session.get(url, params=params)
        elif method.upper() == "POST":
            response = self.session.post(url, json=params)
        else:
            raise ValueError(f"Unsupported HTTP method: {method}")
        response.raise_for_status()
        return response.json()
def request(self, endpoint, params=None, method="GET", use_cache=True):
"""Execute API request with caching and rate limiting"""
# Check token expiry for OAuth
if self.auth_config.get('type') == 'oauth' and \
(not self.auth_config.get('token_expiry') or
datetime.now() >= self.auth_config['token_expiry']):
self._refresh_oauth_token()
# Respect rate limits
self.rate_limiter.wait()
try:
            if use_cache:
                # Serialize params once: the JSON string is hashable (unlike a
                # dict), so it can feed both the digest and the lru_cache key
                params_json = json.dumps(params, sort_keys=True) if params else ''
                cache_key = hashlib.md5(f"{endpoint}:{params_json}".encode()).hexdigest()
                return self._cached_request(cache_key, endpoint, params_json, method)
else:
# Execute uncached request
url = f"{self.base_url}/{endpoint}"
if method.upper() == "GET":
response = self.session.get(url, params=params)
elif method.upper() == "POST":
response = self.session.post(url, json=params)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
self.logger.error(f"HTTP error: {e}")
# Update rate limiter on rate limit error
if e.response.status_code == 429:
self.rate_limiter.update_from_headers(e.response.headers)
raise
except Exception as e:
self.logger.error(f"Request failed: {e}")
raise
def clear_cache(self):
"""Clear request cache"""
self._cached_request.cache_clear()
class RateLimiter:
"""Rate limiting for API requests"""
def __init__(self, requests_per_minute=60):
self.requests_per_minute = requests_per_minute
self.last_request_time = None
self.min_interval = 60.0 / requests_per_minute
def wait(self):
"""Wait if necessary to respect rate limits"""
if self.last_request_time is None:
self.last_request_time = time.time()
return
elapsed = time.time() - self.last_request_time
wait_time = max(0, self.min_interval - elapsed)
if wait_time > 0:
time.sleep(wait_time)
self.last_request_time = time.time()
def update_from_headers(self, headers):
"""Adjust rate limits based on API response headers"""
if 'X-RateLimit-Limit' in headers and 'X-RateLimit-Remaining' in headers:
limit = int(headers['X-RateLimit-Limit'])
remaining = int(headers['X-RateLimit-Remaining'])
if remaining <= 1:
# Slow down when approaching limits
self.requests_per_minute = max(1, self.requests_per_minute // 2)
elif remaining > limit * 0.5 and self.requests_per_minute < 60:
# Speed up when plenty of capacity
self.requests_per_minute = min(60, self.requests_per_minute * 1.5)
self.min_interval = 60.0 / self.requests_per_minute
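Putting the client and rate limiter together, a minimal usage sketch looks like this (the base URL, header name, and key are placeholders, not a real API):

# Hypothetical endpoint and credentials, for illustration only
client = EconomicDataClient(
    base_url="https://api.example.com/v1",
    auth_config={
        'type': 'api_key',
        'header_name': 'X-API-Key',
        'api_key': 'YOUR_API_KEY'
    }
)
data = client.request("indicators/gdp", params={'country': 'US'})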
FRED API Integration
The Federal Reserve Economic Data (FRED) API provides access to over 800,000 US and international economic data series:
class FredClient:
    """Client for the FRED economic data API"""
    def __init__(self, api_key):
        # FRED expects the API key as a query parameter rather than a header,
        # so it is stored here and added to each request's params
        self.api_key = api_key
        self.client = EconomicDataClient(
            base_url="https://api.stlouisfed.org/fred",
            cache_ttl=86400,  # 24 hours
            max_retries=5
        )
def get_series(self, series_id, start_date=None, end_date=None, frequency=None,
aggregation_method=None, units=None, use_cache=True):
"""Get time series data for specific series ID"""
params = {
'series_id': series_id,
'file_type': 'json',
            'api_key': self.api_key
}
# Add optional parameters
for param, value in {
'observation_start': start_date,
'observation_end': end_date,
'frequency': frequency,
'aggregation_method': aggregation_method,
'units': units
}.items():
if value is not None:
params[param] = value
# Execute request and parse response
response_data = self.client.request(
endpoint="series/observations",
params=params,
use_cache=use_cache
)
observations = response_data.get('observations', [])
if not observations:
return pd.DataFrame()
df = pd.DataFrame(observations)
df['date'] = pd.to_datetime(df['date'])
df['value'] = pd.to_numeric(df['value'], errors='coerce')
return df.set_index('date')['value'].to_frame(name=series_id)
def get_multiple_series(self, series_ids, start_date=None, end_date=None,
frequency=None, use_cache=True):
"""Get multiple series and combine into one DataFrame"""
dfs = []
for series_id in series_ids:
df = self.get_series(
series_id,
start_date=start_date,
end_date=end_date,
frequency=frequency,
use_cache=use_cache
)
if not df.empty:
dfs.append(df)
return pd.concat(dfs, axis=1) if dfs else pd.DataFrame()
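For example, fetching the US unemployment rate plus two other well-known series (UNRATE, CPIAUCSL, and GDP are real FRED series IDs; the key is a placeholder):

fred = FredClient(api_key="YOUR_FRED_API_KEY")
unrate = fred.get_series('UNRATE', start_date='2020-01-01', end_date='2023-12-31')
macro = fred.get_multiple_series(['CPIAUCSL', 'GDP'], start_date='2020-01-01')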
World Bank API Integration
The World Bank API provides economic indicators for countries worldwide:
class WorldBankClient:
"""Client for World Bank economic data API"""
def __init__(self):
self.client = EconomicDataClient(
base_url="https://api.worldbank.org/v2",
cache_ttl=86400, # 24 hours
max_retries=3
)
def get_indicator(self, indicator_code, country_codes=None, start_year=None,
end_year=None, use_cache=True):
"""Get indicator data for specified countries and years"""
# Set up parameters
countries_param = ";".join(country_codes) if country_codes else "all"
params = {
'format': 'json',
'per_page': 1000,
}
if start_year and end_year:
params['date'] = f"{start_year}:{end_year}"
# Paginated data collection
all_results = []
page = 1
total_pages = None
while total_pages is None or page <= total_pages:
params['page'] = page
endpoint = f"countries/{countries_param}/indicators/{indicator_code}"
response_data = self.client.request(endpoint=endpoint, params=params, use_cache=use_cache)
# World Bank API returns metadata as first element
if not response_data or len(response_data) < 2:
break
metadata, data = response_data[0], response_data[1]
if total_pages is None:
total_pages = metadata.get('pages', 1)
if not data:
break
all_results.extend(data)
page += 1
if not all_results:
return pd.DataFrame()
# Process response into DataFrame
df = pd.DataFrame(all_results)
df = df.rename(columns={
'countryiso3code': 'country_code',
'date': 'year',
'value': 'value'
})
# Extract nested values
        df['country'] = df['country'].apply(lambda x: x.get('value') if isinstance(x, dict) else x)
if 'indicator' in df.columns and isinstance(df['indicator'].iloc[0], dict):
df['indicator_name'] = df['indicator'].apply(lambda x: x.get('value'))
df['indicator_id'] = df['indicator'].apply(lambda x: x.get('id'))
df = df.drop('indicator', axis=1)
# Convert types
df['year'] = pd.to_numeric(df['year'], errors='coerce').astype('Int64')
df['value'] = pd.to_numeric(df['value'], errors='coerce')
# Select key columns
columns_to_keep = [col for col in ['country', 'country_code', 'year', 'value',
'indicator_name', 'indicator_id']
if col in df.columns]
return df[columns_to_keep]
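As a quick usage sketch, here is GDP per capita for three countries (NY.GDP.PCAP.CD is the World Bank's indicator code):

wb = WorldBankClient()
gdp_pc = wb.get_indicator(
    'NY.GDP.PCAP.CD',  # GDP per capita, current US$
    country_codes=['USA', 'DEU', 'JPN'],
    start_year=2015,
    end_year=2022
)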
Error Handling with Circuit Breaker
The circuit breaker pattern prevents cascading failures when APIs become unavailable:
class CircuitBreaker:
"""Circuit breaker pattern for API calls"""
def __init__(self, max_failures=3, reset_timeout=300):
self.max_failures = max_failures
self.reset_timeout = reset_timeout
self.failures = 0
self.state = "CLOSED" # CLOSED, OPEN, HALF-OPEN
self.last_failure_time = None
self.logger = logging.getLogger(__name__)
def execute(self, func, *args, **kwargs):
"""Execute function with circuit breaker protection"""
if self.state == "OPEN":
# Check if timeout has elapsed
if self.last_failure_time and time.time() - self.last_failure_time > self.reset_timeout:
self.logger.info("Circuit half-open, attempting reset")
self.state = "HALF-OPEN"
else:
raise Exception("Circuit breaker is open")
try:
result = func(*args, **kwargs)
# Success, close circuit if half-open
if self.state == "HALF-OPEN":
self.logger.info("Circuit closed after successful call")
self.state = "CLOSED"
self.failures = 0
return result
except Exception as e:
# Handle failure
self.failures += 1
self.last_failure_time = time.time()
if self.failures >= self.max_failures or self.state == "HALF-OPEN":
self.state = "OPEN"
self.logger.warning(f"Circuit opened after {self.failures} failures")
raise
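The breaker wraps any callable, so it composes directly with the clients above; a sketch using the FRED client (the key is a placeholder):

breaker = CircuitBreaker(max_failures=3, reset_timeout=300)
fred = FredClient(api_key="YOUR_FRED_API_KEY")
try:
    df = breaker.execute(fred.get_series, 'UNRATE', start_date='2023-01-01')
except Exception as e:
    # Lands here when the call fails or the circuit is already open
    logging.getLogger(__name__).warning(f"FRED call blocked or failed: {e}")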
Data Normalization
Economic data from different sources requires standardization:
def normalize_economic_data(dfs_dict):
    """Normalize data from multiple sources into a consistent format"""
normalized_dfs = []
for source, df in dfs_dict.items():
norm_df = df.copy()
norm_df['source'] = source
norm_df.columns = [col.lower() for col in norm_df.columns]
# Standardize date column
date_cols = [col for col in norm_df.columns if any(
term in col.lower() for term in ['date', 'time', 'period', 'year'])]
if date_cols:
date_col = date_cols[0]
            try:
                if not pd.api.types.is_datetime64_any_dtype(norm_df[date_col]):
                    # Convert via strings so integer years (e.g. 2020) parse as
                    # calendar dates instead of nanoseconds since the epoch
                    norm_df['date'] = pd.to_datetime(
                        norm_df[date_col].astype(str), errors='coerce')
                else:
                    norm_df['date'] = norm_df[date_col]
                if date_col != 'date':
                    norm_df = norm_df.drop(columns=[date_col])
            except Exception:
                norm_df['date'] = norm_df[date_col]
# Standardize value column
value_cols = [col for col in norm_df.columns if any(
term in col.lower() for term in ['value', 'obs_value', 'measure'])]
if value_cols:
value_col = value_cols[0]
if value_col != 'value':
norm_df = norm_df.rename(columns={value_col: 'value'})
if 'value' in norm_df.columns:
norm_df['value'] = pd.to_numeric(norm_df['value'], errors='coerce')
normalized_dfs.append(norm_df)
return pd.concat(normalized_dfs, ignore_index=True) if normalized_dfs else pd.DataFrame()
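A short, self-contained sketch of how it is called (the two toy frames stand in for real API results):

fred_df = pd.DataFrame({'date': ['2022-01-01'], 'value': [3.9]})
wb_df = pd.DataFrame({'country': ['United States'], 'year': [2022], 'value': [3.6]})
combined = normalize_economic_data({'FRED': fred_df, 'World Bank': wb_df})
# Every row now carries a 'source' label plus standardized 'date' and 'value'
print(combined[['source', 'date', 'value']])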
Integration Example
Here’s an example comparing unemployment rates from multiple sources:
def compare_unemployment_rates(countries, start_year=2020, end_year=2025):
"""Compare unemployment rates from different data sources"""
# Initialize clients
fred_client = FredClient(api_key="YOUR_FRED_API_KEY")
wb_client = WorldBankClient()
# Common country code mappings
country_series_map = {
'USA': 'UNRATE', # US unemployment rate
'JPN': 'JPNUR', # Japan unemployment rate
'DEU': 'DEUUR', # Germany unemployment rate
'GBR': 'GBRUR', # UK unemployment rate
'FRA': 'FRAUR' # France unemployment rate
}
# Fetch FRED data
fred_data = {}
for country_code, series_id in country_series_map.items():
if country_code in countries:
try:
df = fred_client.get_series(
series_id,
start_date=f"{start_year}-01-01",
end_date=f"{end_year}-12-31"
)
                if not df.empty:
                    # Rename the series column so normalization later finds 'value'
                    df = df.rename(columns={series_id: 'value'})
                    df['country_code'] = country_code
                    fred_data[country_code] = df
except Exception as e:
print(f"Error fetching FRED data for {country_code}: {e}")
fred_df = pd.concat(list(fred_data.values()), axis=0) if fred_data else pd.DataFrame()
if not fred_df.empty:
fred_df = fred_df.reset_index()
# Fetch World Bank unemployment data
try:
wb_df = wb_client.get_indicator(
'SL.UEM.TOTL.ZS', # Unemployment rate indicator
country_codes=countries,
start_year=start_year,
end_year=end_year
)
except Exception as e:
print(f"Error fetching World Bank data: {e}")
wb_df = pd.DataFrame()
# Normalize and combine data
normalized_df = normalize_economic_data({
'FRED': fred_df,
'World Bank': wb_df
})
if not normalized_df.empty:
# Create pivot table comparing sources
        pivot_df = normalized_df.pivot_table(
            index=['country_code', 'date'],  # FRED rows carry country_code but no country name
            columns='source',
            values='value'
        ).reset_index()
return pivot_df
else:
return pd.DataFrame()
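Calling the comparison for a subset of countries might look like the snippet below. Note that the FRED series are monthly while the World Bank indicator is annual, so most rows in the pivot will carry a value from only one source unless the data is resampled first:

pivot = compare_unemployment_rates(['USA', 'DEU'], start_year=2020, end_year=2023)
if not pivot.empty:
    print(pivot.head())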
This guide provides implementations for connecting to key economic data APIs and handling the challenges that come with them, including authentication, rate limiting, resilience, and data normalization. The same patterns extend naturally to other economic data sources such as the IMF, OECD, or ECB.