Introduction
Economic data presents unique quality challenges: reporting lags, frequent revisions, methodological changes, and inconsistent standards. This guide provides practical implementations for ensuring data quality throughout the economic data lifecycle.
Core Quality Dimensions
When working with economic datasets, focus on these fundamental quality dimensions (each is illustrated with a short pandas check after the list):
- Accuracy: Correctness of values
- Completeness: Absence of missing values
- Consistency: Internal coherence and logical relationships
- Timeliness: Recency of data
- Validity: Conformance to defined formats and ranges
- Uniqueness: Absence of duplicates
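Most of these dimensions reduce to a one-line pandas expression. Below is a minimal sketch; the DataFrame, column names, and values are hypothetical, and accuracy is omitted because it normally requires comparison against an external benchmark:

import pandas as pd

# Tiny illustrative frame; column names and figures are hypothetical.
df = pd.DataFrame({
    'date': pd.to_datetime(['2024-03-31', '2024-06-30', '2024-06-30']),
    'country': ['US', 'US', 'US'],
    'gdp_nominal': [28.3, 28.6, 28.6],
    'gdp_real': [22.7, 22.9, None],
    'gdp_growth': [1.6, 3.0, 3.0],
})

completeness = df['gdp_real'].notna().mean()                       # share of non-missing values
validity = df['gdp_growth'].between(-30, 30).mean()                # share inside an agreed range
uniqueness = 1 - df.duplicated(subset=['date', 'country']).mean()  # share of non-duplicate rows
timeliness = (pd.Timestamp.now() - df['date'].max()).days          # days since latest observation
consistency = (df['gdp_nominal'] >= df['gdp_real']).mean()         # share obeying a domain rule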
Automated Quality Checks
Automated checks apply the same validation rules to every data delivery. The function below runs completeness, range, temporal, logical, and uniqueness checks driven by a configuration dictionary:
import pandas as pd
import numpy as np
from datetime import datetime

def quality_check_economic_dataset(df, config):
    """Perform comprehensive quality checks on an economic dataset."""
    results = {
        'passed': True,
        'checks': {},
        'summary': {'total': 0, 'passed': 0, 'failed': 0}
    }

    # 1. Completeness checks
    completeness_results = {}
    for col in config['required_columns']:
        missing_rate = df[col].isna().mean() * 100
        completeness_results[col] = {
            'missing_rate': missing_rate,
            'passed': missing_rate <= config['max_missing_rate']
        }
        if not completeness_results[col]['passed']:
            results['passed'] = False
    results['checks']['completeness'] = completeness_results

    # 2. Range validation checks
    range_results = {}
    for col, ranges in config['valid_ranges'].items():
        if col in df.columns:
            min_val, max_val = ranges
            out_of_range = ((df[col] < min_val) | (df[col] > max_val)).sum()
            range_results[col] = {
                'out_of_range_count': int(out_of_range),
                'out_of_range_rate': (out_of_range / len(df)) * 100,
                'passed': out_of_range == 0
            }
            if not range_results[col]['passed']:
                results['passed'] = False
    results['checks']['range_validation'] = range_results

    # 3. Temporal consistency checks
    if 'date_column' in config and config['date_column'] in df.columns:
        date_col = config['date_column']
        if 'expected_frequency' in config:
            freq = config['expected_frequency']
            expected_dates = pd.date_range(
                start=df[date_col].min(),
                end=df[date_col].max(),
                freq=freq
            )
            missing_dates = set(expected_dates) - set(df[date_col])
            # Nest the result one level deep so the summary loop below
            # can treat every check type uniformly.
            results['checks']['temporal_consistency'] = {
                'frequency_gaps': {
                    'missing_dates_count': len(missing_dates),
                    'passed': len(missing_dates) == 0
                }
            }
            if len(missing_dates) > 0:
                results['passed'] = False

    # 4. Logical consistency checks
    if 'logical_checks' in config:
        logical_results = {}
        for check_name, check_func in config['logical_checks'].items():
            check_result = check_func(df)
            logical_results[check_name] = check_result
            if not check_result['passed']:
                results['passed'] = False
        results['checks']['logical_consistency'] = logical_results

    # 5. Duplication checks
    if 'unique_columns' in config:
        dup_results = {}
        for col_set in config['unique_columns']:
            if isinstance(col_set, str):
                col_set = [col_set]
            dup_count = int(df.duplicated(subset=col_set).sum())
            dup_results[','.join(col_set)] = {
                'duplicate_count': dup_count,
                'passed': dup_count == 0
            }
            if dup_count > 0:
                results['passed'] = False
        results['checks']['uniqueness'] = dup_results

    # Update summary counts
    for check_type in results['checks']:
        for item in results['checks'][check_type]:
            results['summary']['total'] += 1
            if results['checks'][check_type][item]['passed']:
                results['summary']['passed'] += 1
            else:
                results['summary']['failed'] += 1

    return results
Example configuration for validating GDP data:
gdp_quality_config = {
    'required_columns': ['date', 'gdp_nominal', 'gdp_real', 'country'],
    'max_missing_rate': 5.0,            # Maximum 5% missing values allowed
    'valid_ranges': {
        'gdp_nominal': [0, 1e15],       # Valid range for nominal GDP
        'gdp_real': [0, 1e15],          # Valid range for real GDP
        'gdp_growth': [-30, 30]         # Valid range for GDP growth (percent)
    },
    'date_column': 'date',
    'expected_frequency': 'Q',          # Quarterly data expected
    'unique_columns': [['date', 'country']],  # One row per country per quarter
    'logical_checks': {
        'nominal_vs_real': lambda df: {
            'description': 'Check that nominal GDP >= real GDP',
            'passed': (df['gdp_nominal'] >= df['gdp_real']).all()
        }
    }
}
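With the configuration in place, running the checks is a single call. A minimal usage sketch, with a small hypothetical quarterly frame (column names match the config; figures are illustrative):

# Hypothetical quarterly frame with the columns named in the config
gdp_df = pd.DataFrame({
    'date': pd.date_range('2023-03-31', periods=4, freq='Q'),
    'country': ['US'] * 4,
    'gdp_nominal': [26.8, 27.1, 27.6, 27.9],
    'gdp_real': [22.1, 22.2, 22.5, 22.7],
    'gdp_growth': [2.3, 2.1, 4.4, 3.2],
})

report = quality_check_economic_dataset(gdp_df, gdp_quality_config)
print(f"Overall passed: {report['passed']}")
print(f"Checks run: {report['summary']['total']}, failed: {report['summary']['failed']}")

# Inspect individual results, e.g. completeness by column
for col, res in report['checks']['completeness'].items():
    print(f"{col}: {res['missing_rate']:.1f}% missing, passed={res['passed']}")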
Data Profiling
Automated profiling provides statistical summaries that expose quality issues:
def profile_economic_dataset(df, categorical_cols=None):
    """Generate a comprehensive profile of an economic dataset."""
    # Identify column types if not specified
    if categorical_cols is None:
        categorical_cols = [col for col in df.columns
                            if df[col].dtype == 'object' or df[col].nunique() < 10]
    # Restrict numeric statistics to genuinely numeric columns
    numeric_cols = [c for c in df.columns
                    if c not in categorical_cols and pd.api.types.is_numeric_dtype(df[c])]

    # Create profile structure
    profile = {
        'basic_stats': {
            'row_count': len(df),
            'column_count': len(df.columns),
            'missing_values': {
                col: {
                    'count': int(df[col].isna().sum()),
                    'percentage': df[col].isna().mean() * 100
                } for col in df.columns
            },
            'numeric_stats': {}
        },
        'distributions': {},
        'correlations': {},
        'temporal_analysis': {}
    }

    # Add descriptive statistics for numeric columns
    for col in numeric_cols:
        profile['basic_stats']['numeric_stats'][col] = {
            'mean': df[col].mean(),
            'median': df[col].median(),
            'std': df[col].std(),
            'min': df[col].min(),
            'max': df[col].max()
        }

    # Correlation analysis
    if len(numeric_cols) > 1:
        profile['correlations'] = df[numeric_cols].corr().to_dict()

    # Temporal analysis if a date column exists
    date_cols = [col for col in df.columns
                 if 'date' in col.lower() or pd.api.types.is_datetime64_any_dtype(df[col])]
    if date_cols:
        date_col = date_cols[0]
        df_date = df.copy()
        try:
            # Convert to datetime if necessary, then analyse regardless of input dtype
            if not pd.api.types.is_datetime64_any_dtype(df_date[date_col]):
                df_date[date_col] = pd.to_datetime(df_date[date_col])
            df_date = df_date.sort_values(date_col)
            profile['temporal_analysis'] = {
                'date_range': {
                    'start': df_date[date_col].min().strftime('%Y-%m-%d'),
                    'end': df_date[date_col].max().strftime('%Y-%m-%d')
                },
                'recency': {
                    'days_since_latest': (datetime.now() - df_date[date_col].max()).days
                }
            }
        except (ValueError, TypeError):
            pass  # Skip temporal analysis if conversion fails
    return profile
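A quick way to exercise the profiler is on a small synthetic frame; the names and values below are illustrative only:

# Small synthetic frame for demonstration
sample = pd.DataFrame({
    'date': pd.date_range('2022-03-31', periods=8, freq='Q'),
    'country': ['US'] * 8,
    'gdp_real': [21.7, 21.8, 22.0, 22.2, 22.1, 22.2, 22.5, 22.7],
})

profile = profile_economic_dataset(sample, categorical_cols=['country'])
print(profile['basic_stats']['numeric_stats']['gdp_real'])
print(profile['temporal_analysis']['recency']['days_since_latest'])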
Data Reconciliation
Economic indicators often come from multiple sources with differing methodologies, coverage, and release schedules. The function below reconciles overlapping series into a single value per date and indicator, weighting sources by timeliness and attaching a confidence score:
def reconcile_economic_data(sources_dict):
    """Reconcile economic indicators from multiple sources.

    Each source DataFrame is expected to contain 'date', 'indicator', and 'value' columns.
    """
    # Combine all sources into one dataframe
    all_data = []
    for source, df in sources_dict.items():
        df_copy = df.copy()
        df_copy['source'] = source
        all_data.append(df_copy)
    combined = pd.concat(all_data, ignore_index=True)

    # Calculate source quality metrics
    source_metrics = {}
    for source, df in sources_dict.items():
        # Timeliness score decreases as the data gets older
        latest_date = df['date'].max()
        days_since_latest = (datetime.now() - latest_date).days
        if days_since_latest <= 30:
            timeliness = 1.0   # Very timely (within a month)
        elif days_since_latest <= 90:
            timeliness = 0.75  # Somewhat timely (within a quarter)
        elif days_since_latest <= 180:
            timeliness = 0.5   # Moderately outdated
        elif days_since_latest <= 365:
            timeliness = 0.25  # Outdated
        else:
            timeliness = 0.0   # Stale
        source_metrics[source] = {
            'timeliness': timeliness,
            'coverage': len(df) / len(combined)  # Relative coverage
        }

    # Group by key dimensions
    grouped = combined.groupby(['date', 'indicator'])
    reconciled_rows = []
    for (date, indicator), group in grouped:
        sources_present = group['source'].unique()
        if len(sources_present) == 1:
            # Only one source - use its value directly
            source = sources_present[0]
            value = group['value'].iloc[0]
            confidence = source_metrics[source]['timeliness']
        else:
            # Multiple sources - weighted average based on source quality
            weights = {source: source_metrics[source]['timeliness'] for source in sources_present}
            total_weight = sum(weights.values())
            if total_weight == 0:
                # All sources equally stale - fall back to equal weights
                normalized_weights = {source: 1 / len(weights) for source in weights}
            else:
                normalized_weights = {source: weight / total_weight
                                      for source, weight in weights.items()}

            # Calculate weighted average
            values = {source: group[group['source'] == source]['value'].iloc[0]
                      for source in sources_present}
            value = sum(values[source] * normalized_weights[source] for source in sources_present)

            # Agreement score based on dispersion between sources,
            # scaled by the largest magnitude
            values_array = np.array([values[s] for s in sources_present])
            max_value = np.max(np.abs(values_array))
            if max_value != 0:
                dispersion = np.std(values_array) / max_value
                agreement_score = max(0, 1 - dispersion)
            else:
                agreement_score = 1.0
            confidence = agreement_score * 0.8 + 0.2  # Ensure a minimum confidence

        reconciled_rows.append({
            'date': date,
            'indicator': indicator,
            'value': value,
            'confidence': confidence,
            'sources': ','.join(sources_present)
        })
    return pd.DataFrame(reconciled_rows)
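As a usage sketch, suppose two hypothetical sources report the same growth indicator for the last two quarters (source names and figures are illustrative):

# Build dates relative to "now" so the timeliness scoring is meaningful
recent_q = pd.Timestamp.now().normalize() - pd.offsets.QuarterEnd(1)
prior_q = recent_q - pd.offsets.QuarterEnd(1)

source_a = pd.DataFrame({'date': [prior_q, recent_q],
                         'indicator': ['gdp_growth', 'gdp_growth'],
                         'value': [2.1, 1.9]})
source_b = pd.DataFrame({'date': [prior_q, recent_q],
                         'indicator': ['gdp_growth', 'gdp_growth'],
                         'value': [2.0, 2.0]})

reconciled = reconcile_economic_data({'source_a': source_a, 'source_b': source_b})
print(reconciled[['date', 'indicator', 'value', 'confidence', 'sources']])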
Managing Data Revisions
Economic data frequently undergoes revisions as new information becomes available:
def manage_economic_revisions(current_df, new_df, version_col='version'):
    """Track and manage revisions in economic datasets."""
    # Work on a copy so the caller's DataFrame is not modified in place
    output_df = current_df.copy()

    # Add version column if it doesn't exist
    if version_col not in output_df.columns:
        output_df[version_col] = 1

    # Identify tracked value columns (numeric) and key columns (non-numeric)
    tracking_cols = [col for col in output_df.columns
                     if pd.api.types.is_numeric_dtype(output_df[col])
                     and col != version_col]
    key_cols = [col for col in output_df.columns
                if not pd.api.types.is_numeric_dtype(output_df[col])
                and col != version_col]

    # Track revision statistics
    revisions = {
        'new_records': 0,
        'revised_records': 0,
        'unchanged_records': 0,
        'column_revisions': {col: 0 for col in tracking_cols}
    }

    # Process new data row by row
    for _, new_row in new_df.iterrows():
        # Build a boolean filter matching rows with the same key values
        row_filter = pd.Series(True, index=output_df.index)
        for key in key_cols:
            if key in new_row:
                row_filter &= (output_df[key] == new_row[key])
        matching_rows = output_df[row_filter]

        if len(matching_rows) == 0:
            # This is a new record
            new_row_df = pd.DataFrame([new_row])
            if version_col not in new_row:
                new_row_df[version_col] = 1
            output_df = pd.concat([output_df, new_row_df], ignore_index=True)
            revisions['new_records'] += 1
        else:
            # Potential revision - check tracked columns for changes
            revised = False
            for col in tracking_cols:
                if col in new_row and col in matching_rows.columns:
                    old_value = matching_rows[col].iloc[0]
                    new_value = new_row[col]
                    # Check for an actual change (handling NaNs)
                    is_equal = False
                    if pd.isna(old_value) and pd.isna(new_value):
                        is_equal = True
                    elif not pd.isna(old_value) and not pd.isna(new_value):
                        # For numeric values, treat tiny differences as equal
                        if (isinstance(old_value, (int, float, np.integer, np.floating))
                                and isinstance(new_value, (int, float, np.integer, np.floating))):
                            is_equal = abs(old_value - new_value) < 1e-10
                        else:
                            is_equal = old_value == new_value
                    if not is_equal:
                        # This is a revision: update the value and bump the version
                        row_idx = matching_rows.index[0]
                        output_df.at[row_idx, col] = new_value
                        output_df.at[row_idx, version_col] = matching_rows[version_col].iloc[0] + 1
                        revisions['column_revisions'][col] += 1
                        revised = True
            if revised:
                revisions['revised_records'] += 1
            else:
                revisions['unchanged_records'] += 1

    return output_df, revisions
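A quick usage sketch with hypothetical quarterly growth figures (all values illustrative):

current = pd.DataFrame({
    'date': ['2024Q1', '2024Q2'],
    'country': ['US', 'US'],
    'gdp_growth': [1.6, 2.8],
})
latest_release = pd.DataFrame({
    'date': ['2024Q2', '2024Q3'],
    'country': ['US', 'US'],
    'gdp_growth': [3.0, 2.5],   # 2024Q2 revised, 2024Q3 newly published
})

updated, revision_log = manage_economic_revisions(current, latest_release)
print(revision_log)             # counts of new, revised, and unchanged records
print(updated[['date', 'country', 'gdp_growth', 'version']])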
Data Quality Pipeline Integration
Integrate quality checks directly into your data pipelines:
def build_validation_pipeline(source_connector, validation_config, output_connector,
                              notification_handler=None):
    """Build a data validation pipeline that integrates with ETL processes."""
    def validation_pipeline(execution_date=None):
        results = {
            'execution_date': execution_date or datetime.now(),
            'status': 'success',
            'errors': []
        }
        try:
            # Fetch data from source
            raw_data = source_connector()
            if raw_data is None or len(raw_data) == 0:
                results['status'] = 'error'
                results['errors'].append("No data received from source")
                return results

            # Perform validation
            validation_results = quality_check_economic_dataset(raw_data, validation_config)
            results['validation_results'] = validation_results

            # Create data profile
            profile = profile_economic_dataset(raw_data)
            results['profile'] = profile

            # Process data based on validation results
            if validation_results['passed']:
                # All validations passed, proceed with output
                output_connector(raw_data)
            else:
                # Some validations failed
                results['status'] = 'warning'

                # Check whether any failures are critical; 'critical_checks' is expected
                # to be a list of {'type': <check type>, 'item': <check name>} entries
                critical_failures = []
                for check_type, checks in validation_results['checks'].items():
                    for item, result in checks.items():
                        is_critical = any(
                            check['type'] == check_type and check['item'] == item
                            for check in validation_config.get('critical_checks', [])
                        )
                        if is_critical and not result['passed']:
                            critical_failures.append(f"{check_type} - {item}")

                if critical_failures:
                    # Critical failures present, halt the pipeline
                    results['status'] = 'error'
                    results['errors'].extend([
                        f"Critical validation failure: {failure}"
                        for failure in critical_failures
                    ])
                else:
                    # Non-critical failures, proceed with output
                    output_connector(raw_data)
        except Exception as e:
            results['status'] = 'error'
            results['errors'].append(f"Pipeline error: {str(e)}")

        # Send notifications if a handler is provided
        if notification_handler is not None:
            notification_handler(results)
        return results

    return validation_pipeline
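Wiring the pipeline together might look like the following sketch. The connectors fetch_gdp_data, write_validated_data, and send_alert, along with the file paths, are hypothetical stand-ins for whatever your environment provides:

def fetch_gdp_data():
    # Hypothetical source connector: return a DataFrame from an API, file, or database
    return pd.read_csv('gdp_quarterly.csv', parse_dates=['date'])

def write_validated_data(df):
    # Hypothetical output connector: persist validated data
    df.to_csv('gdp_quarterly_validated.csv', index=False)

def send_alert(results):
    # Hypothetical notification handler: log or page on failures
    if results['status'] != 'success':
        print(f"Pipeline finished with status {results['status']}: {results['errors']}")

pipeline = build_validation_pipeline(
    source_connector=fetch_gdp_data,
    validation_config=gdp_quality_config,
    output_connector=write_validated_data,
    notification_handler=send_alert,
)
run_results = pipeline()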
By implementing these data quality practices, you can ensure that economic datasets are accurate, consistent, and reliable throughout their lifecycle. Automated validation, profiling, reconciliation, and revision tracking maintain data integrity and improve the quality of subsequent analysis and decision-making.