Economic data analysis requires reliable, up-to-date information, but many valuable sources don’t provide convenient APIs. Today, we’ll build a complete pipeline to extract data from the Federal Reserve Economic Data (FRED) website and store it in a data lake for analysis.
Understanding the Source
The FRED website presents economic data through interactive tables that load dynamically. These tables often contain valuable information like GDP growth rates, inflation figures, and employment statistics. While FRED does offer an API, we’ll focus on web scraping as many other economic data sources don’t provide API access.
Initial Data Extraction
First, let’s create a scraper that can handle dynamically loaded content by waiting for the data table to render before reading it:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

class FREDScraper:
    def __init__(self):
        self.driver = self._setup_driver()
        self.wait = WebDriverWait(self.driver, 20)

    def _setup_driver(self):
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--disable-gpu')
        return webdriver.Chrome(options=options)

    def extract_economic_data(self, indicator_url):
        self.driver.get(indicator_url)

        # Wait for the data table to load
        table = self.wait.until(
            EC.presence_of_element_located((By.ID, "data-table"))
        )

        # Extract headers and data
        headers = [th.text for th in table.find_elements(By.TAG_NAME, "th")]
        rows = table.find_elements(By.TAG_NAME, "tr")

        data = []
        for row in rows[1:]:  # Skip the header row
            cells = row.find_elements(By.TAG_NAME, "td")
            data.append([cell.text for cell in cells])

        return headers, data
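Here’s a minimal usage sketch for the scraper. The URL is one of the indicator pages used later in this post, and because Selenium starts a real browser process, the driver should be shut down even if extraction fails:

scraper = FREDScraper()
try:
    headers, data = scraper.extract_economic_data(
        "https://fred.stlouisfed.org/series/GDP"
    )
    print(headers)
    print(data[:3])  # first few extracted rows
finally:
    scraper.driver.quit()  # release the headless Chrome instance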
Data Transformation
Raw data from the web needs cleaning and standardization before storage. Let’s create a transformer:
import pandas as pd
from datetime import datetime

class EconomicDataTransformer:
    def __init__(self):
        self.date_format = "%Y-%m-%d"

    def transform_data(self, headers, raw_data):
        df = pd.DataFrame(raw_data, columns=headers)

        # Convert date strings to datetime
        df['date'] = pd.to_datetime(df['date'])

        # Convert percentage strings to floats
        value_columns = [col for col in df.columns if 'value' in col.lower()]
        for col in value_columns:
            df[col] = df[col].str.rstrip('%').astype('float') / 100.0

        # Add metadata
        df['extraction_timestamp'] = datetime.utcnow()
        df['source'] = 'FRED'

        return df
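To see what the transformer expects, here is a quick check with hypothetical rows shaped like the scraper’s output; the lowercase 'date' and 'value' column names are assumptions about the scraped table headers:

headers = ["date", "value"]
raw_data = [["2024-01-01", "3.2%"], ["2024-04-01", "2.8%"]]

transformer = EconomicDataTransformer()
df = transformer.transform_data(headers, raw_data)
print(df.dtypes)  # 'date' becomes datetime64[ns], 'value' becomes float64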
Data Lake Storage
We’ll use Azure Data Lake Storage Gen2 for its scalability and integration capabilities:
from azure.storage.filedatalake import DataLakeServiceClient
from datetime import datetime
import json

class DataLakeStorage:
    def __init__(self, connection_string):
        self.service_client = DataLakeServiceClient.from_connection_string(
            connection_string
        )

    def store_economic_data(self, df, indicator_name):
        # Create a path with date partitioning
        current_date = datetime.utcnow()
        path = f"economic_data/{indicator_name}/year={current_date.year}/month={current_date.month}/data.parquet"

        # Get the file system client
        file_system_client = self.service_client.get_file_system_client(
            file_system="economicdata"
        )

        # Convert the DataFrame to Parquet (to_parquet returns bytes when no path is given)
        parquet_data = df.to_parquet()

        # Upload to the data lake
        file_client = file_system_client.get_file_client(path)
        file_client.upload_data(parquet_data, overwrite=True)

        # Store metadata separately
        metadata = {
            'rows': len(df),
            'columns': list(df.columns),
            'extraction_time': current_date.isoformat(),
            'indicator': indicator_name
        }
        metadata_path = f"economic_data/{indicator_name}/metadata.json"
        metadata_client = file_system_client.get_file_client(metadata_path)
        metadata_client.upload_data(json.dumps(metadata), overwrite=True)
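One assumption baked into store_economic_data is that the economicdata file system (container) already exists. If it might not, a one-time setup step along these lines creates it:

from azure.core.exceptions import ResourceExistsError
from azure.storage.filedatalake import DataLakeServiceClient

def ensure_file_system(connection_string, name="economicdata"):
    service_client = DataLakeServiceClient.from_connection_string(connection_string)
    try:
        service_client.create_file_system(file_system=name)
    except ResourceExistsError:
        pass  # already created on a previous run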
Putting It All Together
Now let’s create a pipeline that coordinates the extraction, transformation, and storage:
class EconomicDataPipeline:
    def __init__(self, connection_string):
        self.scraper = FREDScraper()
        self.transformer = EconomicDataTransformer()
        self.storage = DataLakeStorage(connection_string)

    def run_pipeline(self, indicators):
        for indicator_name, url in indicators.items():
            try:
                # Extract
                headers, raw_data = self.scraper.extract_economic_data(url)

                # Transform
                df = self.transformer.transform_data(headers, raw_data)

                # Load
                self.storage.store_economic_data(df, indicator_name)

                print(f"Successfully processed {indicator_name}")
            except Exception as e:
                print(f"Failed to process {indicator_name}: {str(e)}")
                continue

# Usage example
indicators = {
    'gdp': 'https://fred.stlouisfed.org/series/GDP',
    'inflation': 'https://fred.stlouisfed.org/series/CPIAUCSL'
}

pipeline = EconomicDataPipeline('your_connection_string')
pipeline.run_pipeline(indicators)
Error Handling and Monitoring
When running this in production, we need robust error handling and monitoring:
from datetime import datetime, timedelta
import logging

class PipelineMonitor:
    def __init__(self):
        self.logger = logging.getLogger('EconomicDataPipeline')
        self.start_time = None

    def start_monitoring(self):
        self.start_time = datetime.utcnow()
        self.logger.info(f"Pipeline started at {self.start_time}")

    def end_monitoring(self):
        end_time = datetime.utcnow()
        duration = end_time - self.start_time
        self.logger.info(f"Pipeline completed. Duration: {duration}")

    def alert_if_needed(self, df, indicator):
        # Alert on missing data
        if df.isnull().any().any():
            self.logger.warning(f"Missing values detected in {indicator}")

        # Alert on stale data
        latest_date = df['date'].max()
        if datetime.utcnow() - latest_date > timedelta(days=30):
            self.logger.error(f"Data for {indicator} might be stale")
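The monitor isn’t wired into EconomicDataPipeline above, so here is one way it could be hooked in. This sketch reuses the indicators dictionary from the usage example and assumes logging has been configured so that info-level messages actually appear:

import logging

logging.basicConfig(level=logging.INFO)

pipeline = EconomicDataPipeline('your_connection_string')
monitor = PipelineMonitor()

monitor.start_monitoring()
for indicator_name, url in indicators.items():
    headers, raw_data = pipeline.scraper.extract_economic_data(url)
    df = pipeline.transformer.transform_data(headers, raw_data)
    monitor.alert_if_needed(df, indicator_name)  # data-quality checks before loading
    pipeline.storage.store_economic_data(df, indicator_name)
monitor.end_monitoring()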
Data Validation
Before storing the data, we should validate it to ensure quality:
import pandas as pd

class DataValidator:
    def __init__(self):
        self.validations = {
            'gdp': {
                'min_value': -30,
                'max_value': 30,
                'frequency': 'quarterly'
            },
            'inflation': {
                'min_value': -5,
                'max_value': 50,
                'frequency': 'monthly'
            }
        }

    def validate_data(self, df, indicator):
        rules = self.validations[indicator]

        # Check value ranges
        values_valid = df['value'].between(
            rules['min_value'],
            rules['max_value']
        ).all()

        # Check update frequency
        dates = pd.to_datetime(df['date'])
        frequency_valid = self._check_frequency(dates, rules['frequency'])

        if not (values_valid and frequency_valid):
            raise ValueError(f"Validation failed for {indicator}")

    def _check_frequency(self, dates, frequency):
        # Rough frequency check: compare the median gap between observations
        # with the expected spacing, allowing a week of slack for calendar drift
        expected_days = {'monthly': 31, 'quarterly': 92}[frequency]
        median_gap = dates.sort_values().diff().dropna().median()
        return median_gap <= pd.Timedelta(days=expected_days + 7)
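A natural place for these checks is between the transform and load steps of the run loop. The sketch below assumes df, indicator_name, and storage come from that loop, and that indicator_name matches one of the keys defined in self.validations:

validator = DataValidator()
try:
    validator.validate_data(df, indicator_name)
    storage.store_economic_data(df, indicator_name)  # load only if validation passes
except ValueError as exc:
    print(f"Skipping {indicator_name}: {exc}")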
This implementation provides a robust foundation for collecting economic data and storing it in a data lake. The modular design allows for easy extensions and modifications as requirements evolve. Remember to implement appropriate rate limiting and respect the data source’s terms of service when deploying this solution.
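Rate limiting doesn’t need to be elaborate. One simple option, assuming a fixed pause between indicators is acceptable, is a thin wrapper that sleeps after each page load; the 5-second delay is a placeholder, not a value the source specifies:

import time

REQUEST_DELAY_SECONDS = 5  # placeholder; tune to the source's terms of service

def run_pipeline_throttled(pipeline, indicators):
    # Process one indicator at a time and pause between page loads
    for indicator_name, url in indicators.items():
        pipeline.run_pipeline({indicator_name: url})
        time.sleep(REQUEST_DELAY_SECONDS)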
The data lake structure enables easy analysis and integration with other tools. You can now use this data for various purposes (a read-back example follows the list):
- Building economic dashboards
- Running machine learning models
- Conducting research and analysis
- Creating automated reports
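For example, a stored partition can be pulled straight back into pandas; the year and month values below are hypothetical and simply mirror the partitioning scheme used in DataLakeStorage:

import io

import pandas as pd
from azure.storage.filedatalake import DataLakeServiceClient

service_client = DataLakeServiceClient.from_connection_string('your_connection_string')
file_system_client = service_client.get_file_system_client(file_system="economicdata")
file_client = file_system_client.get_file_client(
    "economic_data/gdp/year=2025/month=1/data.parquet"  # hypothetical partition
)
parquet_bytes = file_client.download_file().readall()
gdp_df = pd.read_parquet(io.BytesIO(parquet_bytes))
print(gdp_df.head())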
The pipeline can be scheduled to run regularly, ensuring your data lake always contains the latest economic indicators for your analysis needs.
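For scheduling, here is a minimal sketch using the third-party schedule package; it is just one option, and cron, Airflow, or Azure Data Factory would work equally well:

import time

import schedule

def daily_run():
    pipeline = EconomicDataPipeline('your_connection_string')
    pipeline.run_pipeline(indicators)

# The 06:00 run time is arbitrary; align it with the release schedule
# of the indicators you track.
schedule.every().day.at("06:00").do(daily_run)

while True:
    schedule.run_pending()
    time.sleep(60)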