Introduction
Economic data storage presents unique challenges that distinguish it from typical business application databases. Economic datasets often span decades of historical information, require complex temporal queries, and must support both high-frequency updates and intensive analytical workloads. The choice between SQL and NoSQL databases significantly impacts system performance, scalability, and analytical capabilities.
The temporal nature of economic data creates specific requirements for database design. Economic indicators arrive at irregular intervals, often with revisions to historical values, and require queries that span multiple time dimensions. Additionally, economic analysis frequently involves joining datasets with different temporal granularities, requiring database systems that can efficiently handle mixed-frequency data alignment.
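As a small illustration of mixed-frequency alignment, the sketch below uses pandas (independent of any particular database) to attach the most recent quarterly GDP reading to each monthly unemployment observation; the series names and values are purely illustrative.
import pandas as pd
# Hypothetical monthly and quarterly series; values are illustrative only
monthly = pd.DataFrame({
    "date": pd.to_datetime(["2023-01-31", "2023-02-28", "2023-03-31",
                            "2023-04-30", "2023-05-31", "2023-06-30"]),
    "unemployment_rate": [3.4, 3.6, 3.5, 3.4, 3.7, 3.6],
})
quarterly = pd.DataFrame({
    "date": pd.to_datetime(["2023-03-31", "2023-06-30"]),
    "gdp_growth": [1.1, 2.4],
})
# Attach the latest available quarterly value to each monthly observation
aligned = pd.merge_asof(monthly.sort_values("date"), quarterly.sort_values("date"),
                        on="date", direction="backward")
print(aligned)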
Regulatory compliance adds another layer of complexity to economic database design. Financial institutions and government agencies often require comprehensive audit trails, data lineage tracking, and the ability to reproduce historical analysis exactly as it was performed at specific points in time. These requirements influence database selection, schema design, and backup strategies.
This guide builds upon the data processing foundations from Economic Data Pipeline Aggregation and provides the storage patterns that support the real-time systems described in Real-Time Data Processing Economic Indicators. The database patterns presented here integrate with the quality frameworks from Data Quality Practices for Economic Datasets.
Time-Series Database Patterns
Time-series databases have emerged as the preferred solution for storing high-frequency economic data due to their optimized storage engines and query patterns. These specialized databases provide efficient compression for temporal data, automatic retention policies, and built-in analytical functions that align well with economic analysis requirements.
The choice of time-series database depends heavily on the specific characteristics of your economic data workload. InfluxDB excels at high-frequency financial market data with its columnar storage and built-in downsampling capabilities. TimescaleDB provides the familiarity of PostgreSQL with time-series optimizations, making it a natural fit for organizations with existing SQL expertise. Prometheus is designed primarily for operational monitoring; it can serve recent indicator values through its query language, but its limited support for backfilling historical observations makes it a weak choice for long economic archives.
Data modeling in time-series databases requires careful consideration of tag strategies and measurement organization. Economic indicators should be organized by logical groupings that align with query patterns - country-level indicators, sector-specific metrics, and cross-cutting measures like inflation or employment. The tag structure should support both individual indicator queries and cross-indicator analysis without requiring complex joins.
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import logging
from dataclasses import dataclass
import json
@dataclass
class EconomicIndicator:
"""Economic indicator data structure"""
code: str
name: str
value: float
timestamp: datetime
country: str
category: str
source: str
frequency: str
unit: str
metadata: Optional[Dict[str, Any]] = None
class InfluxEconomicStorage:
"""InfluxDB storage optimized for economic indicators"""
def __init__(self, url: str, token: str, org: str, bucket: str):
self.client = InfluxDBClient(url=url, token=token, org=org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.query_api = self.client.query_api()
self.bucket = bucket
self.org = org
def store_indicator(self, indicator: EconomicIndicator) -> bool:
"""Store a single economic indicator"""
try:
point = Point("economic_indicator") \
.tag("code", indicator.code) \
.tag("country", indicator.country) \
.tag("category", indicator.category) \
.tag("source", indicator.source) \
.tag("frequency", indicator.frequency) \
.tag("unit", indicator.unit) \
.field("value", indicator.value) \
.field("name", indicator.name) \
.time(indicator.timestamp, WritePrecision.S)
# Add numeric metadata as fields and string metadata as tags, if present
if indicator.metadata:
for key, value in indicator.metadata.items():
if isinstance(value, (int, float)):
point = point.field(f"meta_{key}", value)
elif isinstance(value, str):
point = point.tag(f"meta_{key}", value)
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
return True
except Exception as e:
logging.error(f"Failed to store indicator {indicator.code}: {e}")
return False
def store_indicators_batch(self, indicators: List[EconomicIndicator]) -> Dict[str, int]:
"""Store multiple indicators in batch"""
points = []
success_count = 0
error_count = 0
for indicator in indicators:
try:
point = Point("economic_indicator") \
.tag("code", indicator.code) \
.tag("country", indicator.country) \
.tag("category", indicator.category) \
.tag("source", indicator.source) \
.tag("frequency", indicator.frequency) \
.tag("unit", indicator.unit) \
.field("value", indicator.value) \
.field("name", indicator.name) \
.time(indicator.timestamp, WritePrecision.S)
if indicator.metadata:
for key, value in indicator.metadata.items():
if isinstance(value, (int, float)):
point = point.field(f"meta_{key}", value)
elif isinstance(value, str):
point = point.tag(f"meta_{key}", value)
points.append(point)
success_count += 1
except Exception as e:
logging.error(f"Failed to prepare point for {indicator.code}: {e}")
error_count += 1
if points:
try:
self.write_api.write(bucket=self.bucket, org=self.org, record=points)
except Exception as e:
logging.error(f"Batch write failed: {e}")
error_count += len(points)
success_count = 0
return {"success": success_count, "errors": error_count}
def query_indicator_time_series(self, indicator_code: str,
start_time: datetime, end_time: datetime,
country: Optional[str] = None) -> pd.DataFrame:
"""Query time series data for a specific indicator"""
# Build the Flux query
flux_query = f'''
from(bucket: "{self.bucket}")
|> range(start: {start_time.isoformat()}Z, stop: {end_time.isoformat()}Z)
|> filter(fn: (r) => r._measurement == "economic_indicator")
|> filter(fn: (r) => r.code == "{indicator_code}")
|> filter(fn: (r) => r._field == "value")
'''
if country:
flux_query += f' |> filter(fn: (r) => r.country == "{country}")\n'
flux_query += '''
|> sort(columns: ["_time"])
|> yield(name: "results")
'''
try:
result = self.query_api.query_data_frame(flux_query, org=self.org)
if result.empty:
return pd.DataFrame()
# Clean and format the result
df = result[['_time', '_value', 'country', 'category', 'source', 'frequency', 'unit']].copy()
df.columns = ['timestamp', 'value', 'country', 'category', 'source', 'frequency', 'unit']
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.sort_values('timestamp')
return df
except Exception as e:
logging.error(f"Query failed for indicator {indicator_code}: {e}")
return pd.DataFrame()
def query_multiple_indicators(self, indicator_codes: List[str],
start_time: datetime, end_time: datetime,
country: Optional[str] = None) -> pd.DataFrame:
"""Query multiple indicators and return as wide-format DataFrame"""
# Build codes filter
codes_filter = " or ".join([f'r.code == "{code}"' for code in indicator_codes])
flux_query = f'''
from(bucket: "{self.bucket}")
|> range(start: {start_time.isoformat()}Z, stop: {end_time.isoformat()}Z)
|> filter(fn: (r) => r._measurement == "economic_indicator")
|> filter(fn: (r) => {codes_filter})
|> filter(fn: (r) => r._field == "value")
'''
if country:
flux_query += f' |> filter(fn: (r) => r.country == "{country}")\n'
flux_query += '''
|> pivot(rowKey:["_time"], columnKey: ["code"], valueColumn: "_value")
|> sort(columns: ["_time"])
|> yield(name: "results")
'''
try:
result = self.query_api.query_data_frame(flux_query, org=self.org)
if result.empty:
return pd.DataFrame()
# Clean the result
df = result.copy()
df['_time'] = pd.to_datetime(df['_time'])
df = df.rename(columns={'_time': 'timestamp'})
df = df.sort_values('timestamp')
# Keep only timestamp and indicator columns
indicator_columns = ['timestamp'] + [col for col in df.columns if col in indicator_codes]
df = df[indicator_columns]
return df
except Exception as e:
logging.error(f"Multi-indicator query failed: {e}")
return pd.DataFrame()
def aggregate_indicator(self, indicator_code: str,
start_time: datetime, end_time: datetime,
aggregation: str = "mean", window: str = "1d") -> pd.DataFrame:
"""Aggregate indicator data over time windows"""
flux_query = f'''
from(bucket: "{self.bucket}")
|> range(start: {start_time.isoformat()}Z, stop: {end_time.isoformat()}Z)
|> filter(fn: (r) => r._measurement == "economic_indicator")
|> filter(fn: (r) => r.code == "{indicator_code}")
|> filter(fn: (r) => r._field == "value")
|> aggregateWindow(every: {window}, fn: {aggregation}, createEmpty: false)
|> sort(columns: ["_time"])
|> yield(name: "results")
'''
try:
result = self.query_api.query_data_frame(flux_query, org=self.org)
if result.empty:
return pd.DataFrame()
df = result[['_time', '_value']].copy()
df.columns = ['timestamp', f'{indicator_code}_{aggregation}']
df['timestamp'] = pd.to_datetime(df['timestamp'])
return df
except Exception as e:
logging.error(f"Aggregation query failed for {indicator_code}: {e}")
return pd.DataFrame()
def get_latest_values(self, indicator_codes: List[str],
country: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
"""Get the latest values for specified indicators"""
codes_filter = " or ".join([f'r.code == "{code}"' for code in indicator_codes])
flux_query = f'''
from(bucket: "{self.bucket}")
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "economic_indicator")
|> filter(fn: (r) => {codes_filter})
|> filter(fn: (r) => r._field == "value")
'''
if country:
flux_query += f' |> filter(fn: (r) => r.country == "{country}")\n'
flux_query += '''
|> group(columns: ["code"])
|> last()
|> yield(name: "results")
'''
try:
result = self.query_api.query_data_frame(flux_query, org=self.org)
if result.empty:
return {}
latest_values = {}
for _, row in result.iterrows():
latest_values[row['code']] = {
'value': row['_value'],
'timestamp': pd.to_datetime(row['_time']),
'country': row['country'],
'source': row['source']
}
return latest_values
except Exception as e:
logging.error(f"Latest values query failed: {e}")
return {}
def close(self):
"""Close the database connection"""
self.client.close()
class TimescaleEconomicStorage:
"""TimescaleDB storage for economic indicators"""
def __init__(self, connection_string: str):
import psycopg2
from psycopg2.extras import RealDictCursor
self.connection = psycopg2.connect(connection_string)
self.cursor = self.connection.cursor(cursor_factory=RealDictCursor)
self._create_tables()
def _create_tables(self):
"""Create the necessary tables and hypertables"""
create_table_sql = """
CREATE TABLE IF NOT EXISTS economic_indicators (
time TIMESTAMPTZ NOT NULL,
indicator_code TEXT NOT NULL,
value DOUBLE PRECISION,
country TEXT,
category TEXT,
source TEXT,
frequency TEXT,
unit TEXT,
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_indicators_code_time
ON economic_indicators (indicator_code, time DESC);
CREATE INDEX IF NOT EXISTS idx_indicators_country_time
ON economic_indicators (country, time DESC);
CREATE INDEX IF NOT EXISTS idx_indicators_category
ON economic_indicators (category);
-- Unique index backing the ON CONFLICT upsert in store_indicator;
-- it must include the time column used for hypertable partitioning
CREATE UNIQUE INDEX IF NOT EXISTS uq_indicators_time_code_country
ON economic_indicators (time, indicator_code, country);
"""
# Create hypertable if it doesn't exist
hypertable_sql = """
SELECT create_hypertable('economic_indicators', 'time',
if_not_exists => TRUE,
chunk_time_interval => INTERVAL '1 week');
"""
try:
self.cursor.execute(create_table_sql)
self.cursor.execute(hypertable_sql)
self.connection.commit()
except Exception as e:
logging.error(f"Failed to create tables: {e}")
self.connection.rollback()
def store_indicator(self, indicator: EconomicIndicator) -> bool:
"""Store a single economic indicator"""
insert_sql = """
INSERT INTO economic_indicators
(time, indicator_code, value, country, category, source, frequency, unit, metadata)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (time, indicator_code, country) DO UPDATE SET
value = EXCLUDED.value,
metadata = EXCLUDED.metadata,
created_at = NOW()
"""
try:
self.cursor.execute(insert_sql, (
indicator.timestamp,
indicator.code,
indicator.value,
indicator.country,
indicator.category,
indicator.source,
indicator.frequency,
indicator.unit,
json.dumps(indicator.metadata) if indicator.metadata else None
))
self.connection.commit()
return True
except Exception as e:
logging.error(f"Failed to store indicator {indicator.code}: {e}")
self.connection.rollback()
return False
def query_indicator_time_series(self, indicator_code: str,
start_time: datetime, end_time: datetime,
country: Optional[str] = None) -> pd.DataFrame:
"""Query time series data for a specific indicator"""
base_sql = """
SELECT time, value, country, category, source, frequency, unit
FROM economic_indicators
WHERE indicator_code = %s
AND time BETWEEN %s AND %s
"""
params = [indicator_code, start_time, end_time]
if country:
base_sql += " AND country = %s"
params.append(country)
base_sql += " ORDER BY time"
try:
self.cursor.execute(base_sql, params)
results = self.cursor.fetchall()
if not results:
return pd.DataFrame()
df = pd.DataFrame(results)
df['time'] = pd.to_datetime(df['time'])
return df
except Exception as e:
logging.error(f"Query failed for indicator {indicator_code}: {e}")
return pd.DataFrame()
def get_correlation_data(self, indicator_codes: List[str],
start_time: datetime, end_time: datetime,
country: Optional[str] = None) -> pd.DataFrame:
"""Get data for correlation analysis"""
base_sql = """
SELECT time, indicator_code, value
FROM economic_indicators
WHERE indicator_code = ANY(%s)
AND time BETWEEN %s AND %s
"""
params = [indicator_codes, start_time, end_time]
if country:
base_sql += " AND country = %s"
params.append(country)
base_sql += " ORDER BY time, indicator_code"
try:
self.cursor.execute(base_sql, params)
results = self.cursor.fetchall()
if not results:
return pd.DataFrame()
df = pd.DataFrame(results)
df['time'] = pd.to_datetime(df['time'])
# Pivot to wide format for correlation analysis
pivot_df = df.pivot(index='time', columns='indicator_code', values='value')
pivot_df.reset_index(inplace=True)
return pivot_df
except Exception as e:
logging.error(f"Correlation query failed: {e}")
return pd.DataFrame()
def close(self):
"""Close the database connection"""
self.cursor.close()
self.connection.close()
SQL Database Patterns for Economic Analysis
Traditional SQL databases remain valuable for economic data storage, particularly when complex analytical queries, regulatory reporting, and data integrity are paramount. PostgreSQL, with its advanced analytical functions and JSON support, provides an excellent foundation for economic data systems that need both relational structure and flexible schema evolution.
The key to effective SQL database design for economic data lies in balancing normalization with query performance. Economic datasets benefit from a star schema approach where fact tables contain the actual indicator values and dimension tables provide contextual information about countries, sources, and indicator definitions. This design supports both efficient storage and complex analytical queries.
Partitioning strategies become critical for managing large historical datasets. Time-based partitioning enables efficient data lifecycle management and query performance optimization. Range partitioning by date allows older data to be moved to slower storage tiers while keeping recent data in high-performance storage for active analysis.
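Partition maintenance is straightforward to automate. The helper below is a minimal sketch, assuming a psycopg2 cursor and the fact_economic_data table defined in the schema later in this section; it could replace the hard-coded list of yearly partitions shown there.
def ensure_yearly_partition(cursor, year: int) -> None:
    """Create the yearly partition of fact_economic_data if it is missing."""
    # year is an integer, so interpolating it directly into the DDL string is safe
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS fact_economic_data_{year}
        PARTITION OF fact_economic_data
        FOR VALUES FROM ('{year}-01-01') TO ('{year + 1}-01-01')
    """)
A scheduled job that calls this helper ahead of each year-end keeps inserts from failing for lack of a partition.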
import psycopg2
from psycopg2.extras import RealDictCursor, execute_batch
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
import logging
import json
class PostgreSQLEconomicStorage:
"""PostgreSQL storage optimized for economic data analysis"""
def __init__(self, connection_string: str):
self.connection = psycopg2.connect(connection_string)
self.cursor = self.connection.cursor(cursor_factory=RealDictCursor)
self._create_schema()
def _create_schema(self):
"""Create the complete economic data schema"""
schema_sql = """
-- Countries dimension table
CREATE TABLE IF NOT EXISTS dim_countries (
country_id SERIAL PRIMARY KEY,
country_code VARCHAR(3) UNIQUE NOT NULL,
country_name VARCHAR(100) NOT NULL,
region VARCHAR(50),
income_group VARCHAR(50),
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Data sources dimension table
CREATE TABLE IF NOT EXISTS dim_sources (
source_id SERIAL PRIMARY KEY,
source_code VARCHAR(20) UNIQUE NOT NULL,
source_name VARCHAR(100) NOT NULL,
source_type VARCHAR(50),
api_endpoint VARCHAR(255),
update_frequency VARCHAR(20),
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Economic indicators dimension table
CREATE TABLE IF NOT EXISTS dim_indicators (
indicator_id SERIAL PRIMARY KEY,
indicator_code VARCHAR(50) UNIQUE NOT NULL,
indicator_name VARCHAR(255) NOT NULL,
category VARCHAR(100),
subcategory VARCHAR(100),
unit VARCHAR(50),
frequency VARCHAR(20),
seasonal_adjustment BOOLEAN DEFAULT FALSE,
description TEXT,
methodology TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Main fact table for economic indicator values
CREATE TABLE IF NOT EXISTS fact_economic_data (
id BIGSERIAL,
indicator_id INTEGER REFERENCES dim_indicators(indicator_id),
country_id INTEGER REFERENCES dim_countries(country_id),
source_id INTEGER REFERENCES dim_sources(source_id),
observation_date DATE NOT NULL,
value NUMERIC(15,6),
preliminary BOOLEAN DEFAULT FALSE,
revised BOOLEAN DEFAULT FALSE,
revision_number INTEGER DEFAULT 1,
confidence_interval_lower NUMERIC(15,6),
confidence_interval_upper NUMERIC(15,6),
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (id, observation_date)
) PARTITION BY RANGE (observation_date);
-- Create partitions for different years
CREATE TABLE IF NOT EXISTS fact_economic_data_2020
PARTITION OF fact_economic_data
FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');
CREATE TABLE IF NOT EXISTS fact_economic_data_2021
PARTITION OF fact_economic_data
FOR VALUES FROM ('2021-01-01') TO ('2022-01-01');
CREATE TABLE IF NOT EXISTS fact_economic_data_2022
PARTITION OF fact_economic_data
FOR VALUES FROM ('2022-01-01') TO ('2023-01-01');
CREATE TABLE IF NOT EXISTS fact_economic_data_2023
PARTITION OF fact_economic_data
FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');
CREATE TABLE IF NOT EXISTS fact_economic_data_2024
PARTITION OF fact_economic_data
FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
CREATE TABLE IF NOT EXISTS fact_economic_data_2025
PARTITION OF fact_economic_data
FOR VALUES FROM ('2025-01-01') TO ('2026-01-01');
-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_fact_indicator_date
ON fact_economic_data (indicator_id, observation_date DESC);
CREATE INDEX IF NOT EXISTS idx_fact_country_date
ON fact_economic_data (country_id, observation_date DESC);
CREATE INDEX IF NOT EXISTS idx_fact_date_value
ON fact_economic_data (observation_date DESC, value);
-- Unique index backing the ON CONFLICT upsert in insert_economic_data;
-- on a partitioned table it must include the partition key (observation_date)
CREATE UNIQUE INDEX IF NOT EXISTS uq_fact_observation
ON fact_economic_data (indicator_id, country_id, source_id, observation_date);
-- Revision tracking table
CREATE TABLE IF NOT EXISTS revision_history (
revision_id BIGSERIAL PRIMARY KEY,
fact_id BIGINT,
observation_date DATE,
indicator_id INTEGER,
country_id INTEGER,
old_value NUMERIC(15,6),
new_value NUMERIC(15,6),
revision_type VARCHAR(50),
revision_reason TEXT,
revised_at TIMESTAMPTZ DEFAULT NOW()
);
-- Data quality metrics table
CREATE TABLE IF NOT EXISTS data_quality_metrics (
metric_id BIGSERIAL PRIMARY KEY,
indicator_id INTEGER REFERENCES dim_indicators(indicator_id),
country_id INTEGER REFERENCES dim_countries(country_id),
metric_date DATE,
completeness_score NUMERIC(5,4),
accuracy_score NUMERIC(5,4),
consistency_score NUMERIC(5,4),
timeliness_score NUMERIC(5,4),
overall_score NUMERIC(5,4),
quality_issues JSONB,
calculated_at TIMESTAMPTZ DEFAULT NOW()
);
"""
try:
self.cursor.execute(schema_sql)
self.connection.commit()
logging.info("Economic data schema created successfully")
except Exception as e:
logging.error(f"Failed to create schema: {e}")
self.connection.rollback()
def insert_country(self, country_code: str, country_name: str,
region: str = None, income_group: str = None) -> int:
"""Insert or get country ID"""
insert_sql = """
INSERT INTO dim_countries (country_code, country_name, region, income_group)
VALUES (%s, %s, %s, %s)
ON CONFLICT (country_code) DO UPDATE SET
country_name = EXCLUDED.country_name,
region = EXCLUDED.region,
income_group = EXCLUDED.income_group
RETURNING country_id
"""
try:
self.cursor.execute(insert_sql, (country_code, country_name, region, income_group))
country_id = self.cursor.fetchone()['country_id']
self.connection.commit()
return country_id
except Exception as e:
logging.error(f"Failed to insert country {country_code}: {e}")
self.connection.rollback()
return None
def insert_indicator(self, indicator_code: str, indicator_name: str,
category: str = None, unit: str = None,
frequency: str = None, description: str = None) -> int:
"""Insert or get indicator ID"""
insert_sql = """
INSERT INTO dim_indicators
(indicator_code, indicator_name, category, unit, frequency, description)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (indicator_code) DO UPDATE SET
indicator_name = EXCLUDED.indicator_name,
category = EXCLUDED.category,
unit = EXCLUDED.unit,
frequency = EXCLUDED.frequency,
description = EXCLUDED.description
RETURNING indicator_id
"""
try:
self.cursor.execute(insert_sql, (
indicator_code, indicator_name, category, unit, frequency, description
))
indicator_id = self.cursor.fetchone()['indicator_id']
self.connection.commit()
return indicator_id
except Exception as e:
logging.error(f"Failed to insert indicator {indicator_code}: {e}")
self.connection.rollback()
return None
def insert_source(self, source_code: str, source_name: str,
source_type: str = None, api_endpoint: str = None) -> int:
"""Insert or get source ID"""
insert_sql = """
INSERT INTO dim_sources (source_code, source_name, source_type, api_endpoint)
VALUES (%s, %s, %s, %s)
ON CONFLICT (source_code) DO UPDATE SET
source_name = EXCLUDED.source_name,
source_type = EXCLUDED.source_type,
api_endpoint = EXCLUDED.api_endpoint
RETURNING source_id
"""
try:
self.cursor.execute(insert_sql, (source_code, source_name, source_type, api_endpoint))
source_id = self.cursor.fetchone()['source_id']
self.connection.commit()
return source_id
except Exception as e:
logging.error(f"Failed to insert source {source_code}: {e}")
self.connection.rollback()
return None
def insert_economic_data(self, indicator_code: str, country_code: str,
source_code: str, observation_date: datetime,
value: float, preliminary: bool = False,
metadata: Dict[str, Any] = None) -> bool:
"""Insert economic data point"""
# Get dimension IDs
indicator_id = self._get_indicator_id(indicator_code)
country_id = self._get_country_id(country_code)
source_id = self._get_source_id(source_code)
if not all([indicator_id, country_id, source_id]):
logging.error(f"Missing dimension IDs for {indicator_code}, {country_code}, {source_code}")
return False
insert_sql = """
INSERT INTO fact_economic_data
(indicator_id, country_id, source_id, observation_date, value, preliminary, metadata)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (indicator_id, country_id, source_id, observation_date) DO UPDATE SET
value = EXCLUDED.value,
preliminary = EXCLUDED.preliminary,
metadata = EXCLUDED.metadata,
updated_at = NOW()
"""
try:
self.cursor.execute(insert_sql, (
indicator_id, country_id, source_id, observation_date.date(),
value, preliminary, json.dumps(metadata) if metadata else None
))
self.connection.commit()
return True
except Exception as e:
logging.error(f"Failed to insert economic data: {e}")
self.connection.rollback()
return False
def get_time_series(self, indicator_code: str, country_code: str = None,
start_date: datetime = None, end_date: datetime = None) -> pd.DataFrame:
"""Get time series data for analysis"""
base_sql = """
SELECT
f.observation_date,
f.value,
f.preliminary,
f.confidence_interval_lower,
f.confidence_interval_upper,
i.indicator_code,
i.indicator_name,
i.unit,
c.country_code,
c.country_name,
s.source_code
FROM fact_economic_data f
JOIN dim_indicators i ON f.indicator_id = i.indicator_id
JOIN dim_countries c ON f.country_id = c.country_id
JOIN dim_sources s ON f.source_id = s.source_id
WHERE i.indicator_code = %s
"""
params = [indicator_code]
if country_code:
base_sql += " AND c.country_code = %s"
params.append(country_code)
if start_date:
base_sql += " AND f.observation_date >= %s"
params.append(start_date.date())
if end_date:
base_sql += " AND f.observation_date <= %s"
params.append(end_date.date())
base_sql += " ORDER BY f.observation_date"
try:
self.cursor.execute(base_sql, params)
results = self.cursor.fetchall()
if not results:
return pd.DataFrame()
df = pd.DataFrame(results)
df['observation_date'] = pd.to_datetime(df['observation_date'])
return df
except Exception as e:
logging.error(f"Time series query failed: {e}")
return pd.DataFrame()
def get_cross_country_data(self, indicator_code: str, countries: List[str],
observation_date: datetime) -> pd.DataFrame:
"""Get cross-country data for a specific indicator and date"""
sql = """
SELECT
c.country_code,
c.country_name,
f.value,
f.observation_date,
i.unit
FROM fact_economic_data f
JOIN dim_indicators i ON f.indicator_id = i.indicator_id
JOIN dim_countries c ON f.country_id = c.country_id
WHERE i.indicator_code = %s
AND c.country_code = ANY(%s)
AND f.observation_date = %s
ORDER BY c.country_name
"""
try:
self.cursor.execute(sql, (indicator_code, countries, observation_date.date()))
results = self.cursor.fetchall()
if not results:
return pd.DataFrame()
return pd.DataFrame(results)
except Exception as e:
logging.error(f"Cross-country query failed: {e}")
return pd.DataFrame()
def calculate_correlation_matrix(self, indicator_codes: List[str],
country_code: str, start_date: datetime,
end_date: datetime) -> pd.DataFrame:
"""Calculate correlation matrix for multiple indicators"""
sql = """
SELECT
i.indicator_code,
f.observation_date,
f.value
FROM fact_economic_data f
JOIN dim_indicators i ON f.indicator_id = i.indicator_id
JOIN dim_countries c ON f.country_id = c.country_id
WHERE i.indicator_code = ANY(%s)
AND c.country_code = %s
AND f.observation_date BETWEEN %s AND %s
ORDER BY f.observation_date, i.indicator_code
"""
try:
self.cursor.execute(sql, (
indicator_codes, country_code, start_date.date(), end_date.date()
))
results = self.cursor.fetchall()
if not results:
return pd.DataFrame()
df = pd.DataFrame(results)
df['observation_date'] = pd.to_datetime(df['observation_date'])
# Pivot to wide format for correlation analysis
pivot_df = df.pivot(
index='observation_date',
columns='indicator_code',
values='value'
)
# Calculate correlation matrix
correlation_matrix = pivot_df.corr()
return correlation_matrix
except Exception as e:
logging.error(f"Correlation calculation failed: {e}")
return pd.DataFrame()
def _get_indicator_id(self, indicator_code: str) -> Optional[int]:
"""Get indicator ID by code"""
self.cursor.execute(
"SELECT indicator_id FROM dim_indicators WHERE indicator_code = %s",
(indicator_code,)
)
result = self.cursor.fetchone()
return result['indicator_id'] if result else None
def _get_country_id(self, country_code: str) -> Optional[int]:
"""Get country ID by code"""
self.cursor.execute(
"SELECT country_id FROM dim_countries WHERE country_code = %s",
(country_code,)
)
result = self.cursor.fetchone()
return result['country_id'] if result else None
def _get_source_id(self, source_code: str) -> Optional[int]:
"""Get source ID by code"""
self.cursor.execute(
"SELECT source_id FROM dim_sources WHERE source_code = %s",
(source_code,)
)
result = self.cursor.fetchone()
return result['source_id'] if result else None
def close(self):
"""Close the database connection"""
self.cursor.close()
self.connection.close()
NoSQL Patterns for Economic Data
NoSQL databases provide valuable alternatives for economic data storage when schema flexibility, horizontal scaling, or document-based storage patterns align better with analytical requirements. Document databases like MongoDB excel at storing economic datasets with varying structures, while graph databases like Neo4j can effectively model complex economic relationships and dependencies.
The choice of NoSQL database depends heavily on the specific analytical patterns and data characteristics. MongoDB works well for storing economic research datasets where the schema may evolve over time or where each economic indicator has unique metadata requirements. The document model naturally represents the hierarchical structure of economic data, from global indicators down to country and regional levels.
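As a point of reference, a single document in such a collection might look like the sketch below; the field names mirror the storage class later in this section, while the nested revisions list is a hypothetical extension that the flexible schema accommodates without any migration.
from datetime import datetime
sample_document = {
    "indicator_code": "CPI_YOY",   # illustrative code, not a standard identifier
    "country_code": "USA",
    "observation_date": datetime(2024, 3, 31),
    "value": 3.5,
    "unit": "percent",
    "category": "inflation",
    "source": "BLS",
    "metadata": {"seasonal_adjustment": True, "methodology_version": "2020"},
    # Nested structures are where the document model pays off:
    "revisions": [
        {"revised_on": datetime(2024, 4, 15), "previous_value": 3.4},
    ],
}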
Graph databases become particularly valuable when modeling economic relationships and dependencies. The interconnected nature of economic systems - where changes in one indicator influence others through complex chains of causation - maps naturally to graph structures. Neo4j can efficiently query these relationships and support network analysis of economic systems.
from pymongo import MongoClient, UpdateOne
from pymongo.errors import BulkWriteError
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import logging
from bson import ObjectId
import json
class MongoEconomicStorage:
"""MongoDB storage for flexible economic data schemas"""
def __init__(self, connection_string: str, database_name: str):
self.client = MongoClient(connection_string)
self.db = self.client[database_name]
self._create_collections()
def _create_collections(self):
"""Create collections and indexes"""
# Economic indicators collection
indicators_collection = self.db.economic_indicators
# Create indexes for performance
indicators_collection.create_index([
("indicator_code", 1),
("country_code", 1),
("observation_date", -1)
])
indicators_collection.create_index([
("observation_date", -1),
("category", 1)
])
indicators_collection.create_index([
("country_code", 1),
("observation_date", -1)
])
# Metadata collection for indicator definitions
metadata_collection = self.db.indicator_metadata
metadata_collection.create_index([("indicator_code", 1)], unique=True)
# Data sources collection
sources_collection = self.db.data_sources
sources_collection.create_index([("source_code", 1)], unique=True)
logging.info("MongoDB collections and indexes created")
def store_indicator_metadata(self, indicator_code: str, metadata: Dict[str, Any]) -> bool:
"""Store indicator metadata"""
try:
self.db.indicator_metadata.update_one(
{"indicator_code": indicator_code},
{
"$set": {
**metadata,
"updated_at": datetime.utcnow()
},
"$setOnInsert": {
"created_at": datetime.utcnow()
}
},
upsert=True
)
return True
except Exception as e:
logging.error(f"Failed to store indicator metadata: {e}")
return False
def store_economic_data(self, data_points: List[Dict[str, Any]]) -> Dict[str, int]:
"""Store multiple economic data points"""
if not data_points:
return {"inserted": 0, "modified": 0, "errors": 0}
operations = []
for point in data_points:
# Track the update time; created_at is handled by $setOnInsert below
# (setting it in both $set and $setOnInsert would raise a write conflict)
point["updated_at"] = datetime.utcnow()
# Create upsert operation
filter_doc = {
"indicator_code": point["indicator_code"],
"country_code": point["country_code"],
"observation_date": point["observation_date"]
}
update_doc = {
"$set": point,
"$setOnInsert": {"created_at": datetime.utcnow()}
}
# bulk_write expects pymongo operation objects rather than plain dicts
operations.append(UpdateOne(filter_doc, update_doc, upsert=True))
try:
result = self.db.economic_indicators.bulk_write(operations)
return {
"inserted": result.upserted_count,
"modified": result.modified_count,
"errors": 0
}
except BulkWriteError as e:
logging.error(f"Bulk write error: {e.details}")
return {
"inserted": e.details.get("nUpserted", 0),
"modified": e.details.get("nModified", 0),
"errors": len(e.details.get("writeErrors", []))
}
except Exception as e:
logging.error(f"Failed to store economic data: {e}")
return {"inserted": 0, "modified": 0, "errors": len(data_points)}
def query_time_series(self, indicator_code: str, country_code: str = None,
start_date: datetime = None, end_date: datetime = None) -> pd.DataFrame:
"""Query time series data"""
filter_doc = {"indicator_code": indicator_code}
if country_code:
filter_doc["country_code"] = country_code
if start_date or end_date:
date_filter = {}
if start_date:
date_filter["$gte"] = start_date
if end_date:
date_filter["$lte"] = end_date
filter_doc["observation_date"] = date_filter
try:
cursor = self.db.economic_indicators.find(
filter_doc,
{"_id": 0} # Exclude MongoDB ObjectId
).sort("observation_date", 1)
results = list(cursor)
if not results:
return pd.DataFrame()
df = pd.DataFrame(results)
df['observation_date'] = pd.to_datetime(df['observation_date'])
return df
except Exception as e:
logging.error(f"Time series query failed: {e}")
return pd.DataFrame()
def query_latest_values(self, indicator_codes: List[str],
country_codes: List[str] = None) -> pd.DataFrame:
"""Get latest values for specified indicators"""
pipeline = []
# Match stage
match_filter = {"indicator_code": {"$in": indicator_codes}}
if country_codes:
match_filter["country_code"] = {"$in": country_codes}
pipeline.append({"$match": match_filter})
# Sort and group to get latest values
pipeline.extend([
{"$sort": {"indicator_code": 1, "country_code": 1, "observation_date": -1}},
{
"$group": {
"_id": {
"indicator_code": "$indicator_code",
"country_code": "$country_code"
},
"latest_value": {"$first": "$value"},
"latest_date": {"$first": "$observation_date"},
"source": {"$first": "$source"},
"unit": {"$first": "$unit"}
}
},
{
"$project": {
"_id": 0,
"indicator_code": "$_id.indicator_code",
"country_code": "$_id.country_code",
"value": "$latest_value",
"observation_date": "$latest_date",
"source": 1,
"unit": 1
}
}
])
try:
results = list(self.db.economic_indicators.aggregate(pipeline))
if not results:
return pd.DataFrame()
df = pd.DataFrame(results)
df['observation_date'] = pd.to_datetime(df['observation_date'])
return df
except Exception as e:
logging.error(f"Latest values query failed: {e}")
return pd.DataFrame()
def aggregate_by_category(self, category: str, aggregation_type: str = "avg",
start_date: datetime = None, end_date: datetime = None) -> pd.DataFrame:
"""Aggregate indicators by category"""
pipeline = []
# Match stage
match_filter = {"category": category}
if start_date or end_date:
date_filter = {}
if start_date:
date_filter["$gte"] = start_date
if end_date:
date_filter["$lte"] = end_date
match_filter["observation_date"] = date_filter
pipeline.append({"$match": match_filter})
# Group and aggregate
group_stage = {
"$group": {
"_id": {
"country_code": "$country_code",
"observation_date": "$observation_date"
},
"count": {"$sum": 1}
}
}
if aggregation_type == "avg":
group_stage["$group"]["avg_value"] = {"$avg": "$value"}
elif aggregation_type == "sum":
group_stage["$group"]["sum_value"] = {"$sum": "$value"}
elif aggregation_type == "min":
group_stage["$group"]["min_value"] = {"$min": "$value"}
elif aggregation_type == "max":
group_stage["$group"]["max_value"] = {"$max": "$value"}
pipeline.append(group_stage)
# Project and sort
pipeline.extend([
{
"$project": {
"_id": 0,
"country_code": "$_id.country_code",
"observation_date": "$_id.observation_date",
"aggregated_value": f"${aggregation_type}_value",
"count": 1
}
},
{"$sort": {"country_code": 1, "observation_date": 1}}
])
try:
results = list(self.db.economic_indicators.aggregate(pipeline))
if not results:
return pd.DataFrame()
df = pd.DataFrame(results)
df['observation_date'] = pd.to_datetime(df['observation_date'])
return df
except Exception as e:
logging.error(f"Category aggregation failed: {e}")
return pd.DataFrame()
def search_indicators(self, search_terms: List[str], categories: List[str] = None) -> pd.DataFrame:
"""Search for indicators by name or description"""
# Build text search query
search_query = " ".join(search_terms)
filter_doc = {
"$or": [
{"indicator_name": {"$regex": search_query, "$options": "i"}},
{"description": {"$regex": search_query, "$options": "i"}},
{"category": {"$regex": search_query, "$options": "i"}}
]
}
if categories:
filter_doc["category"] = {"$in": categories}
try:
cursor = self.db.indicator_metadata.find(filter_doc, {"_id": 0})
results = list(cursor)
if not results:
return pd.DataFrame()
return pd.DataFrame(results)
except Exception as e:
logging.error(f"Indicator search failed: {e}")
return pd.DataFrame()
def get_data_quality_summary(self) -> Dict[str, Any]:
"""Generate data quality summary statistics"""
pipeline = [
{
"$group": {
"_id": {
"indicator_code": "$indicator_code",
"country_code": "$country_code"
},
"total_observations": {"$sum": 1},
"null_values": {
"$sum": {
"$cond": [{"$eq": ["$value", None]}, 1, 0]
}
},
"latest_date": {"$max": "$observation_date"},
"earliest_date": {"$min": "$observation_date"}
}
},
{
"$group": {
"_id": None,
"total_series": {"$sum": 1},
"avg_observations_per_series": {"$avg": "$total_observations"},
"total_null_values": {"$sum": "$null_values"},
"total_observations": {"$sum": "$total_observations"},
"latest_update": {"$max": "$latest_date"},
"earliest_data": {"$min": "$earliest_date"}
}
}
]
try:
result = list(self.db.economic_indicators.aggregate(pipeline))
if not result:
return {}
summary = result[0]
summary["data_completeness"] = (
1 - (summary["total_null_values"] / summary["total_observations"])
) * 100 if summary["total_observations"] > 0 else 0
del summary["_id"]
return summary
except Exception as e:
logging.error(f"Data quality summary failed: {e}")
return {}
def close(self):
"""Close the database connection"""
self.client.close()
# Graph database pattern for economic relationships
class Neo4jEconomicStorage:
"""Neo4j graph database for economic relationships"""
def __init__(self, uri: str, username: str, password: str):
from neo4j import GraphDatabase
self.driver = GraphDatabase.driver(uri, auth=(username, password))
self._create_constraints()
def _create_constraints(self):
"""Create necessary constraints and indexes"""
with self.driver.session() as session:
# Create constraints
constraints = [
"CREATE CONSTRAINT IF NOT EXISTS FOR (c:Country) REQUIRE c.code IS UNIQUE",
"CREATE CONSTRAINT IF NOT EXISTS FOR (i:Indicator) REQUIRE i.code IS UNIQUE",
"CREATE CONSTRAINT IF NOT EXISTS FOR (s:Source) REQUIRE s.code IS UNIQUE"
]
for constraint in constraints:
try:
session.run(constraint)
except Exception as e:
logging.warning(f"Constraint creation warning: {e}")
def create_economic_relationship(self, indicator1: str, indicator2: str,
relationship_type: str, strength: float,
metadata: Dict[str, Any] = None):
"""Create relationship between economic indicators"""
cypher = """
MERGE (i1:Indicator {code: $indicator1})
MERGE (i2:Indicator {code: $indicator2})
MERGE (i1)-[r:CORRELATES_WITH {type: $relationship_type}]->(i2)
SET r.strength = $strength,
r.created_at = datetime(),
r.metadata = $metadata
"""
with self.driver.session() as session:
try:
session.run(cypher, {
"indicator1": indicator1,
"indicator2": indicator2,
"relationship_type": relationship_type,
"strength": strength,
"metadata": metadata or {}
})
return True
except Exception as e:
logging.error(f"Failed to create relationship: {e}")
return False
def find_correlated_indicators(self, indicator_code: str,
min_strength: float = 0.5) -> List[Dict[str, Any]]:
"""Find indicators correlated with the given indicator"""
cypher = """
MATCH (i1:Indicator {code: $indicator_code})-[r:CORRELATES_WITH]-(i2:Indicator)
WHERE r.strength >= $min_strength
RETURN i2.code as indicator_code,
i2.name as indicator_name,
r.strength as correlation_strength,
r.type as relationship_type,
r.metadata as metadata
ORDER BY r.strength DESC
"""
with self.driver.session() as session:
try:
result = session.run(cypher, {
"indicator_code": indicator_code,
"min_strength": min_strength
})
return [dict(record) for record in result]
except Exception as e:
logging.error(f"Correlation query failed: {e}")
return []
def close(self):
"""Close the database connection"""
self.driver.close()
Hybrid Storage Strategies
Many economic data systems benefit from hybrid storage approaches that leverage the strengths of different database technologies for different aspects of the system. A typical hybrid architecture might use time-series databases for high-frequency market data, SQL databases for regulatory reporting, and NoSQL databases for research datasets with evolving schemas.
The key to successful hybrid storage lies in designing clear data flow patterns and maintaining consistency across different storage systems. Each database system should have a well-defined role and ownership of specific data types or analytical workloads. The integration layer must handle data synchronization, format translation, and query federation across the different systems.
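A thin routing layer can make these roles explicit. The class below is an illustrative sketch rather than a production pattern: it assumes the InfluxEconomicStorage and PostgreSQLEconomicStorage classes defined earlier, treats the relational store as the system of record, and mirrors each observation into the time-series store for low-latency queries.
class HybridEconomicStorage:
    """Illustrative facade that routes writes across two storage systems."""
    def __init__(self, timeseries_store, relational_store):
        self.timeseries_store = timeseries_store    # e.g. InfluxEconomicStorage
        self.relational_store = relational_store    # e.g. PostgreSQLEconomicStorage
    def store_indicator(self, indicator: EconomicIndicator) -> bool:
        # Write to the system of record first; only mirror on success
        stored = self.relational_store.insert_economic_data(
            indicator_code=indicator.code,
            country_code=indicator.country,
            source_code=indicator.source,
            observation_date=indicator.timestamp,
            value=indicator.value,
            metadata=indicator.metadata,
        )
        if not stored:
            return False
        return self.timeseries_store.store_indicator(indicator)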
Data governance becomes particularly important in hybrid environments where the same economic indicators might be stored in multiple systems with different levels of processing, aggregation, or quality controls. Clear data lineage tracking and master data management ensure that analysts understand which system provides the authoritative version of specific datasets.
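In practice, lineage tracking can start with a simple registry that records, for each indicator, which system is authoritative and how derived copies were produced; the entry below is a hypothetical illustration rather than a prescribed schema.
# Hypothetical lineage registry entry; field names are illustrative
lineage_registry = {
    "CPI_YOY": {
        "authoritative_store": "postgresql",            # system of record
        "replicas": ["influxdb"],                       # derived copies
        "transformations": ["seasonal_adjustment_v2"],  # applied during replication
        "last_synchronized": "2024-06-30T00:00:00Z",
    }
}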
Cost optimization also requires careful consideration in hybrid architectures. Different database technologies have very different cost structures - time-series databases might be expensive for large historical datasets, while data lakes provide cost-effective storage for infrequently accessed data. The hybrid architecture should route data to the most cost-effective storage tier based on access patterns and analytical requirements.
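One lightweight way to encode such routing is a function that maps an observation's age and access frequency to a storage tier; the thresholds below are placeholders to be tuned against real query statistics.
from datetime import datetime, timezone
def choose_storage_tier(observation_date: datetime, queries_per_month: int) -> str:
    """Pick a storage tier from data age and access frequency (illustrative thresholds)."""
    # observation_date is expected to be timezone-aware
    age_days = (datetime.now(timezone.utc) - observation_date).days
    if age_days <= 90 or queries_per_month > 100:
        return "hot"    # time-series database
    if age_days <= 5 * 365:
        return "warm"   # relational warehouse
    return "cold"       # data lake / object storage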
The database integration patterns presented in this guide provide the foundation for the analytics capabilities discussed in Machine Learning Applications Economic Data Analysis and support the real-time monitoring systems covered in Economic Indicator Alerting and Monitoring Systems. Understanding these storage patterns is essential for building scalable economic data platforms that can grow with evolving analytical requirements.
Related Guides
For comprehensive database integration in economic data systems, explore these complementary resources:
- Data Lake Architecture Economic Analytics - Design storage architectures that complement database systems
- Economic Data Pipeline Aggregation - Build pipelines that efficiently populate database systems
- Data Quality Practices for Economic Datasets - Implement quality controls for database-stored data
- API Integration for Economic Data Sources - Connect data sources to database storage systems
- Real-Time Data Processing Economic Indicators - Stream data into database systems
- Machine Learning Applications Economic Data Analysis - Use database-stored data for ML applications
- Cloud Deployment Scaling Economic Data Systems - Deploy database systems in scalable cloud architectures