Building a Pipeline to Aggregate Macro-Economic Indicators

Introduction

Economic data sources for macro trends often update on a periodic basis. This guide details how to build a pipeline that collects, transforms, and aggregates these indicators. The goal is usually to answer specific questions rather than to produce trend data, since the update schedule is too slow to predict market changes.

Better questions to ask of this data would be: is the world's debt growing at a faster rate than in the previous 10 years? Or is external lending in Europe growing faster than over the last 5 years?

Pipeline Components

  1. Data Extraction: Use API calls or web scraping (e.g., with Selenium) to retrieve the data.
  2. Data Transformation: Clean and standardize the data.
  3. Data Aggregation: Combine datasets for comparative analysis.
  4. Data Loading: Store the aggregated data for further analysis.

Note: Step 1 could be preceded by a collection phase that loads data into a data warehouse or lake, if your process works better with lots of local data instead of pulling from sources as the pipeline runs. Read our guide about hydrating data lakes for more information on this step.
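Before digging into each stage, here is a minimal sketch of how the four stages chain together; every function and object named here is implemented (or sketched) in the sections that follow:

def run_pipeline():
    # 1. Extraction: pull raw series from the sources (with retries)
    gdp = fetch_with_retry('GDP')
    inflation = imf.get_data('PCPIPCH')

    # 2. Transformation: standardize frequency and clean outliers
    gdp_monthly = standardize_gdp_frequency(gdp)
    inflation_clean = clean_inflation_data(inflation)

    # 3. Aggregation: align the series for comparative analysis
    combined = aggregate_indicators(gdp_monthly, inflation_clean)

    # 4. Loading: persist into the economic_indicators table
    load_indicator(db, gdp_monthly, 'gdp', 'Q')
    load_indicator(db, inflation_clean, 'inflation', 'M')
    return combined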

Data Collection Implementation

The pipeline starts by collecting GDP data from FRED, inflation figures from IMF, and employment statistics from OECD. Here’s how the collection works for each source:

For FRED data collection, we use their API to fetch GDP figures released quarterly:

from fredapi import Fred

fred = Fred(api_key='your_api_key')
gdp_data = fred.get_series('GDP')  # pandas Series with a DatetimeIndex
# Returns (values illustrative, dated at quarter start):
# 2024-04-01    24772.8
# 2024-07-01    24878.9
# 2024-10-01    24984.1

The IMF data requires different handling due to its API structure. When fetching inflation data:

# There is no single standard IMF client on PyPI; imfapi here stands in
# for whichever wrapper you use
from imfapi import IMF

imf = IMF(api_key='your_api_key')
inflation_data = imf.get_data('PCPIPCH')  # inflation, percent change
# Returns:
# 2024-12    3.1
# 2024-11    3.2
# 2024-10    3.4
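If a wrapper is not available, the IMF also publishes an SDMX JSON REST endpoint that can be queried directly. A minimal sketch; the database and series key shown are illustrative, and the response layout is assumed from the SDMX JSON format:

import requests
import pandas as pd

def fetch_imf_series(database, key):
    # e.g. database='IFS', key='M.US.PCPI_IX' (both illustrative)
    url = (f"http://dataservices.imf.org/REST/SDMX_JSON.svc/"
           f"CompactData/{database}/{key}")
    obs = requests.get(url).json()['CompactData']['DataSet']['Series']['Obs']
    return pd.Series({o['@TIME_PERIOD']: float(o['@OBS_VALUE']) for o in obs})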

Data Processing Implementation

The raw data comes in different frequencies and formats. GDP is quarterly, inflation is monthly, and some market data is daily. Here’s how we standardize it:

import pandas as pd

def standardize_gdp_frequency(gdp_quarterly):
    # Convert quarterly GDP to monthly through linear interpolation.
    # Month-start dates ('MS') line up with FRED's quarter-start index;
    # reindexing straight onto new dates would yield all NaN with nothing
    # to interpolate between, so keep the quarterly anchor points too.
    monthly_dates = pd.date_range(
        start=gdp_quarterly.index[0],
        end=gdp_quarterly.index[-1],
        freq='MS'
    )
    combined = gdp_quarterly.reindex(gdp_quarterly.index.union(monthly_dates))
    return combined.interpolate(method='time').reindex(monthly_dates)

# Example output (illustrative):
# 2024-10-01    24878.9
# 2024-11-01    24931.5
# 2024-12-01    24984.1

The inflation data needs cleaning to handle missing values and outliers:

def clean_inflation_data(inflation_monthly):
    # Mask outliers beyond 3 standard deviations as NaN; dropping those
    # rows outright would leave the forward fill nothing to repair
    mean = inflation_monthly.mean()
    std = inflation_monthly.std()
    clean_data = inflation_monthly.mask(
        (inflation_monthly < mean - 3*std) |
        (inflation_monthly > mean + 3*std)
    )

    # Fill masked and originally missing values with the last valid observation
    return clean_data.ffill()
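With GDP interpolated to monthly and inflation cleaned, the aggregation step can align the two series on a shared index for comparative analysis. A minimal sketch; aggregate_indicators is our own helper, not a library function:

def aggregate_indicators(gdp_monthly, inflation_monthly):
    # Align both series on their common monthly dates
    combined = pd.concat(
        {'gdp': gdp_monthly, 'inflation': inflation_monthly},
        axis=1
    ).dropna()
    # Year-over-year growth supports questions like "is GDP growing
    # faster than in previous years?"
    combined['gdp_yoy_growth'] = combined['gdp'].pct_change(periods=12) * 100
    return combined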

Database Implementation

The processed data needs a schema that can handle different update frequencies. Here’s the core table structure:

CREATE TABLE economic_indicators (
    date DATE NOT NULL,
    indicator_type VARCHAR(50) NOT NULL,
    value NUMERIC(15,4),
    update_frequency CHAR(1),
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (date, indicator_type)
);

-- Example data:
INSERT INTO economic_indicators (date, indicator_type, value, update_frequency)
VALUES 
    ('2024-12-31', 'gdp', 24984.1, 'Q'),
    ('2024-12-31', 'inflation', 3.1, 'M'),
    ('2024-12-31', 'unemployment', 3.8, 'M');
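To move the cleaned series into this table, the loading step can upsert each observation so reruns refresh values instead of violating the primary key. A sketch assuming PostgreSQL and a SQLAlchemy engine named db; load_indicator is our own helper:

from sqlalchemy import text

def load_indicator(db, series, indicator_type, frequency):
    # Upsert keyed on (date, indicator_type) so the pipeline is idempotent
    upsert = text("""
        INSERT INTO economic_indicators
            (date, indicator_type, value, update_frequency)
        VALUES (:date, :itype, :value, :freq)
        ON CONFLICT (date, indicator_type) DO UPDATE
            SET value = EXCLUDED.value,
                last_updated = CURRENT_TIMESTAMP
    """)
    with db.begin() as conn:
        for date, value in series.items():
            conn.execute(upsert, {
                'date': date, 'itype': indicator_type,
                'value': float(value), 'freq': frequency,
            })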

Scheduling Implementation

The pipeline runs on different schedules based on data release timing. GDP data updates quarterly, typically around the 30th of the month following quarter end. Here are the Airflow DAG definitions:

from airflow import DAG
from datetime import datetime

# Airflow allows one schedule per DAG, so each collection cadence gets
# its own DAG rather than a dict of intervals
gdp_dag = DAG(
    'gdp_collection',
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 12 30 1,4,7,10 *',  # ~30th of month after quarter end
)

inflation_dag = DAG(
    'inflation_collection',
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 12 1 * *',  # Monthly
)

market_dag = DAG(
    'market_data',
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 */4 * * *',  # Every 4 hours
)
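Each DAG then needs at least one task wired to the matching collection function. A minimal sketch with PythonOperator, using the fetch_with_retry helper defined in the next section (the task id is our own):

from airflow.operators.python import PythonOperator

collect_gdp = PythonOperator(
    task_id='collect_gdp',
    python_callable=lambda: fetch_with_retry('GDP'),
    dag=gdp_dag,
)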

Error Handling Implementation

When the FRED API is down or returns invalid data, the pipeline needs to handle it gracefully:

import time

def fetch_with_retry(indicator, max_retries=3):
    for attempt in range(max_retries):
        try:
            data = fred.get_series(indicator)
            if validate_data(data):
                return data
        except Exception:
            if attempt == max_retries - 1:
                notify_team(f"Failed to fetch {indicator}")
                raise
        if attempt < max_retries - 1:
            time.sleep(60 * 2 ** attempt)  # Exponential backoff: 60s, then 120s
    raise ValueError(f"No valid data returned for {indicator}")

def validate_data(data):
    if len(data) < 10:  # Expect at least 10 data points
        return False
    if data.isnull().sum() / len(data) > 0.1:  # Max 10% missing
        return False
    return True

Monitoring Implementation

To ensure data quality, we monitor several metrics. Here’s a query that checks for stale data:

WITH latest_updates AS (
    SELECT 
        indicator_type,
        MAX(date) as last_update,
        update_frequency
    FROM economic_indicators
    GROUP BY indicator_type, update_frequency
)
SELECT 
    indicator_type,
    last_update,
    CASE
        WHEN update_frequency = 'D' AND current_date - last_update > 2 THEN 'STALE'
        WHEN update_frequency = 'M' AND current_date - last_update > 35 THEN 'STALE'
        WHEN update_frequency = 'Q' AND current_date - last_update > 100 THEN 'STALE'
        ELSE 'CURRENT'
    END as status
FROM latest_updates;

When monitoring detects issues, it triggers alerts:

def check_data_freshness():
    with db.connect() as conn:
        # Assumes the staleness query above has been saved as a view
        # named latest_updates
        stale_indicators = conn.execute("""
            SELECT indicator_type 
            FROM latest_updates 
            WHERE status = 'STALE'
        """).fetchall()

        if stale_indicators:
            names = ', '.join(row[0] for row in stale_indicators)
            notify_team(f"Stale data detected for: {names}")

This implementation gives us a reliable pipeline that handles the complexities of different data sources, frequencies, and quality requirements while maintaining consistent monitoring and error handling.
