API Integration for Economic Data Sources

Introduction

Economic data APIs provide programmatic access to indicators, financial metrics, and statistical information. This guide presents strategies for integrating with major economic data sources while addressing common challenges such as authentication, rate limiting, and inconsistent data formats.

Core API Integration Architecture

A robust economic data integration architecture requires these key components:

  1. Authentication Layer: Securely manage API keys and OAuth flows
  2. Request Handler: Construct API calls with error handling and retries
  3. Response Parser: Process and standardize returned data
  4. Caching Layer: Minimize redundant requests
  5. Rate Limiting: Respect API usage constraints

Here’s a foundational implementation:

import requests
import pandas as pd
import time
import logging
import hashlib
import json
from datetime import datetime, timedelta
from functools import lru_cache
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class EconomicDataClient:
    """Base client for accessing economic data APIs.

    Combines a retrying ``requests.Session``, pluggable authentication
    (API-key header, OAuth client-credentials, or HTTP basic), a
    TTL-bound response cache, and client-side rate limiting.
    """

    def __init__(self, base_url, auth_config=None, cache_ttl=3600, max_retries=3):
        """Initialize the client.

        Args:
            base_url: API root URL, without a trailing slash.
            auth_config: Dict describing authentication; its 'type' key
                selects 'api_key', 'oauth', 'basic', or 'none' (default).
            cache_ttl: Seconds a cached response stays valid.
            max_retries: Retry budget for transient HTTP failures.
        """
        self.base_url = base_url
        self.auth_config = auth_config or {}
        self.cache_ttl = cache_ttl
        # Bug fix: the logger must exist before _setup_auth() runs, because
        # an OAuth config immediately calls _refresh_oauth_token(), whose
        # error path logs through self.logger.
        self.logger = logging.getLogger(__name__)
        self.session = self._create_resilient_session(max_retries)
        self.rate_limiter = RateLimiter()
        self._setup_auth()
        # Bug fix: the original decorated _cached_request with @lru_cache,
        # but `params` is a dict (unhashable), so every cached call raised
        # TypeError — and lru_cache ignored cache_ttl anyway.  A plain dict
        # of cache_key -> (fetch_time, parsed_json) honors the TTL.
        self._cache = {}

    def _create_resilient_session(self, max_retries):
        """Create a requests.Session that retries transient failures.

        Retries up to max_retries times with exponential backoff on
        rate-limit (429) and server-error (5xx) responses, for both
        GET and POST.
        """
        session = requests.Session()
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=0.5,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def _setup_auth(self):
        """Configure session authentication based on auth_config['type']."""
        auth_type = self.auth_config.get('type', 'none')

        if auth_type == 'api_key':
            # The key rides along on every request via a custom header.
            self.session.headers.update({
                self.auth_config['header_name']: self.auth_config['api_key']
            })
        elif auth_type == 'oauth':
            self._refresh_oauth_token()
        elif auth_type == 'basic':
            self.session.auth = (
                self.auth_config['username'],
                self.auth_config['password']
            )

    def _refresh_oauth_token(self):
        """Fetch a client-credentials OAuth token and install it.

        Stores the token and its expiry (with a 5-minute safety buffer)
        back into auth_config and sets the session's Authorization header.

        Raises:
            requests.HTTPError: if the token endpoint rejects the request.
            KeyError: if the token response lacks expected fields.
        """
        try:
            # Deliberately uses a bare requests.post (not self.session) so
            # existing session headers/auth are not sent to the token URL.
            response = requests.post(
                self.auth_config['token_url'],
                data={
                    'grant_type': 'client_credentials',
                    'client_id': self.auth_config['client_id'],
                    'client_secret': self.auth_config['client_secret'],
                    'scope': self.auth_config.get('scope', '')
                }
            )
            response.raise_for_status()
            token_data = response.json()
            self.auth_config['token'] = token_data['access_token']
            self.auth_config['token_expiry'] = datetime.now() + timedelta(
                seconds=token_data['expires_in'] - 300  # 5-minute buffer
            )
            self.session.headers.update({
                'Authorization': f"Bearer {token_data['access_token']}"
            })
        except Exception as e:
            self.logger.error(f"Token refresh failed: {e}")
            raise

    def _execute(self, endpoint, params=None, method="GET"):
        """Run a single HTTP request and return the parsed JSON body.

        Raises:
            ValueError: for HTTP methods other than GET/POST.
            requests.HTTPError: for non-2xx responses.
        """
        url = f"{self.base_url}/{endpoint}"
        if method.upper() == "GET":
            response = self.session.get(url, params=params)
        elif method.upper() == "POST":
            response = self.session.post(url, json=params)
        else:
            raise ValueError(f"Unsupported HTTP method: {method}")
        response.raise_for_status()
        return response.json()

    def _cached_request(self, cache_key, endpoint, params=None, method="GET"):
        """Execute a request through the TTL cache.

        Returns the cached JSON when the entry is younger than cache_ttl
        seconds; otherwise fetches, stores, and returns a fresh copy.
        """
        entry = self._cache.get(cache_key)
        if entry is not None:
            fetched_at, data = entry
            if time.time() - fetched_at < self.cache_ttl:
                return data
        data = self._execute(endpoint, params, method)
        self._cache[cache_key] = (time.time(), data)
        return data

    def request(self, endpoint, params=None, method="GET", use_cache=True):
        """Execute an API request with caching and rate limiting.

        Args:
            endpoint: Path appended to base_url.
            params: Query parameters (GET) or JSON body (POST).
            method: "GET" or "POST".
            use_cache: When True, serve/populate the TTL cache.

        Returns:
            The parsed JSON response.
        """
        # Refresh the OAuth token when it is missing or expired.
        if self.auth_config.get('type') == 'oauth' and \
           (not self.auth_config.get('token_expiry') or
            datetime.now() >= self.auth_config['token_expiry']):
            self._refresh_oauth_token()

        # Respect client-side rate limits before touching the network.
        self.rate_limiter.wait()

        try:
            if use_cache:
                # Deterministic key: endpoint plus canonical (sorted) params.
                cache_key = hashlib.md5(
                    f"{endpoint}:{json.dumps(params, sort_keys=True) if params else ''}".encode()
                ).hexdigest()
                return self._cached_request(cache_key, endpoint, params, method)
            return self._execute(endpoint, params, method)

        except requests.exceptions.HTTPError as e:
            self.logger.error(f"HTTP error: {e}")
            # On 429, let the rate limiter adapt to the server's headers.
            if e.response.status_code == 429:
                self.rate_limiter.update_from_headers(e.response.headers)
            raise

        except Exception as e:
            self.logger.error(f"Request failed: {e}")
            raise

    def clear_cache(self):
        """Drop all cached responses."""
        self._cache.clear()

class RateLimiter:
    """Client-side throttle that spaces out API requests.

    Enforces a minimum interval between calls derived from a
    requests-per-minute budget, and can adapt that budget from
    standard X-RateLimit-* response headers.
    """

    def __init__(self, requests_per_minute=60):
        self.requests_per_minute = requests_per_minute
        self.last_request_time = None
        self.min_interval = 60.0 / requests_per_minute

    def wait(self):
        """Sleep just long enough to keep calls at or under the budget."""
        now = time.time()
        if self.last_request_time is not None:
            remaining = self.min_interval - (now - self.last_request_time)
            if remaining > 0:
                time.sleep(remaining)
        # Record the moment this request is released.
        self.last_request_time = time.time()

    def update_from_headers(self, headers):
        """Adapt the request budget to X-RateLimit-* response headers.

        Halves the budget when the server says we are nearly exhausted,
        and gradually restores it (capped at 60/min) when there is
        plenty of headroom. No-op if the headers are absent.
        """
        if 'X-RateLimit-Limit' not in headers or 'X-RateLimit-Remaining' not in headers:
            return

        limit = int(headers['X-RateLimit-Limit'])
        remaining = int(headers['X-RateLimit-Remaining'])

        if remaining <= 1:
            # Back off hard when the server is about to cut us off.
            self.requests_per_minute = max(1, self.requests_per_minute // 2)
        elif remaining > limit * 0.5 and self.requests_per_minute < 60:
            # Recover capacity while staying under the default ceiling.
            self.requests_per_minute = min(60, self.requests_per_minute * 1.5)

        self.min_interval = 60.0 / self.requests_per_minute

FRED API Integration

The Federal Reserve Economic Data (FRED) API provides access to over 800,000 US and international economic data series:

class FredClient:
    """Thin wrapper around EconomicDataClient for the FRED API.

    Fetches observation series from the St. Louis Fed FRED service and
    returns them as date-indexed pandas DataFrames.
    """

    def __init__(self, api_key):
        self.client = EconomicDataClient(
            base_url="https://api.stlouisfed.org/fred",
            auth_config={
                'type': 'api_key',
                'header_name': 'api_key',
                'api_key': api_key
            },
            cache_ttl=86400,  # 24 hours
            max_retries=5
        )

    def get_series(self, series_id, start_date=None, end_date=None, frequency=None,
                   aggregation_method=None, units=None, use_cache=True):
        """Fetch one FRED series as a single-column, date-indexed DataFrame.

        The column is named after series_id; non-numeric observation
        values (FRED reports missing data as ".") become NaN.
        """
        optional = {
            'observation_start': start_date,
            'observation_end': end_date,
            'frequency': frequency,
            'aggregation_method': aggregation_method,
            'units': units
        }
        params = {
            'series_id': series_id,
            'file_type': 'json',
            'api_key': self.client.auth_config['api_key']
        }
        # Only forward the optional filters that were actually supplied.
        params.update({key: val for key, val in optional.items() if val is not None})

        payload = self.client.request(
            endpoint="series/observations",
            params=params,
            use_cache=use_cache
        )

        rows = payload.get('observations', [])
        if not rows:
            return pd.DataFrame()

        frame = pd.DataFrame(rows)
        frame['date'] = pd.to_datetime(frame['date'])
        frame['value'] = pd.to_numeric(frame['value'], errors='coerce')

        return frame.set_index('date')['value'].to_frame(name=series_id)

    def get_multiple_series(self, series_ids, start_date=None, end_date=None,
                           frequency=None, use_cache=True):
        """Fetch several series and join them column-wise on date.

        Series that come back empty are skipped; returns an empty
        DataFrame when nothing was retrieved.
        """
        collected = []
        for sid in series_ids:
            frame = self.get_series(
                sid,
                start_date=start_date,
                end_date=end_date,
                frequency=frequency,
                use_cache=use_cache
            )
            if not frame.empty:
                collected.append(frame)

        if not collected:
            return pd.DataFrame()
        return pd.concat(collected, axis=1)

World Bank API Integration

The World Bank API provides economic indicators for countries worldwide:

class WorldBankClient:
    """Client for World Bank economic data API.

    Fetches indicator observations (paginated) and flattens them into a
    tidy pandas DataFrame with one row per country/year.
    """

    def __init__(self):
        self.client = EconomicDataClient(
            base_url="https://api.worldbank.org/v2",
            cache_ttl=86400,  # 24 hours
            max_retries=3
        )

    def get_indicator(self, indicator_code, country_codes=None, start_year=None,
                     end_year=None, use_cache=True):
        """Get indicator data for specified countries and years.

        Args:
            indicator_code: World Bank indicator id (e.g. 'SL.UEM.TOTL.ZS').
            country_codes: Iterable of ISO country codes; all countries
                when omitted.
            start_year / end_year: Inclusive year range (both required to
                apply a date filter).
            use_cache: Forwarded to the underlying client.

        Returns:
            DataFrame with (subset of) columns: country, country_code,
            year, value, indicator_name, indicator_id. Empty when the API
            returns no data.
        """
        countries_param = ";".join(country_codes) if country_codes else "all"
        params = {
            'format': 'json',
            'per_page': 1000,
        }
        if start_year and end_year:
            params['date'] = f"{start_year}:{end_year}"

        endpoint = f"countries/{countries_param}/indicators/{indicator_code}"

        # Paginated collection: each response is [metadata, rows].
        all_results = []
        page = 1
        total_pages = None

        while total_pages is None or page <= total_pages:
            params['page'] = page
            response_data = self.client.request(endpoint=endpoint, params=params, use_cache=use_cache)

            # Malformed/empty payloads terminate the pagination loop.
            if not response_data or len(response_data) < 2:
                break

            metadata, data = response_data[0], response_data[1]
            if total_pages is None:
                total_pages = metadata.get('pages', 1)

            if not data:
                break

            all_results.extend(data)
            page += 1

        if not all_results:
            return pd.DataFrame()

        df = pd.DataFrame(all_results)
        df = df.rename(columns={
            'countryiso3code': 'country_code',
            'date': 'year'
        })

        # Extract nested values.  Bug fix: the original indexed
        # df['country'] unconditionally and raised KeyError when the API
        # omitted that column; every nested/typed column is now guarded.
        if 'country' in df.columns:
            df['country'] = df['country'].apply(lambda x: x['value'] if isinstance(x, dict) else None)
        if 'indicator' in df.columns and isinstance(df['indicator'].iloc[0], dict):
            df['indicator_name'] = df['indicator'].apply(lambda x: x.get('value'))
            df['indicator_id'] = df['indicator'].apply(lambda x: x.get('id'))
            df = df.drop('indicator', axis=1)

        # Convert types; nullable Int64 keeps missing years as <NA>.
        if 'year' in df.columns:
            df['year'] = pd.to_numeric(df['year'], errors='coerce').astype('Int64')
        if 'value' in df.columns:
            df['value'] = pd.to_numeric(df['value'], errors='coerce')

        # Keep only the canonical columns that are actually present.
        columns_to_keep = [col for col in ['country', 'country_code', 'year', 'value',
                                          'indicator_name', 'indicator_id']
                           if col in df.columns]

        return df[columns_to_keep]

Error Handling with Circuit Breaker

The circuit breaker pattern prevents cascading failures when APIs become unavailable:

class CircuitBreaker:
    """Circuit breaker pattern for API calls.

    States: CLOSED (normal), OPEN (calls rejected until reset_timeout
    elapses), HALF-OPEN (one trial call decides between the other two).
    """

    def __init__(self, max_failures=3, reset_timeout=300):
        """
        Args:
            max_failures: Consecutive failures that trip the breaker.
            reset_timeout: Seconds to wait before allowing a trial call.
        """
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.state = "CLOSED"  # CLOSED, OPEN, HALF-OPEN
        self.last_failure_time = None
        self.logger = logging.getLogger(__name__)

    def execute(self, func, *args, **kwargs):
        """Execute func(*args, **kwargs) with circuit breaker protection.

        Returns:
            Whatever func returns.

        Raises:
            Exception: "Circuit breaker is open" when calls are rejected;
            otherwise re-raises whatever func raised.
        """
        if self.state == "OPEN":
            # Allow a single trial call once the cool-down has elapsed.
            if self.last_failure_time and time.time() - self.last_failure_time > self.reset_timeout:
                self.logger.info("Circuit half-open, attempting reset")
                self.state = "HALF-OPEN"
            else:
                raise Exception("Circuit breaker is open")

        try:
            result = func(*args, **kwargs)

            # Any success resets the breaker.  Bug fix: the original only
            # reset the failure counter from HALF-OPEN, so sporadic
            # failures spread over a long, otherwise-healthy period would
            # still accumulate and eventually trip the breaker; standard
            # semantics count *consecutive* failures.
            if self.state == "HALF-OPEN":
                self.logger.info("Circuit closed after successful call")
                self.state = "CLOSED"
            self.failures = 0

            return result

        except Exception as e:
            # Count the failure; a failed trial call reopens immediately.
            self.failures += 1
            self.last_failure_time = time.time()

            if self.failures >= self.max_failures or self.state == "HALF-OPEN":
                self.state = "OPEN"
                self.logger.warning(f"Circuit opened after {self.failures} failures")

            raise
Data Normalization

Economic data from different sources requires standardization:

def normalize_economic_data(dfs_dict, date_format='%Y-%m-%d'):
    """Normalize data from multiple sources into a consistent long format.

    For each input DataFrame: tags rows with a 'source' column, lowercases
    column names, coerces the first date-like column to a datetime 'date'
    column, and coerces the first value-like column to a numeric 'value'
    column. The results are concatenated row-wise.

    Args:
        dfs_dict: Mapping of source name -> DataFrame.
        date_format: Kept for backward compatibility; currently unused.

    Returns:
        A single concatenated DataFrame, or an empty one for empty input.
    """
    normalized_dfs = []

    for source, df in dfs_dict.items():
        norm_df = df.copy()
        norm_df['source'] = source
        norm_df.columns = [col.lower() for col in norm_df.columns]

        # Standardize date column: first column whose name suggests a date.
        date_cols = [col for col in norm_df.columns if any(
            term in col.lower() for term in ['date', 'time', 'period', 'year'])]

        if date_cols:
            date_col = date_cols[0]
            try:
                if not pd.api.types.is_datetime64_any_dtype(norm_df[date_col]):
                    norm_df['date'] = pd.to_datetime(norm_df[date_col], errors='coerce')
                else:
                    norm_df['date'] = norm_df[date_col]

                if date_col != 'date':
                    norm_df = norm_df.drop(columns=[date_col])
            # Bug fix: was a bare `except:`, which also swallowed
            # SystemExit/KeyboardInterrupt; keep the best-effort fallback
            # but only for genuine errors.
            except Exception:
                norm_df['date'] = norm_df[date_col]

        # Standardize value column: first column whose name suggests a value.
        value_cols = [col for col in norm_df.columns if any(
            term in col.lower() for term in ['value', 'obs_value', 'measure'])]

        if value_cols:
            value_col = value_cols[0]
            if value_col != 'value':
                norm_df = norm_df.rename(columns={value_col: 'value'})

        if 'value' in norm_df.columns:
            norm_df['value'] = pd.to_numeric(norm_df['value'], errors='coerce')

        normalized_dfs.append(norm_df)

    return pd.concat(normalized_dfs, ignore_index=True) if normalized_dfs else pd.DataFrame()

Integration Example

Here’s an example comparing unemployment rates from multiple sources:

def compare_unemployment_rates(countries, start_year=2020, end_year=2025):
    """Compare unemployment rates across data sources.

    Pulls country-level unemployment series from FRED and the World Bank,
    normalizes both, and pivots them into one table with a column per
    source, indexed by country and date.
    """
    fred_client = FredClient(api_key="YOUR_FRED_API_KEY")
    wb_client = WorldBankClient()

    # FRED series ids for each supported country.
    country_series_map = {
        'USA': 'UNRATE',  # US unemployment rate
        'JPN': 'JPNUR',   # Japan unemployment rate
        'DEU': 'DEUUR',   # Germany unemployment rate
        'GBR': 'GBRUR',   # UK unemployment rate
        'FRA': 'FRAUR'    # France unemployment rate
    }

    # Fetch FRED data, skipping countries that error out.
    fred_frames = []
    for country_code, series_id in country_series_map.items():
        if country_code not in countries:
            continue
        try:
            series_df = fred_client.get_series(
                series_id,
                start_date=f"{start_year}-01-01",
                end_date=f"{end_year}-12-31"
            )
        except Exception as e:
            print(f"Error fetching FRED data for {country_code}: {e}")
            continue
        if not series_df.empty:
            series_df['country_code'] = country_code
            fred_frames.append(series_df)

    if fred_frames:
        fred_df = pd.concat(fred_frames, axis=0).reset_index()
    else:
        fred_df = pd.DataFrame()

    # Fetch World Bank unemployment data.
    try:
        wb_df = wb_client.get_indicator(
            'SL.UEM.TOTL.ZS',  # Unemployment rate indicator
            country_codes=countries,
            start_year=start_year,
            end_year=end_year
        )
    except Exception as e:
        print(f"Error fetching World Bank data: {e}")
        wb_df = pd.DataFrame()

    # Normalize both sources into one long-format frame.
    normalized_df = normalize_economic_data({
        'FRED': fred_df,
        'World Bank': wb_df
    })

    if normalized_df.empty:
        return pd.DataFrame()

    # One column per source, one row per (country, date).
    return normalized_df.pivot_table(
        index=['country', 'date'],
        columns='source',
        values='value'
    ).reset_index()

This guide provides implementations for connecting to key economic data APIs and handling common challenges including authentication, rate limiting, and data normalization. These patterns can be extended to other economic data sources like IMF, OECD, or ECB.

Recent Articles