Database Integration for Economic Data Storage: SQL and NoSQL Patterns

Introduction

Economic data storage presents unique challenges that distinguish it from typical business application databases. Economic datasets often span decades of historical information, require complex temporal queries, and must support both high-frequency updates and intensive analytical workloads. The choice between SQL and NoSQL databases significantly impacts system performance, scalability, and analytical capabilities.

The temporal nature of economic data creates specific requirements for database design. Economic indicators arrive at irregular intervals, often with revisions to historical values, and require queries that span multiple time dimensions. Additionally, economic analysis frequently involves joining datasets with different temporal granularities, requiring database systems that can efficiently handle mixed-frequency data alignment.
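
As a small illustration of mixed-frequency alignment (independent of any particular database), the sketch below uses pandas' merge_asof to attach the most recent monthly reading to each quarterly observation date; the indicator names and values are made up for the example.

import pandas as pd

# Hypothetical monthly and quarterly series; values are illustrative only
cpi = pd.DataFrame({
    "date": pd.to_datetime(["2024-01-31", "2024-02-29", "2024-03-31",
                            "2024-04-30", "2024-05-31", "2024-06-30"]),
    "cpi_yoy": [3.1, 3.0, 2.9, 2.8, 2.7, 2.6],
})
gdp = pd.DataFrame({
    "date": pd.to_datetime(["2024-03-31", "2024-06-30"]),
    "gdp_growth": [0.4, 0.6],
})

# Backward-looking as-of join: each quarterly GDP date picks up the latest
# monthly CPI value available on or before that date
aligned = pd.merge_asof(gdp.sort_values("date"), cpi.sort_values("date"), on="date")
print(aligned)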

Regulatory compliance adds another layer of complexity to economic database design. Financial institutions and government agencies often require comprehensive audit trails, data lineage tracking, and the ability to reproduce historical analysis exactly as it was performed at specific points in time. These requirements influence database selection, schema design, and backup strategies.
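
One common way to meet the reproducibility requirement is an append-only, as-of query pattern: every revision is stored as a new row stamped with its ingestion time, and an analysis can be replayed by filtering on that timestamp. The snippet below is a hedged sketch against a hypothetical append-only table with a created_at column; the schemas later in this guide record similar ingestion timestamps.

# Hypothetical as-of query: reproduce an analysis exactly as it looked on a
# given date by ignoring rows ingested after that date and keeping only the
# most recent revision known at the time
AS_OF_SQL = """
SELECT DISTINCT ON (observation_date)
       observation_date, value
FROM fact_economic_data_revisions       -- hypothetical append-only table
WHERE indicator_code = %s
  AND created_at <= %s                  -- the "as-of" cut-off
ORDER BY observation_date, created_at DESC
"""
# cursor.execute(AS_OF_SQL, ("CPI_YOY", "2024-06-30"))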

This guide builds upon the data processing foundations from Economic Data Pipeline Aggregation and provides the storage patterns that support the real-time systems described in Real-Time Data Processing Economic Indicators. The database patterns presented here integrate with the quality frameworks from Data Quality Practices for Economic Datasets.

Time-Series Database Patterns

Time-series databases have emerged as the preferred solution for storing high-frequency economic data due to their optimized storage engines and query patterns. These specialized databases provide efficient compression for temporal data, automatic retention policies, and built-in analytical functions that align well with economic analysis requirements.
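
The retention policies mentioned above can be configured when a bucket is created. The sketch below uses the influxdb_client buckets API; the URL, token, organization, bucket name, and two-year expiry window are placeholder assumptions.

from influxdb_client import InfluxDBClient, BucketRetentionRules

# Placeholder connection details
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

# Raw observations expire after roughly two years; older history would be
# kept only in downsampled buckets
retention = BucketRetentionRules(type="expire", every_seconds=2 * 365 * 24 * 3600)
client.buckets_api().create_bucket(
    bucket_name="economic_indicators_raw",
    retention_rules=retention,
    org="my-org",
)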

The choice of time-series database depends heavily on the specific characteristics of your economic data workload. InfluxDB excels at high-frequency financial market data with its columnar storage and built-in downsampling capabilities. TimescaleDB provides the familiarity of PostgreSQL with time-series optimizations, making it ideal for organizations with existing SQL expertise. Prometheus, while primarily designed for monitoring, can effectively handle economic indicator storage with its powerful query language.

Data modeling in time-series databases requires careful consideration of tag strategies and measurement organization. Economic indicators should be organized by logical groupings that align with query patterns - country-level indicators, sector-specific metrics, and cross-cutting measures like inflation or employment. The tag structure should support both individual indicator queries and cross-indicator analysis without requiring complex joins.

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import logging
import json
from dataclasses import dataclass

@dataclass
class EconomicIndicator:
    """Economic indicator data structure"""
    code: str
    name: str
    value: float
    timestamp: datetime
    country: str
    category: str
    source: str
    frequency: str
    unit: str
    metadata: Optional[Dict[str, Any]] = None

class InfluxEconomicStorage:
    """InfluxDB storage optimized for economic indicators"""
    
    def __init__(self, url: str, token: str, org: str, bucket: str):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.bucket = bucket
        self.org = org
        
    def _build_point(self, indicator: EconomicIndicator) -> Point:
        """Convert an EconomicIndicator into an InfluxDB point"""
        point = Point("economic_indicator") \
            .tag("code", indicator.code) \
            .tag("country", indicator.country) \
            .tag("category", indicator.category) \
            .tag("source", indicator.source) \
            .tag("frequency", indicator.frequency) \
            .tag("unit", indicator.unit) \
            .field("value", indicator.value) \
            .field("name", indicator.name) \
            .time(indicator.timestamp, WritePrecision.S)
        
        # Add metadata as fields (numeric) or tags (string) if present
        if indicator.metadata:
            for key, value in indicator.metadata.items():
                if isinstance(value, (int, float)):
                    point = point.field(f"meta_{key}", value)
                elif isinstance(value, str):
                    point = point.tag(f"meta_{key}", value)
        
        return point
    
    def store_indicator(self, indicator: EconomicIndicator) -> bool:
        """Store a single economic indicator"""
        try:
            point = self._build_point(indicator)
            self.write_api.write(bucket=self.bucket, org=self.org, record=point)
            return True
            
        except Exception as e:
            logging.error(f"Failed to store indicator {indicator.code}: {e}")
            return False
    
    def store_indicators_batch(self, indicators: List[EconomicIndicator]) -> Dict[str, int]:
        """Store multiple indicators in a single batch write"""
        points = []
        success_count = 0
        error_count = 0
        
        for indicator in indicators:
            try:
                points.append(self._build_point(indicator))
                success_count += 1
                
            except Exception as e:
                logging.error(f"Failed to prepare point for {indicator.code}: {e}")
                error_count += 1
        
        if points:
            try:
                self.write_api.write(bucket=self.bucket, org=self.org, record=points)
            except Exception as e:
                logging.error(f"Batch write failed: {e}")
                error_count += len(points)
                success_count = 0
        
        return {"success": success_count, "errors": error_count}
    
    def query_indicator_time_series(self, indicator_code: str, 
                                   start_time: datetime, end_time: datetime,
                                   country: Optional[str] = None) -> pd.DataFrame:
        """Query time series data for a specific indicator"""
        
        # Build the Flux query
        flux_query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: {start_time.isoformat()}Z, stop: {end_time.isoformat()}Z)
          |> filter(fn: (r) => r._measurement == "economic_indicator")
          |> filter(fn: (r) => r.code == "{indicator_code}")
          |> filter(fn: (r) => r._field == "value")
        '''
        
        if country:
            flux_query += f'  |> filter(fn: (r) => r.country == "{country}")\n'
        
        flux_query += '''
          |> sort(columns: ["_time"])
          |> yield(name: "results")
        '''
        
        try:
            result = self.query_api.query_data_frame(flux_query, org=self.org)
            
            if result.empty:
                return pd.DataFrame()
            
            # Clean and format the result
            df = result[['_time', '_value', 'country', 'category', 'source', 'frequency', 'unit']].copy()
            df.columns = ['timestamp', 'value', 'country', 'category', 'source', 'frequency', 'unit']
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df = df.sort_values('timestamp')
            
            return df
            
        except Exception as e:
            logging.error(f"Query failed for indicator {indicator_code}: {e}")
            return pd.DataFrame()
    
    def query_multiple_indicators(self, indicator_codes: List[str],
                                 start_time: datetime, end_time: datetime,
                                 country: Optional[str] = None) -> pd.DataFrame:
        """Query multiple indicators and return as wide-format DataFrame"""
        
        # Build codes filter
        codes_filter = " or ".join([f'r.code == "{code}"' for code in indicator_codes])
        
        flux_query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: {start_time.isoformat()}Z, stop: {end_time.isoformat()}Z)
          |> filter(fn: (r) => r._measurement == "economic_indicator")
          |> filter(fn: (r) => {codes_filter})
          |> filter(fn: (r) => r._field == "value")
        '''
        
        if country:
            flux_query += f'  |> filter(fn: (r) => r.country == "{country}")\n'
        
        flux_query += '''
          |> pivot(rowKey:["_time"], columnKey: ["code"], valueColumn: "_value")
          |> sort(columns: ["_time"])
          |> yield(name: "results")
        '''
        
        try:
            result = self.query_api.query_data_frame(flux_query, org=self.org)
            
            if result.empty:
                return pd.DataFrame()
            
            # Clean the result
            df = result.copy()
            df['_time'] = pd.to_datetime(df['_time'])
            df = df.rename(columns={'_time': 'timestamp'})
            df = df.sort_values('timestamp')
            
            # Keep only timestamp and indicator columns
            indicator_columns = ['timestamp'] + [col for col in df.columns if col in indicator_codes]
            df = df[indicator_columns]
            
            return df
            
        except Exception as e:
            logging.error(f"Multi-indicator query failed: {e}")
            return pd.DataFrame()
    
    def aggregate_indicator(self, indicator_code: str, 
                           start_time: datetime, end_time: datetime,
                           aggregation: str = "mean", window: str = "1d") -> pd.DataFrame:
        """Aggregate indicator data over time windows"""
        
        flux_query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: {start_time.isoformat()}Z, stop: {end_time.isoformat()}Z)
          |> filter(fn: (r) => r._measurement == "economic_indicator")
          |> filter(fn: (r) => r.code == "{indicator_code}")
          |> filter(fn: (r) => r._field == "value")
          |> aggregateWindow(every: {window}, fn: {aggregation}, createEmpty: false)
          |> sort(columns: ["_time"])
          |> yield(name: "results")
        '''
        
        try:
            result = self.query_api.query_data_frame(flux_query, org=self.org)
            
            if result.empty:
                return pd.DataFrame()
            
            df = result[['_time', '_value']].copy()
            df.columns = ['timestamp', f'{indicator_code}_{aggregation}']
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            
            return df
            
        except Exception as e:
            logging.error(f"Aggregation query failed for {indicator_code}: {e}")
            return pd.DataFrame()
    
    def get_latest_values(self, indicator_codes: List[str],
                         country: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
        """Get the latest values for specified indicators"""
        
        codes_filter = " or ".join([f'r.code == "{code}"' for code in indicator_codes])
        
        flux_query = f'''
        from(bucket: "{self.bucket}")
          |> range(start: -30d)
          |> filter(fn: (r) => r._measurement == "economic_indicator")
          |> filter(fn: (r) => {codes_filter})
          |> filter(fn: (r) => r._field == "value")
        '''
        
        if country:
            flux_query += f'  |> filter(fn: (r) => r.country == "{country}")\n'
        
        flux_query += '''
          |> group(columns: ["code"])
          |> last()
          |> yield(name: "results")
        '''
        
        try:
            result = self.query_api.query_data_frame(flux_query, org=self.org)
            
            if result.empty:
                return {}
            
            latest_values = {}
            for _, row in result.iterrows():
                latest_values[row['code']] = {
                    'value': row['_value'],
                    'timestamp': pd.to_datetime(row['_time']),
                    'country': row['country'],
                    'source': row['source']
                }
            
            return latest_values
            
        except Exception as e:
            logging.error(f"Latest values query failed: {e}")
            return {}
    
    def close(self):
        """Close the database connection"""
        self.client.close()

class TimescaleEconomicStorage:
    """TimescaleDB storage for economic indicators"""
    
    def __init__(self, connection_string: str):
        import psycopg2
        from psycopg2.extras import RealDictCursor
        self.connection = psycopg2.connect(connection_string)
        self.cursor = self.connection.cursor(cursor_factory=RealDictCursor)
        self._create_tables()
        
    def _create_tables(self):
        """Create the necessary tables and hypertables"""
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS economic_indicators (
            time TIMESTAMPTZ NOT NULL,
            indicator_code TEXT NOT NULL,
            value DOUBLE PRECISION,
            country TEXT,
            category TEXT,
            source TEXT,
            frequency TEXT,
            unit TEXT,
            metadata JSONB,
            created_at TIMESTAMPTZ DEFAULT NOW()
        );
        
        CREATE INDEX IF NOT EXISTS idx_indicators_code_time 
        ON economic_indicators (indicator_code, time DESC);
        
        CREATE INDEX IF NOT EXISTS idx_indicators_country_time 
        ON economic_indicators (country, time DESC);
        
        CREATE INDEX IF NOT EXISTS idx_indicators_category 
        ON economic_indicators (category);
        
        -- Unique index backing the ON CONFLICT upsert in store_indicator;
        -- it must include the time column because this table becomes a hypertable
        CREATE UNIQUE INDEX IF NOT EXISTS idx_indicators_unique 
        ON economic_indicators (indicator_code, country, time);
        """
        
        # Create hypertable if it doesn't exist
        hypertable_sql = """
        SELECT create_hypertable('economic_indicators', 'time', 
                                if_not_exists => TRUE,
                                chunk_time_interval => INTERVAL '1 week');
        """
        
        try:
            self.cursor.execute(create_table_sql)
            self.cursor.execute(hypertable_sql)
            self.connection.commit()
        except Exception as e:
            logging.error(f"Failed to create tables: {e}")
            self.connection.rollback()
    
    def store_indicator(self, indicator: EconomicIndicator) -> bool:
        """Store a single economic indicator"""
        insert_sql = """
        INSERT INTO economic_indicators 
        (time, indicator_code, value, country, category, source, frequency, unit, metadata)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (time, indicator_code, country) DO UPDATE SET
        value = EXCLUDED.value,
        metadata = EXCLUDED.metadata,
        created_at = NOW()
        """
        
        try:
            self.cursor.execute(insert_sql, (
                indicator.timestamp,
                indicator.code,
                indicator.value,
                indicator.country,
                indicator.category,
                indicator.source,
                indicator.frequency,
                indicator.unit,
                json.dumps(indicator.metadata) if indicator.metadata else None
            ))
            self.connection.commit()
            return True
            
        except Exception as e:
            logging.error(f"Failed to store indicator {indicator.code}: {e}")
            self.connection.rollback()
            return False
    
    def query_indicator_time_series(self, indicator_code: str,
                                   start_time: datetime, end_time: datetime,
                                   country: Optional[str] = None) -> pd.DataFrame:
        """Query time series data for a specific indicator"""
        
        base_sql = """
        SELECT time, value, country, category, source, frequency, unit
        FROM economic_indicators
        WHERE indicator_code = %s
        AND time BETWEEN %s AND %s
        """
        
        params = [indicator_code, start_time, end_time]
        
        if country:
            base_sql += " AND country = %s"
            params.append(country)
        
        base_sql += " ORDER BY time"
        
        try:
            self.cursor.execute(base_sql, params)
            results = self.cursor.fetchall()
            
            if not results:
                return pd.DataFrame()
            
            df = pd.DataFrame(results)
            df['time'] = pd.to_datetime(df['time'])
            return df
            
        except Exception as e:
            logging.error(f"Query failed for indicator {indicator_code}: {e}")
            return pd.DataFrame()
    
    def get_correlation_data(self, indicator_codes: List[str],
                           start_time: datetime, end_time: datetime,
                           country: Optional[str] = None) -> pd.DataFrame:
        """Get data for correlation analysis"""
        
        base_sql = """
        SELECT time, indicator_code, value
        FROM economic_indicators
        WHERE indicator_code = ANY(%s)
        AND time BETWEEN %s AND %s
        """
        
        params = [indicator_codes, start_time, end_time]
        
        if country:
            base_sql += " AND country = %s"
            params.append(country)
        
        base_sql += " ORDER BY time, indicator_code"
        
        try:
            self.cursor.execute(base_sql, params)
            results = self.cursor.fetchall()
            
            if not results:
                return pd.DataFrame()
            
            df = pd.DataFrame(results)
            df['time'] = pd.to_datetime(df['time'])
            
            # Pivot to wide format for correlation analysis
            pivot_df = df.pivot(index='time', columns='indicator_code', values='value')
            pivot_df.reset_index(inplace=True)
            
            return pivot_df
            
        except Exception as e:
            logging.error(f"Correlation query failed: {e}")
            return pd.DataFrame()
    
    def close(self):
        """Close the database connection"""
        self.cursor.close()
        self.connection.close()

SQL Database Patterns for Economic Analysis

Traditional SQL databases remain valuable for economic data storage, particularly when complex analytical queries, regulatory reporting, and data integrity are paramount. PostgreSQL, with its advanced analytical functions and JSON support, provides an excellent foundation for economic data systems that need both relational structure and flexible schema evolution.

The key to effective SQL database design for economic data lies in balancing normalization with query performance. Economic datasets benefit from a star schema approach where fact tables contain the actual indicator values and dimension tables provide contextual information about countries, sources, and indicator definitions. This design supports both efficient storage and complex analytical queries.

Partitioning strategies become critical for managing large historical datasets. Time-based partitioning enables efficient data lifecycle management and query performance optimization. Range partitioning by date allows older data to be moved to slower storage tiers while keeping recent data in high-performance storage for active analysis.
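
As a sketch of that lifecycle, the statements below move an old yearly partition onto a slower tablespace or detach it entirely before export; they assume the fact_economic_data partitions created in the schema below and a pre-existing slow_storage tablespace.

# Move a cold partition onto cheaper storage while keeping it queryable
ARCHIVE_SQL = "ALTER TABLE fact_economic_data_2020 SET TABLESPACE slow_storage;"

# Or detach it from the parent table entirely, e.g. before exporting the rows
# to a data lake and dropping the local copy
DETACH_SQL = "ALTER TABLE fact_economic_data DETACH PARTITION fact_economic_data_2020;"

# cursor.execute(ARCHIVE_SQL)
# connection.commit()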

import psycopg2
from psycopg2.extras import RealDictCursor, execute_batch
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
import logging
import json

class PostgreSQLEconomicStorage:
    """PostgreSQL storage optimized for economic data analysis"""
    
    def __init__(self, connection_string: str):
        self.connection = psycopg2.connect(connection_string)
        self.cursor = self.connection.cursor(cursor_factory=RealDictCursor)
        self._create_schema()
        
    def _create_schema(self):
        """Create the complete economic data schema"""
        schema_sql = """
        -- Countries dimension table
        CREATE TABLE IF NOT EXISTS dim_countries (
            country_id SERIAL PRIMARY KEY,
            country_code VARCHAR(3) UNIQUE NOT NULL,
            country_name VARCHAR(100) NOT NULL,
            region VARCHAR(50),
            income_group VARCHAR(50),
            created_at TIMESTAMPTZ DEFAULT NOW()
        );
        
        -- Data sources dimension table
        CREATE TABLE IF NOT EXISTS dim_sources (
            source_id SERIAL PRIMARY KEY,
            source_code VARCHAR(20) UNIQUE NOT NULL,
            source_name VARCHAR(100) NOT NULL,
            source_type VARCHAR(50),
            api_endpoint VARCHAR(255),
            update_frequency VARCHAR(20),
            created_at TIMESTAMPTZ DEFAULT NOW()
        );
        
        -- Economic indicators dimension table
        CREATE TABLE IF NOT EXISTS dim_indicators (
            indicator_id SERIAL PRIMARY KEY,
            indicator_code VARCHAR(50) UNIQUE NOT NULL,
            indicator_name VARCHAR(255) NOT NULL,
            category VARCHAR(100),
            subcategory VARCHAR(100),
            unit VARCHAR(50),
            frequency VARCHAR(20),
            seasonal_adjustment BOOLEAN DEFAULT FALSE,
            description TEXT,
            methodology TEXT,
            created_at TIMESTAMPTZ DEFAULT NOW()
        );
        
        -- Main fact table for economic indicator values
        CREATE TABLE IF NOT EXISTS fact_economic_data (
            id BIGSERIAL,
            indicator_id INTEGER REFERENCES dim_indicators(indicator_id),
            country_id INTEGER REFERENCES dim_countries(country_id),
            source_id INTEGER REFERENCES dim_sources(source_id),
            observation_date DATE NOT NULL,
            value NUMERIC(15,6),
            preliminary BOOLEAN DEFAULT FALSE,
            revised BOOLEAN DEFAULT FALSE,
            revision_number INTEGER DEFAULT 1,
            confidence_interval_lower NUMERIC(15,6),
            confidence_interval_upper NUMERIC(15,6),
            metadata JSONB,
            created_at TIMESTAMPTZ DEFAULT NOW(),
            updated_at TIMESTAMPTZ DEFAULT NOW(),
            
            PRIMARY KEY (id, observation_date)
        ) PARTITION BY RANGE (observation_date);
        
        -- Create partitions for different years
        CREATE TABLE IF NOT EXISTS fact_economic_data_2020 
        PARTITION OF fact_economic_data 
        FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');
        
        CREATE TABLE IF NOT EXISTS fact_economic_data_2021 
        PARTITION OF fact_economic_data 
        FOR VALUES FROM ('2021-01-01') TO ('2022-01-01');
        
        CREATE TABLE IF NOT EXISTS fact_economic_data_2022 
        PARTITION OF fact_economic_data 
        FOR VALUES FROM ('2022-01-01') TO ('2023-01-01');
        
        CREATE TABLE IF NOT EXISTS fact_economic_data_2023 
        PARTITION OF fact_economic_data 
        FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');
        
        CREATE TABLE IF NOT EXISTS fact_economic_data_2024 
        PARTITION OF fact_economic_data 
        FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
        
        CREATE TABLE IF NOT EXISTS fact_economic_data_2025 
        PARTITION OF fact_economic_data 
        FOR VALUES FROM ('2025-01-01') TO ('2026-01-01');
        
        -- Indexes for performance
        CREATE INDEX IF NOT EXISTS idx_fact_indicator_date 
        ON fact_economic_data (indicator_id, observation_date DESC);
        
        CREATE INDEX IF NOT EXISTS idx_fact_country_date 
        ON fact_economic_data (country_id, observation_date DESC);
        
        CREATE INDEX IF NOT EXISTS idx_fact_date_value 
        ON fact_economic_data (observation_date DESC, value);
        
        -- Unique index backing the ON CONFLICT upsert in insert_economic_data;
        -- it must include observation_date because that is the partition key
        CREATE UNIQUE INDEX IF NOT EXISTS idx_fact_unique 
        ON fact_economic_data (indicator_id, country_id, source_id, observation_date);
        
        -- Revision tracking table
        CREATE TABLE IF NOT EXISTS revision_history (
            revision_id BIGSERIAL PRIMARY KEY,
            fact_id BIGINT,
            observation_date DATE,
            indicator_id INTEGER,
            country_id INTEGER,
            old_value NUMERIC(15,6),
            new_value NUMERIC(15,6),
            revision_type VARCHAR(50),
            revision_reason TEXT,
            revised_at TIMESTAMPTZ DEFAULT NOW()
        );
        
        -- Data quality metrics table
        CREATE TABLE IF NOT EXISTS data_quality_metrics (
            metric_id BIGSERIAL PRIMARY KEY,
            indicator_id INTEGER REFERENCES dim_indicators(indicator_id),
            country_id INTEGER REFERENCES dim_countries(country_id),
            metric_date DATE,
            completeness_score NUMERIC(5,4),
            accuracy_score NUMERIC(5,4),
            consistency_score NUMERIC(5,4),
            timeliness_score NUMERIC(5,4),
            overall_score NUMERIC(5,4),
            quality_issues JSONB,
            calculated_at TIMESTAMPTZ DEFAULT NOW()
        );
        """
        
        try:
            self.cursor.execute(schema_sql)
            self.connection.commit()
            logging.info("Economic data schema created successfully")
        except Exception as e:
            logging.error(f"Failed to create schema: {e}")
            self.connection.rollback()
    
    def insert_country(self, country_code: str, country_name: str,
                      region: str = None, income_group: str = None) -> int:
        """Insert or get country ID"""
        insert_sql = """
        INSERT INTO dim_countries (country_code, country_name, region, income_group)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (country_code) DO UPDATE SET
        country_name = EXCLUDED.country_name,
        region = EXCLUDED.region,
        income_group = EXCLUDED.income_group
        RETURNING country_id
        """
        
        try:
            self.cursor.execute(insert_sql, (country_code, country_name, region, income_group))
            country_id = self.cursor.fetchone()['country_id']
            self.connection.commit()
            return country_id
        except Exception as e:
            logging.error(f"Failed to insert country {country_code}: {e}")
            self.connection.rollback()
            return None
    
    def insert_indicator(self, indicator_code: str, indicator_name: str,
                        category: str = None, unit: str = None,
                        frequency: str = None, description: str = None) -> int:
        """Insert or get indicator ID"""
        insert_sql = """
        INSERT INTO dim_indicators 
        (indicator_code, indicator_name, category, unit, frequency, description)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON CONFLICT (indicator_code) DO UPDATE SET
        indicator_name = EXCLUDED.indicator_name,
        category = EXCLUDED.category,
        unit = EXCLUDED.unit,
        frequency = EXCLUDED.frequency,
        description = EXCLUDED.description
        RETURNING indicator_id
        """
        
        try:
            self.cursor.execute(insert_sql, (
                indicator_code, indicator_name, category, unit, frequency, description
            ))
            indicator_id = self.cursor.fetchone()['indicator_id']
            self.connection.commit()
            return indicator_id
        except Exception as e:
            logging.error(f"Failed to insert indicator {indicator_code}: {e}")
            self.connection.rollback()
            return None
    
    def insert_source(self, source_code: str, source_name: str,
                     source_type: str = None, api_endpoint: str = None) -> int:
        """Insert or get source ID"""
        insert_sql = """
        INSERT INTO dim_sources (source_code, source_name, source_type, api_endpoint)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (source_code) DO UPDATE SET
        source_name = EXCLUDED.source_name,
        source_type = EXCLUDED.source_type,
        api_endpoint = EXCLUDED.api_endpoint
        RETURNING source_id
        """
        
        try:
            self.cursor.execute(insert_sql, (source_code, source_name, source_type, api_endpoint))
            source_id = self.cursor.fetchone()['source_id']
            self.connection.commit()
            return source_id
        except Exception as e:
            logging.error(f"Failed to insert source {source_code}: {e}")
            self.connection.rollback()
            return None
    
    def insert_economic_data(self, indicator_code: str, country_code: str,
                           source_code: str, observation_date: datetime,
                           value: float, preliminary: bool = False,
                           metadata: Dict[str, Any] = None) -> bool:
        """Insert economic data point"""
        
        # Get dimension IDs
        indicator_id = self._get_indicator_id(indicator_code)
        country_id = self._get_country_id(country_code)
        source_id = self._get_source_id(source_code)
        
        if not all([indicator_id, country_id, source_id]):
            logging.error(f"Missing dimension IDs for {indicator_code}, {country_code}, {source_code}")
            return False
        
        insert_sql = """
        INSERT INTO fact_economic_data 
        (indicator_id, country_id, source_id, observation_date, value, preliminary, metadata)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (indicator_id, country_id, source_id, observation_date) DO UPDATE SET
        value = EXCLUDED.value,
        preliminary = EXCLUDED.preliminary,
        metadata = EXCLUDED.metadata,
        updated_at = NOW()
        """
        
        try:
            self.cursor.execute(insert_sql, (
                indicator_id, country_id, source_id, observation_date.date(),
                value, preliminary, json.dumps(metadata) if metadata else None
            ))
            self.connection.commit()
            return True
        except Exception as e:
            logging.error(f"Failed to insert economic data: {e}")
            self.connection.rollback()
            return False
    
    def get_time_series(self, indicator_code: str, country_code: str = None,
                       start_date: datetime = None, end_date: datetime = None) -> pd.DataFrame:
        """Get time series data for analysis"""
        
        base_sql = """
        SELECT 
            f.observation_date,
            f.value,
            f.preliminary,
            f.confidence_interval_lower,
            f.confidence_interval_upper,
            i.indicator_code,
            i.indicator_name,
            i.unit,
            c.country_code,
            c.country_name,
            s.source_code
        FROM fact_economic_data f
        JOIN dim_indicators i ON f.indicator_id = i.indicator_id
        JOIN dim_countries c ON f.country_id = c.country_id
        JOIN dim_sources s ON f.source_id = s.source_id
        WHERE i.indicator_code = %s
        """
        
        params = [indicator_code]
        
        if country_code:
            base_sql += " AND c.country_code = %s"
            params.append(country_code)
        
        if start_date:
            base_sql += " AND f.observation_date >= %s"
            params.append(start_date.date())
        
        if end_date:
            base_sql += " AND f.observation_date <= %s"
            params.append(end_date.date())
        
        base_sql += " ORDER BY f.observation_date"
        
        try:
            self.cursor.execute(base_sql, params)
            results = self.cursor.fetchall()
            
            if not results:
                return pd.DataFrame()
            
            df = pd.DataFrame(results)
            df['observation_date'] = pd.to_datetime(df['observation_date'])
            return df
            
        except Exception as e:
            logging.error(f"Time series query failed: {e}")
            return pd.DataFrame()
    
    def get_cross_country_data(self, indicator_code: str, countries: List[str],
                              observation_date: datetime) -> pd.DataFrame:
        """Get cross-country data for a specific indicator and date"""
        
        sql = """
        SELECT 
            c.country_code,
            c.country_name,
            f.value,
            f.observation_date,
            i.unit
        FROM fact_economic_data f
        JOIN dim_indicators i ON f.indicator_id = i.indicator_id
        JOIN dim_countries c ON f.country_id = c.country_id
        WHERE i.indicator_code = %s
        AND c.country_code = ANY(%s)
        AND f.observation_date = %s
        ORDER BY c.country_name
        """
        
        try:
            self.cursor.execute(sql, (indicator_code, countries, observation_date.date()))
            results = self.cursor.fetchall()
            
            if not results:
                return pd.DataFrame()
            
            return pd.DataFrame(results)
            
        except Exception as e:
            logging.error(f"Cross-country query failed: {e}")
            return pd.DataFrame()
    
    def calculate_correlation_matrix(self, indicator_codes: List[str],
                                   country_code: str, start_date: datetime,
                                   end_date: datetime) -> pd.DataFrame:
        """Calculate correlation matrix for multiple indicators"""
        
        sql = """
        SELECT 
            i.indicator_code,
            f.observation_date,
            f.value
        FROM fact_economic_data f
        JOIN dim_indicators i ON f.indicator_id = i.indicator_id
        JOIN dim_countries c ON f.country_id = c.country_id
        WHERE i.indicator_code = ANY(%s)
        AND c.country_code = %s
        AND f.observation_date BETWEEN %s AND %s
        ORDER BY f.observation_date, i.indicator_code
        """
        
        try:
            self.cursor.execute(sql, (
                indicator_codes, country_code, start_date.date(), end_date.date()
            ))
            results = self.cursor.fetchall()
            
            if not results:
                return pd.DataFrame()
            
            df = pd.DataFrame(results)
            df['observation_date'] = pd.to_datetime(df['observation_date'])
            
            # Pivot to wide format for correlation analysis
            pivot_df = df.pivot(
                index='observation_date', 
                columns='indicator_code', 
                values='value'
            )
            
            # Calculate correlation matrix
            correlation_matrix = pivot_df.corr()
            
            return correlation_matrix
            
        except Exception as e:
            logging.error(f"Correlation calculation failed: {e}")
            return pd.DataFrame()
    
    def _get_indicator_id(self, indicator_code: str) -> Optional[int]:
        """Get indicator ID by code"""
        self.cursor.execute(
            "SELECT indicator_id FROM dim_indicators WHERE indicator_code = %s",
            (indicator_code,)
        )
        result = self.cursor.fetchone()
        return result['indicator_id'] if result else None
    
    def _get_country_id(self, country_code: str) -> Optional[int]:
        """Get country ID by code"""
        self.cursor.execute(
            "SELECT country_id FROM dim_countries WHERE country_code = %s",
            (country_code,)
        )
        result = self.cursor.fetchone()
        return result['country_id'] if result else None
    
    def _get_source_id(self, source_code: str) -> Optional[int]:
        """Get source ID by code"""
        self.cursor.execute(
            "SELECT source_id FROM dim_sources WHERE source_code = %s",
            (source_code,)
        )
        result = self.cursor.fetchone()
        return result['source_id'] if result else None
    
    def close(self):
        """Close the database connection"""
        self.cursor.close()
        self.connection.close()

NoSQL Patterns for Economic Data

NoSQL databases provide valuable alternatives for economic data storage when schema flexibility, horizontal scaling, or document-based storage patterns align better with analytical requirements. Document databases like MongoDB excel at storing economic datasets with varying structures, while graph databases like Neo4j can effectively model complex economic relationships and dependencies.

The choice of NoSQL database depends heavily on the specific analytical patterns and data characteristics. MongoDB works well for storing economic research datasets where the schema may evolve over time or where each economic indicator has unique metadata requirements. The document model naturally represents the hierarchical structure of economic data, from global indicators down to country and regional levels.

Graph databases become particularly valuable when modeling economic relationships and dependencies. The interconnected nature of economic systems - where changes in one indicator influence others through complex chains of causation - maps naturally to graph structures. Neo4j can efficiently query these relationships and support network analysis of economic systems.

from pymongo import MongoClient, UpdateOne
from pymongo.errors import BulkWriteError
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import logging
from bson import ObjectId
import json

class MongoEconomicStorage:
    """MongoDB storage for flexible economic data schemas"""
    
    def __init__(self, connection_string: str, database_name: str):
        self.client = MongoClient(connection_string)
        self.db = self.client[database_name]
        self._create_collections()
        
    def _create_collections(self):
        """Create collections and indexes"""
        # Economic indicators collection
        indicators_collection = self.db.economic_indicators
        
        # Create indexes for performance
        indicators_collection.create_index([
            ("indicator_code", 1),
            ("country_code", 1),
            ("observation_date", -1)
        ])
        
        indicators_collection.create_index([
            ("observation_date", -1),
            ("category", 1)
        ])
        
        indicators_collection.create_index([
            ("country_code", 1),
            ("observation_date", -1)
        ])
        
        # Metadata collection for indicator definitions
        metadata_collection = self.db.indicator_metadata
        metadata_collection.create_index([("indicator_code", 1)], unique=True)
        
        # Data sources collection
        sources_collection = self.db.data_sources
        sources_collection.create_index([("source_code", 1)], unique=True)
        
        logging.info("MongoDB collections and indexes created")
    
    def store_indicator_metadata(self, indicator_code: str, metadata: Dict[str, Any]) -> bool:
        """Store indicator metadata"""
        try:
            self.db.indicator_metadata.update_one(
                {"indicator_code": indicator_code},
                {
                    "$set": {
                        **metadata,
                        "updated_at": datetime.utcnow()
                    },
                    "$setOnInsert": {
                        "created_at": datetime.utcnow()
                    }
                },
                upsert=True
            )
            return True
        except Exception as e:
            logging.error(f"Failed to store indicator metadata: {e}")
            return False
    
    def store_economic_data(self, data_points: List[Dict[str, Any]]) -> Dict[str, int]:
        """Store multiple economic data points"""
        if not data_points:
            return {"inserted": 0, "modified": 0, "errors": 0}
        
        operations = []
        for point in data_points:
            # Track the update time; created_at is set only when the document is
            # first inserted (putting it in both $set and $setOnInsert would
            # raise a path-conflict error)
            point["updated_at"] = datetime.utcnow()
            
            # One document per indicator/country/observation date
            filter_doc = {
                "indicator_code": point["indicator_code"],
                "country_code": point["country_code"],
                "observation_date": point["observation_date"]
            }
            
            update_doc = {
                "$set": point,
                "$setOnInsert": {"created_at": datetime.utcnow()}
            }
            
            # bulk_write expects pymongo operation objects, not plain dicts
            operations.append(UpdateOne(filter_doc, update_doc, upsert=True))
        
        try:
            result = self.db.economic_indicators.bulk_write(operations)
            return {
                "inserted": result.upserted_count,
                "modified": result.modified_count,
                "errors": 0
            }
        except BulkWriteError as e:
            logging.error(f"Bulk write error: {e.details}")
            return {
                "inserted": e.details.get("nUpserted", 0),
                "modified": e.details.get("nModified", 0),
                "errors": len(e.details.get("writeErrors", []))
            }
        except Exception as e:
            logging.error(f"Failed to store economic data: {e}")
            return {"inserted": 0, "modified": 0, "errors": len(data_points)}
    
    def query_time_series(self, indicator_code: str, country_code: str = None,
                         start_date: datetime = None, end_date: datetime = None) -> pd.DataFrame:
        """Query time series data"""
        
        filter_doc = {"indicator_code": indicator_code}
        
        if country_code:
            filter_doc["country_code"] = country_code
        
        if start_date or end_date:
            date_filter = {}
            if start_date:
                date_filter["$gte"] = start_date
            if end_date:
                date_filter["$lte"] = end_date
            filter_doc["observation_date"] = date_filter
        
        try:
            cursor = self.db.economic_indicators.find(
                filter_doc,
                {"_id": 0}  # Exclude MongoDB ObjectId
            ).sort("observation_date", 1)
            
            results = list(cursor)
            
            if not results:
                return pd.DataFrame()
            
            df = pd.DataFrame(results)
            df['observation_date'] = pd.to_datetime(df['observation_date'])
            
            return df
            
        except Exception as e:
            logging.error(f"Time series query failed: {e}")
            return pd.DataFrame()
    
    def query_latest_values(self, indicator_codes: List[str],
                           country_codes: List[str] = None) -> pd.DataFrame:
        """Get latest values for specified indicators"""
        
        pipeline = []
        
        # Match stage
        match_filter = {"indicator_code": {"$in": indicator_codes}}
        if country_codes:
            match_filter["country_code"] = {"$in": country_codes}
        
        pipeline.append({"$match": match_filter})
        
        # Sort and group to get latest values
        pipeline.extend([
            {"$sort": {"indicator_code": 1, "country_code": 1, "observation_date": -1}},
            {
                "$group": {
                    "_id": {
                        "indicator_code": "$indicator_code",
                        "country_code": "$country_code"
                    },
                    "latest_value": {"$first": "$value"},
                    "latest_date": {"$first": "$observation_date"},
                    "source": {"$first": "$source"},
                    "unit": {"$first": "$unit"}
                }
            },
            {
                "$project": {
                    "_id": 0,
                    "indicator_code": "$_id.indicator_code",
                    "country_code": "$_id.country_code",
                    "value": "$latest_value",
                    "observation_date": "$latest_date",
                    "source": 1,
                    "unit": 1
                }
            }
        ])
        
        try:
            results = list(self.db.economic_indicators.aggregate(pipeline))
            
            if not results:
                return pd.DataFrame()
            
            df = pd.DataFrame(results)
            df['observation_date'] = pd.to_datetime(df['observation_date'])
            
            return df
            
        except Exception as e:
            logging.error(f"Latest values query failed: {e}")
            return pd.DataFrame()
    
    def aggregate_by_category(self, category: str, aggregation_type: str = "avg",
                             start_date: datetime = None, end_date: datetime = None) -> pd.DataFrame:
        """Aggregate indicators by category"""
        
        pipeline = []
        
        # Match stage
        match_filter = {"category": category}
        if start_date or end_date:
            date_filter = {}
            if start_date:
                date_filter["$gte"] = start_date
            if end_date:
                date_filter["$lte"] = end_date
            match_filter["observation_date"] = date_filter
        
        pipeline.append({"$match": match_filter})
        
        # Group and aggregate
        group_stage = {
            "$group": {
                "_id": {
                    "country_code": "$country_code",
                    "observation_date": "$observation_date"
                },
                "count": {"$sum": 1}
            }
        }
        
        if aggregation_type == "avg":
            group_stage["$group"]["avg_value"] = {"$avg": "$value"}
        elif aggregation_type == "sum":
            group_stage["$group"]["sum_value"] = {"$sum": "$value"}
        elif aggregation_type == "min":
            group_stage["$group"]["min_value"] = {"$min": "$value"}
        elif aggregation_type == "max":
            group_stage["$group"]["max_value"] = {"$max": "$value"}
        
        pipeline.append(group_stage)
        
        # Project and sort
        pipeline.extend([
            {
                "$project": {
                    "_id": 0,
                    "country_code": "$_id.country_code",
                    "observation_date": "$_id.observation_date",
                    "aggregated_value": f"${aggregation_type}_value",
                    "count": 1
                }
            },
            {"$sort": {"country_code": 1, "observation_date": 1}}
        ])
        
        try:
            results = list(self.db.economic_indicators.aggregate(pipeline))
            
            if not results:
                return pd.DataFrame()
            
            df = pd.DataFrame(results)
            df['observation_date'] = pd.to_datetime(df['observation_date'])
            
            return df
            
        except Exception as e:
            logging.error(f"Category aggregation failed: {e}")
            return pd.DataFrame()
    
    def search_indicators(self, search_terms: List[str], categories: List[str] = None) -> pd.DataFrame:
        """Search for indicators by name or description"""
        
        # Build text search query
        search_query = " ".join(search_terms)
        
        filter_doc = {
            "$or": [
                {"indicator_name": {"$regex": search_query, "$options": "i"}},
                {"description": {"$regex": search_query, "$options": "i"}},
                {"category": {"$regex": search_query, "$options": "i"}}
            ]
        }
        
        if categories:
            filter_doc["category"] = {"$in": categories}
        
        try:
            cursor = self.db.indicator_metadata.find(filter_doc, {"_id": 0})
            results = list(cursor)
            
            if not results:
                return pd.DataFrame()
            
            return pd.DataFrame(results)
            
        except Exception as e:
            logging.error(f"Indicator search failed: {e}")
            return pd.DataFrame()
    
    def get_data_quality_summary(self) -> Dict[str, Any]:
        """Generate data quality summary statistics"""
        
        pipeline = [
            {
                "$group": {
                    "_id": {
                        "indicator_code": "$indicator_code",
                        "country_code": "$country_code"
                    },
                    "total_observations": {"$sum": 1},
                    "null_values": {
                        "$sum": {
                            "$cond": [{"$eq": ["$value", None]}, 1, 0]
                        }
                    },
                    "latest_date": {"$max": "$observation_date"},
                    "earliest_date": {"$min": "$observation_date"}
                }
            },
            {
                "$group": {
                    "_id": None,
                    "total_series": {"$sum": 1},
                    "avg_observations_per_series": {"$avg": "$total_observations"},
                    "total_null_values": {"$sum": "$null_values"},
                    "total_observations": {"$sum": "$total_observations"},
                    "latest_update": {"$max": "$latest_date"},
                    "earliest_data": {"$min": "$earliest_date"}
                }
            }
        ]
        
        try:
            result = list(self.db.economic_indicators.aggregate(pipeline))
            
            if not result:
                return {}
            
            summary = result[0]
            summary["data_completeness"] = (
                1 - (summary["total_null_values"] / summary["total_observations"])
            ) * 100 if summary["total_observations"] > 0 else 0
            
            del summary["_id"]
            
            return summary
            
        except Exception as e:
            logging.error(f"Data quality summary failed: {e}")
            return {}
    
    def close(self):
        """Close the database connection"""
        self.client.close()

# Graph database pattern for economic relationships
class Neo4jEconomicStorage:
    """Neo4j graph database for economic relationships"""
    
    def __init__(self, uri: str, username: str, password: str):
        from neo4j import GraphDatabase
        self.driver = GraphDatabase.driver(uri, auth=(username, password))
        self._create_constraints()
    
    def _create_constraints(self):
        """Create necessary constraints and indexes"""
        with self.driver.session() as session:
            # Create constraints
            constraints = [
                "CREATE CONSTRAINT IF NOT EXISTS FOR (c:Country) REQUIRE c.code IS UNIQUE",
                "CREATE CONSTRAINT IF NOT EXISTS FOR (i:Indicator) REQUIRE i.code IS UNIQUE",
                "CREATE CONSTRAINT IF NOT EXISTS FOR (s:Source) REQUIRE s.code IS UNIQUE"
            ]
            
            for constraint in constraints:
                try:
                    session.run(constraint)
                except Exception as e:
                    logging.warning(f"Constraint creation warning: {e}")
    
    def create_economic_relationship(self, indicator1: str, indicator2: str,
                                   relationship_type: str, strength: float,
                                   metadata: Dict[str, Any] = None):
        """Create relationship between economic indicators"""
        
        cypher = """
        MERGE (i1:Indicator {code: $indicator1})
        MERGE (i2:Indicator {code: $indicator2})
        MERGE (i1)-[r:CORRELATES_WITH {type: $relationship_type}]->(i2)
        SET r.strength = $strength,
            r.created_at = datetime(),
            r.metadata = $metadata
        """
        
        with self.driver.session() as session:
            try:
                session.run(cypher, {
                    "indicator1": indicator1,
                    "indicator2": indicator2,
                    "relationship_type": relationship_type,
                    "strength": strength,
                    "metadata": metadata or {}
                })
                return True
            except Exception as e:
                logging.error(f"Failed to create relationship: {e}")
                return False
    
    def find_correlated_indicators(self, indicator_code: str,
                                  min_strength: float = 0.5) -> List[Dict[str, Any]]:
        """Find indicators correlated with the given indicator"""
        
        cypher = """
        MATCH (i1:Indicator {code: $indicator_code})-[r:CORRELATES_WITH]-(i2:Indicator)
        WHERE r.strength >= $min_strength
        RETURN i2.code as indicator_code,
               i2.name as indicator_name,
               r.strength as correlation_strength,
               r.type as relationship_type,
               r.metadata as metadata
        ORDER BY r.strength DESC
        """
        
        with self.driver.session() as session:
            try:
                result = session.run(cypher, {
                    "indicator_code": indicator_code,
                    "min_strength": min_strength
                })
                
                return [dict(record) for record in result]
                
            except Exception as e:
                logging.error(f"Correlation query failed: {e}")
                return []
    
    def close(self):
        """Close the database connection"""
        self.driver.close()

Hybrid Storage Strategies

Many economic data systems benefit from hybrid storage approaches that leverage the strengths of different database technologies for different aspects of the system. A typical hybrid architecture might use time-series databases for high-frequency market data, SQL databases for regulatory reporting, and NoSQL databases for research datasets with evolving schemas.

The key to successful hybrid storage lies in designing clear data flow patterns and maintaining consistency across different storage systems. Each database system should have a well-defined role and ownership of specific data types or analytical workloads. The integration layer must handle data synchronization, format translation, and query federation across the different systems.
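
A minimal sketch of such an integration layer is shown below. It reuses the InfluxEconomicStorage and PostgreSQLEconomicStorage classes defined earlier and routes writes by frequency; the routing rule and frequency labels are illustrative assumptions rather than a prescribed design.

class HybridEconomicStorage:
    """Facade that routes economic data to the appropriate backend"""
    
    # Assumed frequency labels that indicate high-frequency data
    HIGH_FREQUENCY = {"intraday", "hourly", "daily"}
    
    def __init__(self, influx_store: InfluxEconomicStorage,
                 postgres_store: PostgreSQLEconomicStorage):
        self.influx = influx_store
        self.postgres = postgres_store
    
    def store(self, indicator: EconomicIndicator) -> bool:
        """Write high-frequency series to the time-series store and
        lower-frequency, audited series to the relational star schema"""
        if indicator.frequency in self.HIGH_FREQUENCY:
            return self.influx.store_indicator(indicator)
        return self.postgres.insert_economic_data(
            indicator_code=indicator.code,
            country_code=indicator.country,
            source_code=indicator.source,
            observation_date=indicator.timestamp,
            value=indicator.value,
            metadata=indicator.metadata,
        )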

Data governance becomes particularly important in hybrid environments where the same economic indicators might be stored in multiple systems with different levels of processing, aggregation, or quality controls. Clear data lineage tracking and master data management ensure that analysts understand which system provides the authoritative version of specific datasets.

Cost optimization also requires careful consideration in hybrid architectures. Different database technologies have very different cost structures - time-series databases might be expensive for large historical datasets, while data lakes provide cost-effective storage for infrequently accessed data. The hybrid architecture should route data to the most cost-effective storage tier based on access patterns and analytical requirements.
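
As one hedged example of tiering, the helper below exports a year of infrequently accessed data from the relational store to a Parquet file that can live on inexpensive object storage. The function name and path layout are assumptions built on the get_time_series method defined earlier, and writing Parquet requires pyarrow or fastparquet.

from datetime import datetime

def archive_year_to_parquet(storage: PostgreSQLEconomicStorage,
                            indicator_code: str, year: int, out_dir: str) -> str:
    """Export one indicator-year to a Parquet file on cheap storage"""
    df = storage.get_time_series(
        indicator_code,
        start_date=datetime(year, 1, 1),
        end_date=datetime(year, 12, 31),
    )
    out_file = f"{out_dir}/{indicator_code}_{year}.parquet"
    df.to_parquet(out_file, index=False)
    return out_file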

The database integration patterns presented in this guide provide the foundation for the analytics capabilities discussed in Machine Learning Applications Economic Data Analysis and support the real-time monitoring systems covered in Economic Indicator Alerting and Monitoring Systems. Understanding these storage patterns is essential for building scalable economic data platforms that can grow with evolving analytical requirements.
