Introduction
Economic data sources for macro trends typically update on a periodic schedule. This guide details how to build a pipeline that collects, transforms, and aggregates these indicators. The goal is usually to answer specific questions rather than to generate trading signals, since the release schedules are too slow to predict short-term market changes.
Better questions for this data are along the lines of: is global debt growing faster than it did over the previous 10 years, or is external lending in Europe growing faster than it did over the last 5 years?
Pipeline Components
- Data Extraction: Use API calls or web scraping tools such as Selenium to retrieve the data.
- Data Transformation: Clean and standardize the data.
- Data Aggregation: Combine datasets for comparative analysis.
- Data Loading: Store the aggregated data for further analysis.
Note: the extraction step could be preceded by a collection phase that loads data into a data warehouse or lake, if your process works better with a large local store of data instead of pulling from sources as the pipeline runs. Read our guide about hydrating data lakes for more information on this step.
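At a high level, the four stages chain together as in the sketch below; the function names are illustrative placeholders for the implementations covered in the rest of this guide:
def run_pipeline():
    # Placeholder stage functions for the four components above
    raw = extract_sources()                 # API calls or scraping
    cleaned = transform_sources(raw)        # cleaning and frequency standardization
    combined = aggregate_sources(cleaned)   # join datasets for comparison
    load_to_store(combined)                 # persist for further analysis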
Data Collection Implementation
The pipeline starts by collecting GDP data from FRED, inflation figures from IMF, and employment statistics from OECD. Here’s how the collection works for each source:
For FRED data collection, we use their API to fetch GDP figures released quarterly:
from fredapi import Fred
fred = Fred(api_key='your_api_key')
gdp_data = fred.get_series('GDP')
# Returns:
# 2024-Q4 24984.1
# 2024-Q3 24878.9
# 2024-Q2 24772.8
The IMF data requires different handling due to their unique API structure. When fetching inflation data:
# Note: 'imfapi' here stands in for whichever IMF client library you use; the IMF
# also exposes an SDMX-based API that can be queried directly over HTTP.
from imfapi import IMF
imf = IMF(api_key='your_api_key')
inflation_data = imf.get_data('PCPIPCH')  # consumer price inflation, percent change
# Returns:
# 2024-12 3.1
# 2024-11 3.2
# 2024-10 3.4
Data Processing Implementation
The raw data comes in different frequencies and formats. GDP is quarterly, inflation is monthly, and some market data is daily. Here’s how we standardize it:
import pandas as pd

def standardize_gdp_frequency(gdp_quarterly):
    # Convert quarterly GDP to monthly through interpolation.
    # FRED indexes quarterly values at the quarter-start date, so a month-start
    # ('MS') range keeps the original observations and interpolates the gaps.
    monthly_dates = pd.date_range(
        start=gdp_quarterly.index[0],
        end=gdp_quarterly.index[-1],
        freq='MS'
    )
    return gdp_quarterly.reindex(monthly_dates).interpolate()
# Example output:
# 2024-12 24984.1
# 2024-11 24931.5
# 2024-10 24878.9
The inflation data needs cleaning to handle missing values and outliers:
def clean_inflation_data(inflation_monthly):
    # Remove outliers beyond 3 standard deviations
    mean = inflation_monthly.mean()
    std = inflation_monthly.std()
    clean_data = inflation_monthly[
        (inflation_monthly > mean - 3 * std) &
        (inflation_monthly < mean + 3 * std)
    ]
    # Fill remaining missing values with the last known reading
    return clean_data.ffill()
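One possible aggregation step, reusing the two helpers above, aligns GDP and inflation on a shared monthly index for comparative analysis:
def aggregate_indicators(gdp_quarterly, inflation_monthly):
    # Combine both indicators into a single monthly frame; the DataFrame
    # constructor aligns the two series on the union of their indexes.
    monthly_gdp = standardize_gdp_frequency(gdp_quarterly)
    clean_inflation = clean_inflation_data(inflation_monthly)
    return pd.DataFrame({'gdp': monthly_gdp, 'inflation': clean_inflation})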
Database Implementation
The processed data needs a schema that can handle different update frequencies. Here’s the core table structure:
CREATE TABLE economic_indicators (
    date DATE NOT NULL,
    indicator_type VARCHAR(50) NOT NULL,
    value NUMERIC(15,4),
    update_frequency CHAR(1),
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (date, indicator_type)
);
-- Example data:
INSERT INTO economic_indicators (date, indicator_type, value, update_frequency)
VALUES
('2024-12-31', 'gdp', 24984.1, 'Q'),
('2024-12-31', 'inflation', 3.1, 'M'),
('2024-12-31', 'unemployment', 3.8, 'M');
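The Data Loading step from the component list can then write the aggregated frame into this table. The sketch below is an assumption, not part of the original snippets: it presumes a PostgreSQL connection (for example via psycopg2), the monthly frame produced by aggregate_indicators above, and Postgres-specific upsert syntax:
def load_indicators(df, conn):
    # Flatten the (date x indicator) frame into one row per observation;
    # update_frequency is hard-coded to 'M' here because the frame is monthly.
    rows = [
        (date.date(), indicator, float(value), 'M')
        for date, row in df.iterrows()
        for indicator, value in row.items()
        if pd.notnull(value)
    ]
    with conn.cursor() as cur:
        cur.executemany(
            """
            INSERT INTO economic_indicators (date, indicator_type, value, update_frequency)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (date, indicator_type) DO UPDATE SET value = EXCLUDED.value
            """,
            rows,
        )
    conn.commit()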
Scheduling Implementation
The pipeline runs on different schedules based on data release timing. GDP data updates quarterly, typically around the 30th of the month following quarter end. Here’s the Airflow DAG implementation:
from airflow import DAG
from datetime import datetime

# Each collection job releases on its own schedule, so each gets its own DAG;
# Airflow only accepts a single schedule_interval per DAG.
schedules = {
    'gdp_collection': '0 12 28-31 1,4,7,10 *',  # Quarterly, around the 30th of the release month
    'inflation_collection': '0 12 1 * *',       # Monthly
    'market_data': '0 */4 * * *',               # Every 4 hours
}

dags = {
    name: DAG(name, schedule_interval=cron, start_date=datetime(2024, 1, 1), catchup=False)
    for name, cron in schedules.items()
}
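A collection task can then be attached to each DAG. This sketch assumes the dags dict defined above and a hypothetical collect_gdp wrapper around the FRED call from earlier:
from airflow.operators.python import PythonOperator

def collect_gdp():
    # Hypothetical wrapper: fetch, standardize, and load would be chained here
    return fred.get_series('GDP')

collect_gdp_task = PythonOperator(
    task_id='collect_gdp',
    python_callable=collect_gdp,
    dag=dags['gdp_collection'],
)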
Error Handling Implementation
When the FRED API is down or returns invalid data, the pipeline needs to handle it gracefully:
import time

def fetch_with_retry(indicator, max_retries=3):
    for attempt in range(max_retries):
        try:
            data = fred.get_series(indicator)
            if validate_data(data):
                return data
        except Exception as e:
            if attempt == max_retries - 1:
                notify_team(f"Failed to fetch {indicator}")
                raise e
        if attempt < max_retries - 1:
            time.sleep(60 * 2 ** attempt)  # Exponential backoff: 60s, then 120s
    raise ValueError(f"{indicator} returned invalid data after {max_retries} attempts")

def validate_data(data):
    if len(data) < 10:  # Expect at least 10 data points
        return False
    if data.isnull().sum() / len(data) > 0.1:  # Max 10% missing
        return False
    return True
Monitoring Implementation
To ensure data quality, we monitor several metrics. Here’s a query that checks for stale data:
WITH latest_updates AS (
    SELECT
        indicator_type,
        MAX(date) AS last_update,
        update_frequency
    FROM economic_indicators
    GROUP BY indicator_type, update_frequency
)
SELECT
    indicator_type,
    last_update,
    CASE
        WHEN update_frequency = 'D' AND current_date - last_update > 2 THEN 'STALE'
        WHEN update_frequency = 'M' AND current_date - last_update > 35 THEN 'STALE'
        WHEN update_frequency = 'Q' AND current_date - last_update > 100 THEN 'STALE'
        ELSE 'CURRENT'
    END AS status
FROM latest_updates;
When monitoring detects issues, it triggers alerts:
def check_data_freshness():
    with db.connect() as conn:
        # Assumes the staleness query above has been saved as a view named
        # latest_updates that includes the computed status column
        stale_indicators = conn.execute("""
            SELECT indicator_type
            FROM latest_updates
            WHERE status = 'STALE'
        """).fetchall()
    if stale_indicators:
        names = ', '.join(row[0] for row in stale_indicators)
        notify_team(f"Stale data detected for: {names}")
This implementation gives us a reliable pipeline that handles the complexities of different data sources, frequencies, and quality requirements while maintaining consistent monitoring and error handling.