Economic Data Governance and Compliance: Managing Regulatory Requirements

Introduction

Economic data governance represents one of the most complex challenges in modern data management due to the intersection of multiple regulatory frameworks, varying international standards, and the critical importance of economic data for policy decisions and market operations. Unlike typical business data governance that focuses primarily on data quality and access control, economic data governance must address regulatory compliance, cross-border data sharing, and the unique audit requirements that characterize financial and government institutions.

The regulatory landscape for economic data continues to evolve rapidly, with new requirements emerging from central banks, statistical agencies, and international organizations. Financial institutions must comply with regulations like Basel III, Solvency II, and various national banking regulations that mandate specific data quality standards, reporting timelines, and audit capabilities. Government agencies face additional requirements for data transparency, public access, and international reporting standards set by organizations like the IMF and World Bank.

The global nature of economic data creates additional governance challenges around data sovereignty, cross-border transfer restrictions, and varying privacy regulations. Economic datasets often contain sensitive information about national economic conditions, individual financial institutions, or proprietary research that requires careful access control and usage monitoring. The governance framework must balance the need for analytical access with regulatory requirements and competitive sensitivity.

This guide builds upon the data quality foundations from Data Quality Practices for Economic Datasets and provides the governance framework that supports the security requirements discussed in Economic Data Security and Privacy. The governance patterns presented here integrate with the monitoring capabilities from Economic Indicator Alerting and Monitoring Systems.

Regulatory Compliance Framework

Economic data systems must navigate a complex web of regulatory requirements that vary by jurisdiction, data type, and organizational context. The compliance framework must provide systematic approaches to identifying applicable regulations, implementing required controls, and demonstrating ongoing compliance through comprehensive audit trails and reporting capabilities.

The foundation of regulatory compliance lies in understanding the specific requirements that apply to your economic data systems. Financial institutions typically must comply with banking regulations that mandate specific data retention periods, quality standards, and reporting frequencies. Central banks and statistical agencies face additional requirements for data transparency, methodology documentation, and international reporting standards. The compliance framework must map these requirements to specific technical controls and operational procedures.

Automated compliance monitoring becomes essential given the volume and complexity of economic data processing. The system should continuously monitor data flows, quality metrics, and access patterns to detect potential compliance violations before they become regulatory issues. This monitoring must cover both technical compliance (data quality, retention, security) and procedural compliance (approval workflows, change management, audit documentation).

from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import hashlib
import logging
import uuid

class RegulationType(Enum):
    BANKING = "banking"
    SECURITIES = "securities"
    PRIVACY = "privacy"
    STATISTICAL = "statistical"
    INTERNATIONAL = "international"
    SECTORAL = "sectoral"

class ComplianceStatus(Enum):
    COMPLIANT = "compliant"
    NON_COMPLIANT = "non_compliant"
    PENDING_REVIEW = "pending_review"
    REMEDIATION_REQUIRED = "remediation_required"

@dataclass
class RegulatoryRequirement:
    """Definition of a regulatory requirement"""
    requirement_id: str
    name: str
    description: str
    regulation_type: RegulationType
    jurisdiction: str
    regulation_name: str
    regulation_section: str
    effective_date: datetime
    compliance_deadline: Optional[datetime] = None
    technical_controls: List[str] = field(default_factory=list)
    operational_controls: List[str] = field(default_factory=list)
    documentation_requirements: List[str] = field(default_factory=list)
    monitoring_requirements: List[str] = field(default_factory=list)
    penalty_risk: str = "medium"  # low, medium, high, critical

@dataclass
class ComplianceAssessment:
    """Result of compliance assessment"""
    assessment_id: str
    requirement_id: str
    assessed_date: datetime
    status: ComplianceStatus
    compliance_score: float  # 0.0 to 1.0
    findings: List[str]
    recommendations: List[str]
    remediation_actions: List[str]
    evidence: List[str]
    next_assessment_date: datetime
    assessor: str

class EconomicDataGovernanceFramework:
    """Comprehensive governance framework for economic data"""
    
    def __init__(self):
        self.regulatory_requirements = {}
        self.compliance_assessments = {}
        self.data_lineage_tracker = DataLineageTracker()
        self.access_control_manager = AccessControlManager()
        self.audit_manager = AuditManager()
        self.policy_engine = PolicyEngine()
        self.compliance_monitor = ComplianceMonitor()
        
    def register_regulatory_requirement(self, requirement: RegulatoryRequirement):
        """Register a new regulatory requirement"""
        self.regulatory_requirements[requirement.requirement_id] = requirement
        
        # Update policy engine with new requirement
        self.policy_engine.add_requirement_policies(requirement)
        
        # Schedule compliance monitoring
        self.compliance_monitor.schedule_monitoring(requirement)
        
        logging.info(f"Registered regulatory requirement: {requirement.name}")
    
    async def assess_compliance(self, requirement_id: str, assessor: str) -> ComplianceAssessment:
        """Perform comprehensive compliance assessment"""
        
        if requirement_id not in self.regulatory_requirements:
            raise ValueError(f"Requirement {requirement_id} not found")
        
        requirement = self.regulatory_requirements[requirement_id]
        
        # Collect evidence for assessment
        evidence = await self._collect_compliance_evidence(requirement)
        
        # Evaluate technical controls
        technical_compliance = await self._evaluate_technical_controls(requirement, evidence)
        
        # Evaluate operational controls
        operational_compliance = await self._evaluate_operational_controls(requirement, evidence)
        
        # Calculate overall compliance score
        compliance_score = (technical_compliance + operational_compliance) / 2
        
        # Determine compliance status
        if compliance_score >= 0.95:
            status = ComplianceStatus.COMPLIANT
        elif compliance_score >= 0.80:
            status = ComplianceStatus.PENDING_REVIEW
        else:
            status = ComplianceStatus.NON_COMPLIANT
        
        # Generate findings and recommendations
        findings = self._generate_findings(requirement, evidence, technical_compliance, operational_compliance)
        recommendations = self._generate_recommendations(requirement, findings)
        
        # Create assessment record
        assessment = ComplianceAssessment(
            assessment_id=str(uuid.uuid4()),
            requirement_id=requirement_id,
            assessed_date=datetime.utcnow(),
            status=status,
            compliance_score=compliance_score,
            findings=findings,
            recommendations=recommendations,
            remediation_actions=[],
            evidence=[f"{category}: {details}" for category, details in evidence.items()],
            next_assessment_date=datetime.utcnow() + timedelta(days=90),
            assessor=assessor
        )
        
        # Store assessment
        self.compliance_assessments[assessment.assessment_id] = assessment
        
        # Log compliance event
        self.audit_manager.log_compliance_assessment(assessment)
        
        return assessment
    
    async def _collect_compliance_evidence(self, requirement: RegulatoryRequirement) -> Dict[str, Any]:
        """Collect evidence for compliance assessment"""
        evidence = {
            'data_quality_metrics': await self._get_data_quality_evidence(),
            'access_logs': await self._get_access_log_evidence(),
            'data_lineage': await self._get_lineage_evidence(),
            'retention_compliance': await self._get_retention_evidence(),
            'documentation': await self._get_documentation_evidence(),
            'training_records': await self._get_training_evidence(),
            'incident_reports': await self._get_incident_evidence(),
            'audit_trails': await self._get_audit_trail_evidence(),
            'encryption_status': await self._get_encryption_evidence()
        }
        
        return evidence
    
    async def _evaluate_technical_controls(self, requirement: RegulatoryRequirement, 
                                         evidence: Dict[str, Any]) -> float:
        """Evaluate technical control compliance"""
        control_scores = []
        
        for control in requirement.technical_controls:
            if control == 'data_encryption':
                score = self._evaluate_encryption_compliance(evidence)
            elif control == 'access_control':
                score = self._evaluate_access_control_compliance(evidence)
            elif control == 'data_quality':
                score = self._evaluate_data_quality_compliance(evidence)
            elif control == 'backup_recovery':
                score = self._evaluate_backup_compliance(evidence)
            elif control == 'audit_logging':
                score = self._evaluate_audit_logging_compliance(evidence)
            else:
                logging.warning(f"Unknown technical control: {control}")
                score = 0.5  # Default to partial compliance
            
            control_scores.append(score)
        
        return sum(control_scores) / len(control_scores) if control_scores else 1.0
    
    async def _evaluate_operational_controls(self, requirement: RegulatoryRequirement,
                                           evidence: Dict[str, Any]) -> float:
        """Evaluate operational control compliance"""
        control_scores = []
        
        for control in requirement.operational_controls:
            if control == 'change_management':
                score = self._evaluate_change_management_compliance(evidence)
            elif control == 'incident_response':
                score = self._evaluate_incident_response_compliance(evidence)
            elif control == 'staff_training':
                score = self._evaluate_training_compliance(evidence)
            elif control == 'documentation':
                score = self._evaluate_documentation_compliance(evidence)
            elif control == 'vendor_management':
                score = self._evaluate_vendor_management_compliance(evidence)
            else:
                logging.warning(f"Unknown operational control: {control}")
                score = 0.5  # Default to partial compliance
            
            control_scores.append(score)
        
        return sum(control_scores) / len(control_scores) if control_scores else 1.0
    
    def _evaluate_encryption_compliance(self, evidence: Dict[str, Any]) -> float:
        """Evaluate encryption compliance"""
        # Check for encryption at rest and in transit
        encryption_evidence = evidence.get('encryption_status', {})
        
        at_rest_encrypted = encryption_evidence.get('at_rest', False)
        in_transit_encrypted = encryption_evidence.get('in_transit', False)
        key_management = encryption_evidence.get('key_management', False)
        
        score = 0
        if at_rest_encrypted:
            score += 0.4
        if in_transit_encrypted:
            score += 0.4
        if key_management:
            score += 0.2
        
        return score
    
    def _evaluate_access_control_compliance(self, evidence: Dict[str, Any]) -> float:
        """Evaluate access control compliance"""
        access_evidence = evidence.get('access_logs', {})
        
        # Check for key access control features
        mfa_enabled = access_evidence.get('mfa_enabled', False)
        role_based_access = access_evidence.get('rbac_implemented', False)
        access_reviews = access_evidence.get('regular_access_reviews', False)
        privileged_access_monitoring = access_evidence.get('privileged_monitoring', False)
        
        score = 0
        if mfa_enabled:
            score += 0.3
        if role_based_access:
            score += 0.3
        if access_reviews:
            score += 0.2
        if privileged_access_monitoring:
            score += 0.2
        
        return score
    
    def _evaluate_data_quality_compliance(self, evidence: Dict[str, Any]) -> float:
        """Evaluate data quality compliance"""
        quality_evidence = evidence.get('data_quality_metrics', {})
        
        completeness = quality_evidence.get('completeness_score', 0)
        accuracy = quality_evidence.get('accuracy_score', 0)
        timeliness = quality_evidence.get('timeliness_score', 0)
        consistency = quality_evidence.get('consistency_score', 0)
        
        return (completeness + accuracy + timeliness + consistency) / 4
    
    def _evaluate_backup_compliance(self, evidence: Dict[str, Any]) -> float:
        """Evaluate backup and recovery compliance (simplified evidence check)"""
        return 1.0 if evidence.get('retention_compliance', {}).get('retention_policy_implemented') else 0.0
    
    def _evaluate_audit_logging_compliance(self, evidence: Dict[str, Any]) -> float:
        """Evaluate audit logging compliance from audit trail evidence"""
        audit_evidence = evidence.get('audit_trails', {})
        checks = ['comprehensive_audit_logging', 'log_retention_compliant',
                  'log_integrity_protection', 'audit_log_monitoring']
        return sum(1 for c in checks if audit_evidence.get(c)) / len(checks)
    
    def _evaluate_change_management_compliance(self, evidence: Dict[str, Any]) -> float:
        """Evaluate change management compliance from documentation evidence"""
        docs = evidence.get('documentation', {})
        return 1.0 if docs.get('procedure_documentation') and docs.get('documentation_current') else 0.5
    
    def _evaluate_incident_response_compliance(self, evidence: Dict[str, Any]) -> float:
        """Evaluate incident response compliance"""
        incidents = evidence.get('incident_reports', {})
        checks = ['incident_response_plan', 'incident_tracking_system', 'regulatory_reporting_compliant']
        return sum(1 for c in checks if incidents.get(c)) / len(checks)
    
    def _evaluate_training_compliance(self, evidence: Dict[str, Any]) -> float:
        """Evaluate staff training compliance"""
        training = evidence.get('training_records', {})
        return training.get('staff_training_completion', 0.0)
    
    def _evaluate_documentation_compliance(self, evidence: Dict[str, Any]) -> float:
        """Evaluate documentation compliance"""
        docs = evidence.get('documentation', {})
        checks = ['policy_documentation', 'procedure_documentation',
                  'technical_documentation', 'user_documentation']
        return sum(1 for c in checks if docs.get(c)) / len(checks)
    
    def _evaluate_vendor_management_compliance(self, evidence: Dict[str, Any]) -> float:
        """Evaluate vendor management compliance (no dedicated evidence source yet)"""
        return 0.5  # Partial credit until vendor evidence collection is implemented
    
    def _generate_findings(self, requirement: RegulatoryRequirement, evidence: Dict[str, Any],
                          technical_score: float, operational_score: float) -> List[str]:
        """Generate compliance findings"""
        findings = []
        
        if technical_score < 0.8:
            findings.append(f"Technical controls for {requirement.name} below acceptable threshold ({technical_score:.2f})")
        
        if operational_score < 0.8:
            findings.append(f"Operational controls for {requirement.name} below acceptable threshold ({operational_score:.2f})")
        
        # Add specific findings based on evidence
        quality_metrics = evidence.get('data_quality_metrics', {})
        if quality_metrics.get('completeness_score', 1.0) < 0.95:
            findings.append("Data completeness below regulatory standards")
        
        if quality_metrics.get('timeliness_score', 1.0) < 0.90:
            findings.append("Data timeliness not meeting regulatory deadlines")
        
        return findings
    
    def _generate_recommendations(self, requirement: RegulatoryRequirement, 
                                findings: List[str]) -> List[str]:
        """Generate compliance recommendations"""
        recommendations = []
        
        for finding in findings:
            if "technical controls" in finding.lower():
                recommendations.append("Implement additional technical controls and monitoring")
            elif "operational controls" in finding.lower():
                recommendations.append("Enhance operational procedures and staff training")
            elif "completeness" in finding.lower():
                recommendations.append("Improve data collection and validation processes")
            elif "timeliness" in finding.lower():
                recommendations.append("Optimize data processing pipelines for faster delivery")
        
        return recommendations
    
    async def _get_data_quality_evidence(self) -> Dict[str, Any]:
        """Get data quality evidence"""
        # This would typically query your data quality monitoring system
        return {
            'completeness_score': 0.96,
            'accuracy_score': 0.94,
            'timeliness_score': 0.92,
            'consistency_score': 0.95,
            'last_updated': datetime.utcnow().isoformat()
        }
    
    async def _get_access_log_evidence(self) -> Dict[str, Any]:
        """Get access control evidence"""
        return {
            'mfa_enabled': True,
            'rbac_implemented': True,
            'regular_access_reviews': True,
            'privileged_monitoring': True,
            'failed_access_attempts': 12,
            'last_access_review': (datetime.utcnow() - timedelta(days=30)).isoformat()
        }
    
    async def _get_lineage_evidence(self) -> Dict[str, Any]:
        """Get data lineage evidence"""
        return {
            'lineage_coverage': 0.89,
            'automated_lineage': True,
            'manual_documentation': True,
            'lineage_verification_date': datetime.utcnow().isoformat()
        }
    
    async def _get_retention_evidence(self) -> Dict[str, Any]:
        """Get data retention evidence"""
        return {
            'retention_policy_implemented': True,
            'automated_purging': True,
            'retention_compliance_rate': 0.98,
            'last_retention_audit': (datetime.utcnow() - timedelta(days=60)).isoformat()
        }
    
    async def _get_documentation_evidence(self) -> Dict[str, Any]:
        """Get documentation evidence"""
        return {
            'policy_documentation': True,
            'procedure_documentation': True,
            'technical_documentation': True,
            'user_documentation': True,
            'documentation_current': True,
            'last_documentation_review': (datetime.utcnow() - timedelta(days=45)).isoformat()
        }
    
    async def _get_training_evidence(self) -> Dict[str, Any]:
        """Get training records evidence"""
        return {
            'staff_training_completion': 0.94,
            'compliance_training_current': True,
            'security_awareness_training': True,
            'last_training_cycle': (datetime.utcnow() - timedelta(days=90)).isoformat()
        }
    
    async def _get_incident_evidence(self) -> Dict[str, Any]:
        """Get incident management evidence"""
        return {
            'incident_response_plan': True,
            'incident_tracking_system': True,
            'incidents_last_12_months': 3,
            'average_resolution_time_hours': 8.5,
            'regulatory_reporting_compliant': True
        }
    
    async def _get_audit_trail_evidence(self) -> Dict[str, Any]:
        """Get audit trail evidence"""
        return {
            'comprehensive_audit_logging': True,
            'log_retention_compliant': True,
            'log_integrity_protection': True,
            'audit_log_monitoring': True,
            'log_storage_encrypted': True
        }
    
    async def _get_encryption_evidence(self) -> Dict[str, Any]:
        """Get encryption status evidence (consumed by _evaluate_encryption_compliance)"""
        return {
            'at_rest': True,
            'in_transit': True,
            'key_management': True
        }
    
    def get_compliance_dashboard(self) -> Dict[str, Any]:
        """Generate compliance dashboard data"""
        total_requirements = len(self.regulatory_requirements)
        recent_assessments = [
            assessment for assessment in self.compliance_assessments.values()
            if assessment.assessed_date > datetime.utcnow() - timedelta(days=30)
        ]
        
        compliance_by_status = {}
        for status in ComplianceStatus:
            compliance_by_status[status.value] = len([
                a for a in recent_assessments if a.status == status
            ])
        
        avg_compliance_score = (
            sum(a.compliance_score for a in recent_assessments) / len(recent_assessments)
            if recent_assessments else 0
        )
        
        return {
            'total_regulatory_requirements': total_requirements,
            'recent_assessments_count': len(recent_assessments),
            'compliance_by_status': compliance_by_status,
            'average_compliance_score': avg_compliance_score,
            'high_risk_findings': len([
                a for a in recent_assessments 
                if a.status == ComplianceStatus.NON_COMPLIANT
            ]),
            'upcoming_deadlines': self._get_upcoming_deadlines()
        }
    
    def _get_upcoming_deadlines(self) -> List[Dict[str, Any]]:
        """Get upcoming compliance deadlines"""
        upcoming = []
        cutoff_date = datetime.utcnow() + timedelta(days=90)
        
        for requirement in self.regulatory_requirements.values():
            if requirement.compliance_deadline and requirement.compliance_deadline <= cutoff_date:
                upcoming.append({
                    'requirement_id': requirement.requirement_id,
                    'name': requirement.name,
                    'deadline': requirement.compliance_deadline.isoformat(),
                    'days_remaining': (requirement.compliance_deadline - datetime.utcnow()).days
                })
        
        return sorted(upcoming, key=lambda x: x['days_remaining'])

class DataLineageTracker:
    """Tracks data lineage for governance and compliance"""
    
    def __init__(self):
        self.lineage_records = {}
        self.transformation_catalog = {}
        
    def record_data_lineage(self, dataset_id: str, source_datasets: List[str],
                          transformations: List[str], processing_timestamp: datetime):
        """Record data lineage information"""
        lineage_record = {
            'dataset_id': dataset_id,
            'source_datasets': source_datasets,
            'transformations': transformations,
            'processing_timestamp': processing_timestamp,
            'recorded_at': datetime.utcnow(),
            'lineage_hash': self._calculate_lineage_hash(dataset_id, source_datasets, transformations)
        }
        
        self.lineage_records[dataset_id] = lineage_record
        
    def _calculate_lineage_hash(self, dataset_id: str, source_datasets: List[str],
                              transformations: List[str]) -> str:
        """Calculate hash for lineage integrity verification"""
        lineage_string = f"{dataset_id}|{','.join(sorted(source_datasets))}|{','.join(sorted(transformations))}"
        return hashlib.sha256(lineage_string.encode()).hexdigest()
    
    def get_data_lineage(self, dataset_id: str) -> Optional[Dict[str, Any]]:
        """Get complete lineage for a dataset"""
        return self.lineage_records.get(dataset_id)
    
    def trace_data_origin(self, dataset_id: str) -> List[str]:
        """Trace data back to original sources"""
        visited = set()
        origins = []
        
        def trace_recursive(current_id):
            if current_id in visited:
                return
            visited.add(current_id)
            
            lineage = self.lineage_records.get(current_id)
            if lineage:
                source_datasets = lineage['source_datasets']
                if not source_datasets:  # No sources means this is an origin
                    origins.append(current_id)
                else:
                    for source in source_datasets:
                        trace_recursive(source)
            else:
                origins.append(current_id)  # No lineage record means external source
        
        trace_recursive(dataset_id)
        return origins
    
    def get_downstream_impact(self, dataset_id: str) -> List[str]:
        """Get all datasets that depend on this dataset"""
        downstream = []
        
        for lineage_id, lineage in self.lineage_records.items():
            if dataset_id in lineage['source_datasets']:
                downstream.append(lineage_id)
        
        return downstream

class AccessControlManager:
    """Manages access control for economic data"""
    
    def __init__(self):
        self.access_policies = {}
        self.role_definitions = {}
        self.user_assignments = {}
        self.access_logs = []
        
    def define_role(self, role_id: str, role_name: str, permissions: List[str],
                   data_access_scope: Dict[str, Any]):
        """Define a data access role"""
        self.role_definitions[role_id] = {
            'role_name': role_name,
            'permissions': permissions,
            'data_access_scope': data_access_scope,
            'created_at': datetime.utcnow(),
            'last_modified': datetime.utcnow()
        }
    
    def assign_user_role(self, user_id: str, role_id: str, assigned_by: str,
                        expiration_date: Optional[datetime] = None):
        """Assign role to user"""
        if role_id not in self.role_definitions:
            raise ValueError(f"Role {role_id} not defined")
        
        assignment = {
            'user_id': user_id,
            'role_id': role_id,
            'assigned_by': assigned_by,
            'assigned_date': datetime.utcnow(),
            'expiration_date': expiration_date,
            'active': True
        }
        
        if user_id not in self.user_assignments:
            self.user_assignments[user_id] = []
        
        self.user_assignments[user_id].append(assignment)
        
        # Log access control change
        self._log_access_event('role_assigned', user_id, {
            'role_id': role_id,
            'assigned_by': assigned_by
        })
    
    def check_data_access(self, user_id: str, dataset_id: str, operation: str) -> bool:
        """Check if user has access to specific dataset and operation"""
        user_roles = self._get_active_user_roles(user_id)
        
        for role_id in user_roles:
            role = self.role_definitions[role_id]
            
            # Check if operation is permitted
            if operation not in role['permissions']:
                continue
            
            # Check data access scope
            scope = role['data_access_scope']
            if self._is_dataset_in_scope(dataset_id, scope):
                self._log_access_event('access_granted', user_id, {
                    'dataset_id': dataset_id,
                    'operation': operation,
                    'role_id': role_id
                })
                return True
        
        self._log_access_event('access_denied', user_id, {
            'dataset_id': dataset_id,
            'operation': operation
        })
        return False
    
    def _get_active_user_roles(self, user_id: str) -> List[str]:
        """Get active roles for user"""
        if user_id not in self.user_assignments:
            return []
        
        active_roles = []
        now = datetime.utcnow()
        
        for assignment in self.user_assignments[user_id]:
            if (assignment['active'] and 
                (assignment['expiration_date'] is None or assignment['expiration_date'] > now)):
                active_roles.append(assignment['role_id'])
        
        return active_roles
    
    def _is_dataset_in_scope(self, dataset_id: str, scope: Dict[str, Any]) -> bool:
        """Check if dataset is within access scope"""
        # Simplified scope checking - a production implementation would also
        # match datasets against category and country scopes via metadata lookup
        allowed_datasets = scope.get('datasets', [])
        
        if allowed_datasets and dataset_id not in allowed_datasets:
            return False
        
        return True
    
    def _log_access_event(self, event_type: str, user_id: str, details: Dict[str, Any]):
        """Log access control event"""
        log_entry = {
            'timestamp': datetime.utcnow(),
            'event_type': event_type,
            'user_id': user_id,
            'details': details,
            'session_id': details.get('session_id')
        }
        
        self.access_logs.append(log_entry)
        
        # In production, would also send to external logging system
        logging.info(f"Access control event: {event_type} for user {user_id}")

class AuditManager:
    """Manages audit trails and compliance reporting"""
    
    def __init__(self):
        self.audit_logs = []
        self.compliance_reports = {}
        
    def log_compliance_assessment(self, assessment: ComplianceAssessment):
        """Log compliance assessment for audit trail"""
        audit_entry = {
            'timestamp': datetime.utcnow(),
            'event_type': 'compliance_assessment',
            'assessment_id': assessment.assessment_id,
            'requirement_id': assessment.requirement_id,
            'status': assessment.status.value,
            'compliance_score': assessment.compliance_score,
            'assessor': assessment.assessor,
            'findings_count': len(assessment.findings)
        }
        
        self.audit_logs.append(audit_entry)
    
    def generate_compliance_report(self, report_type: str, date_range: Dict[str, datetime]) -> Dict[str, Any]:
        """Generate compliance report for regulatory submission"""
        start_date = date_range['start']
        end_date = date_range['end']
        
        # Filter audit logs for date range
        relevant_logs = [
            log for log in self.audit_logs
            if start_date <= log['timestamp'] <= end_date
        ]
        
        report = {
            'report_id': str(uuid.uuid4()),
            'report_type': report_type,
            'generated_date': datetime.utcnow(),
            'period_start': start_date,
            'period_end': end_date,
            'total_events': len(relevant_logs),
            'compliance_assessments': len([
                log for log in relevant_logs 
                if log['event_type'] == 'compliance_assessment'
            ]),
            'access_events': len([
                log for log in relevant_logs 
                if log['event_type'] in ['access_granted', 'access_denied']
            ]),
            'data_quality_events': len([
                log for log in relevant_logs 
                if log['event_type'] == 'data_quality_check'
            ])
        }
        
        self.compliance_reports[report['report_id']] = report
        return report

class PolicyEngine:
    """Manages governance policies and automated enforcement"""
    
    def __init__(self):
        self.policies = {}
        self.policy_violations = []
        
    def add_requirement_policies(self, requirement: RegulatoryRequirement):
        """Add policies based on regulatory requirement"""
        policy_id = f"req_{requirement.requirement_id}"
        
        policy = {
            'policy_id': policy_id,
            'requirement_id': requirement.requirement_id,
            'rules': self._generate_policy_rules(requirement),
            'enforcement_level': 'strict' if requirement.penalty_risk == 'critical' else 'moderate',
            'created_at': datetime.utcnow()
        }
        
        self.policies[policy_id] = policy
    
    def _generate_policy_rules(self, requirement: RegulatoryRequirement) -> List[Dict[str, Any]]:
        """Generate policy rules from requirement"""
        rules = []
        
        # Data retention rules
        if 'data_retention' in requirement.technical_controls:
            rules.append({
                'rule_type': 'data_retention',
                'condition': 'data_age > retention_period',
                'action': 'archive_or_delete',
                'parameters': {'retention_period_days': 2555}  # 7 years default
            })
        
        # Access control rules
        if 'access_control' in requirement.technical_controls:
            rules.append({
                'rule_type': 'access_control',
                'condition': 'privileged_access_without_approval',
                'action': 'deny_access',
                'parameters': {'require_approval': True}
            })
        
        # Data quality rules
        if 'data_quality' in requirement.technical_controls:
            rules.append({
                'rule_type': 'data_quality',
                'condition': 'quality_score < threshold',
                'action': 'quarantine_data',
                'parameters': {'quality_threshold': 0.95}
            })
        
        return rules
    
    def evaluate_policy_compliance(self, data_event: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Evaluate data event against policies, returning any violations"""
        violations = []
        
        for policy in self.policies.values():
            for rule in policy['rules']:
                if self._evaluate_rule(rule, data_event):
                    violation = {
                        'policy_id': policy['policy_id'],
                        'rule_type': rule['rule_type'],
                        'event': data_event,
                        'timestamp': datetime.utcnow(),
                        'severity': policy['enforcement_level']
                    }
                    violations.append(violation)
                    self.policy_violations.append(violation)
        
        return violations
    
    def _evaluate_rule(self, rule: Dict[str, Any], data_event: Dict[str, Any]) -> bool:
        """Evaluate individual policy rule"""
        # Simplified rule evaluation - production would be more sophisticated
        rule_type = rule['rule_type']
        
        if rule_type == 'data_retention':
            data_age_days = (datetime.utcnow() - data_event.get('created_date', datetime.utcnow())).days
            retention_days = rule['parameters']['retention_period_days']
            return data_age_days > retention_days
        
        elif rule_type == 'access_control':
            return data_event.get('privileged_access', False) and not data_event.get('approved', False)
        
        elif rule_type == 'data_quality':
            quality_score = data_event.get('quality_score', 1.0)
            threshold = rule['parameters']['quality_threshold']
            return quality_score < threshold
        
        return False

class ComplianceMonitor:
    """Continuous compliance monitoring"""
    
    def __init__(self):
        self.monitoring_schedules = {}
        self.alerts = []
        
    def schedule_monitoring(self, requirement: RegulatoryRequirement):
        """Schedule monitoring for a requirement"""
        monitor_config = {
            'requirement_id': requirement.requirement_id,
            'monitoring_frequency': self._determine_monitoring_frequency(requirement),
            'next_check': datetime.utcnow() + timedelta(hours=24),
            'escalation_threshold': 3,  # Number of failures before escalation
            'current_failures': 0
        }
        
        self.monitoring_schedules[requirement.requirement_id] = monitor_config
    
    def _determine_monitoring_frequency(self, requirement: RegulatoryRequirement) -> timedelta:
        """Determine appropriate monitoring frequency"""
        if requirement.penalty_risk == 'critical':
            return timedelta(hours=6)  # Every 6 hours
        elif requirement.penalty_risk == 'high':
            return timedelta(hours=12)  # Every 12 hours
        else:
            return timedelta(days=1)  # Daily
    
    async def run_compliance_checks(self):
        """Run scheduled compliance checks"""
        now = datetime.utcnow()
        
        for requirement_id, config in self.monitoring_schedules.items():
            if now >= config['next_check']:
                try:
                    # Run compliance check
                    compliance_result = await self._check_requirement_compliance(requirement_id)
                    
                    if not compliance_result['compliant']:
                        config['current_failures'] += 1
                        
                        if config['current_failures'] >= config['escalation_threshold']:
                            await self._escalate_compliance_issue(requirement_id, compliance_result)
                    else:
                        config['current_failures'] = 0  # Reset on success
                    
                    # Schedule next check
                    config['next_check'] = now + config['monitoring_frequency']
                    
                except Exception as e:
                    logging.error(f"Compliance check failed for {requirement_id}: {e}")
    
    async def _check_requirement_compliance(self, requirement_id: str) -> Dict[str, Any]:
        """Check compliance for specific requirement"""
        # This would implement actual compliance checking logic
        # For demo, return mock result
        return {
            'requirement_id': requirement_id,
            'compliant': True,
            'compliance_score': 0.95,
            'issues': []
        }
    
    async def _escalate_compliance_issue(self, requirement_id: str, compliance_result: Dict[str, Any]):
        """Escalate compliance issue"""
        alert = {
            'alert_id': str(uuid.uuid4()),
            'requirement_id': requirement_id,
            'severity': 'high',
            'message': f"Compliance failure for requirement {requirement_id}",
            'compliance_result': compliance_result,
            'timestamp': datetime.utcnow()
        }
        
        self.alerts.append(alert)
        logging.error(f"Compliance escalation: {alert['message']}")
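To show how these components fit together, the following sketch registers a requirement and runs an assessment against it. The requirement identifiers, control names, and field values below are illustrative examples, not actual regulatory text.

import asyncio

# Illustrative usage of the governance framework defined above.
# All requirement details here are hypothetical.
framework = EconomicDataGovernanceFramework()

framework.register_regulatory_requirement(RegulatoryRequirement(
    requirement_id="basel-iii-data-quality",
    name="Basel III Risk Data Aggregation",
    description="Data quality and audit controls for risk reporting",
    regulation_type=RegulationType.BANKING,
    jurisdiction="US",
    regulation_name="Basel III",
    regulation_section="BCBS 239",
    effective_date=datetime(2023, 1, 1),
    technical_controls=["data_quality", "audit_logging", "data_encryption"],
    operational_controls=["documentation", "staff_training"],
    penalty_risk="high"
))

async def main():
    # Run an assessment and inspect the resulting status and findings
    assessment = await framework.assess_compliance(
        "basel-iii-data-quality", assessor="compliance-team"
    )
    print(assessment.status.value, f"{assessment.compliance_score:.2f}")
    for finding in assessment.findings:
        print("-", finding)

asyncio.run(main())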

Data Classification and Sensitivity Management

Economic data classification requires sophisticated frameworks that account for both regulatory requirements and business sensitivity. The classification system must handle multiple dimensions of sensitivity including regulatory classification (public, confidential, restricted), business impact (market-moving, competitively sensitive), and personal data protection requirements under various privacy regulations.

The classification framework must be dynamic, automatically adjusting classifications based on data context, aggregation levels, and temporal factors. Individual economic indicators might be public information, but aggregated patterns or early access to the data might be highly sensitive. The system must track these contextual factors and apply appropriate controls automatically.

Integration with downstream systems ensures that classification decisions influence access controls, retention policies, and sharing restrictions throughout the data lifecycle. When data classification changes due to regulatory updates or business decisions, the system must automatically update all dependent systems and notify relevant stakeholders of the changes.

from enum import Enum
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import logging
import re

class DataClassification(Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
    TOP_SECRET = "top_secret"

class SensitivityReason(Enum):
    MARKET_MOVING = "market_moving"
    COMPETITIVE = "competitive"
    PERSONAL_DATA = "personal_data"
    REGULATORY = "regulatory"
    NATIONAL_SECURITY = "national_security"
    EARLY_ACCESS = "early_access"

@dataclass
class DataSensitivityRule:
    """Rule for determining data sensitivity"""
    rule_id: str
    name: str
    description: str
    conditions: List[Dict[str, Any]]
    resulting_classification: DataClassification
    sensitivity_reasons: List[SensitivityReason]
    temporal_factors: Optional[Dict[str, Any]] = None
    geographic_restrictions: Optional[List[str]] = None
    review_period_days: int = 365

@dataclass
class ClassificationResult:
    """Result of data classification"""
    dataset_id: str
    classification: DataClassification
    sensitivity_reasons: List[SensitivityReason]
    applied_rules: List[str]
    classification_date: datetime
    review_date: datetime
    geographic_restrictions: List[str]
    handling_requirements: List[str]
    retention_period: Optional[timedelta] = None

class DataClassificationEngine:
    """Engine for economic data classification and sensitivity management"""
    
    def __init__(self):
        self.classification_rules = {}
        self.classification_results = {}
        self.sensitivity_patterns = self._initialize_sensitivity_patterns()
        self.geographic_regulations = self._initialize_geographic_regulations()
        
    def _initialize_sensitivity_patterns(self) -> Dict[str, Dict[str, Any]]:
        """Initialize patterns for detecting sensitive data"""
        return {
            'market_moving_indicators': {
                'patterns': [
                    r'interest.?rate',
                    r'unemployment.?rate',
                    r'inflation',
                    r'gdp',
                    r'federal.?funds',
                    r'monetary.?policy'
                ],
                'sensitivity': SensitivityReason.MARKET_MOVING,
                'classification': DataClassification.CONFIDENTIAL
            },
            'personal_identifiers': {
                'patterns': [
                    r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
                    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # Email
                    r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'  # Credit card
                ],
                'sensitivity': SensitivityReason.PERSONAL_DATA,
                'classification': DataClassification.RESTRICTED
            },
            'competitive_intelligence': {
                'patterns': [
                    r'bank.?examination',
                    r'stress.?test',
                    r'regulatory.?action',
                    r'supervisory.?guidance'
                ],
                'sensitivity': SensitivityReason.COMPETITIVE,
                'classification': DataClassification.CONFIDENTIAL
            }
        }
    
    def _initialize_geographic_regulations(self) -> Dict[str, Dict[str, Any]]:
        """Initialize geographic data regulations"""
        return {
            'EU': {
                'regulation': 'GDPR',
                'data_residency_required': True,
                'cross_border_restrictions': ['personal_data'],
                'retention_limits': {'personal_data': timedelta(days=2555)}  # 7 years
            },
            'US': {
                'regulation': 'Various Federal',
                'data_residency_required': False,
                'cross_border_restrictions': ['national_security'],
                'retention_limits': {'financial_data': timedelta(days=2555)}
            },
            'CN': {
                'regulation': 'Data Security Law',
                'data_residency_required': True,
                'cross_border_restrictions': ['economic_data', 'personal_data'],
                'retention_limits': {'all_data': timedelta(days=1825)}  # 5 years
            }
        }
    
    def add_classification_rule(self, rule: DataSensitivityRule):
        """Add a new classification rule"""
        self.classification_rules[rule.rule_id] = rule
        
    def classify_dataset(self, dataset_id: str, metadata: Dict[str, Any]) -> ClassificationResult:
        """Classify a dataset based on its content and metadata"""
        
        applied_rules = []
        sensitivity_reasons = []
        max_classification = DataClassification.PUBLIC
        geographic_restrictions = []
        handling_requirements = []
        
        # Apply pattern-based classification
        content_analysis = self._analyze_content_sensitivity(metadata)
        if content_analysis['sensitive']:
            max_classification = self._escalate_classification(
                max_classification, content_analysis['classification']
            )
            sensitivity_reasons.extend(content_analysis['reasons'])
        
        # Apply rule-based classification
        for rule_id, rule in self.classification_rules.items():
            if self._evaluate_classification_rule(rule, metadata):
                applied_rules.append(rule_id)
                max_classification = self._escalate_classification(
                    max_classification, rule.resulting_classification
                )
                sensitivity_reasons.extend(rule.sensitivity_reasons)
                
                if rule.geographic_restrictions:
                    geographic_restrictions.extend(rule.geographic_restrictions)
        
        # Apply temporal factors
        temporal_classification = self._apply_temporal_factors(metadata, max_classification)
        max_classification = self._escalate_classification(max_classification, temporal_classification)
        
        # Determine handling requirements
        handling_requirements = self._determine_handling_requirements(
            max_classification, sensitivity_reasons
        )
        
        # Calculate review date
        review_date = datetime.utcnow() + timedelta(days=365)  # Default 1 year
        for rule_id in applied_rules:
            rule = self.classification_rules[rule_id]
            rule_review_date = datetime.utcnow() + timedelta(days=rule.review_period_days)
            if rule_review_date < review_date:
                review_date = rule_review_date
        
        # Create classification result
        result = ClassificationResult(
            dataset_id=dataset_id,
            classification=max_classification,
            sensitivity_reasons=list(set(sensitivity_reasons)),
            applied_rules=applied_rules,
            classification_date=datetime.utcnow(),
            review_date=review_date,
            geographic_restrictions=list(set(geographic_restrictions)),
            handling_requirements=handling_requirements
        )
        
        # Store result
        self.classification_results[dataset_id] = result
        
        return result
    
    def _analyze_content_sensitivity(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze content for sensitivity patterns, accumulating all matches"""
        content_text = self._extract_searchable_content(metadata)
        
        classification = DataClassification.PUBLIC
        reasons = []
        matched_patterns = []
        
        for pattern_category, pattern_config in self.sensitivity_patterns.items():
            for pattern in pattern_config['patterns']:
                if re.search(pattern, content_text, re.IGNORECASE):
                    classification = self._escalate_classification(
                        classification, pattern_config['classification']
                    )
                    reasons.append(pattern_config['sensitivity'])
                    matched_patterns.append(pattern_category)
                    break  # One match per category is sufficient
        
        return {
            'sensitive': bool(matched_patterns),
            'classification': classification,
            'reasons': list(set(reasons)),
            'matched_patterns': matched_patterns
        }
    
    def _extract_searchable_content(self, metadata: Dict[str, Any]) -> str:
        """Extract text content for pattern matching"""
        searchable_fields = [
            'title', 'description', 'indicator_name', 'methodology',
            'data_source', 'category', 'tags'
        ]
        
        content_parts = []
        for field in searchable_fields:
            if field in metadata and metadata[field]:
                content_parts.append(str(metadata[field]))
        
        return ' '.join(content_parts)
    
    def _evaluate_classification_rule(self, rule: DataSensitivityRule, metadata: Dict[str, Any]) -> bool:
        """Evaluate if a classification rule applies to the data"""
        for condition in rule.conditions:
            if not self._evaluate_condition(condition, metadata):
                return False
        return True
    
    def _evaluate_condition(self, condition: Dict[str, Any], metadata: Dict[str, Any]) -> bool:
        """Evaluate individual classification condition"""
        field = condition['field']
        operator = condition['operator']
        value = condition['value']
        
        metadata_value = metadata.get(field)
        
        if operator == 'equals':
            return metadata_value == value
        elif operator == 'contains':
            return value.lower() in str(metadata_value).lower() if metadata_value else False
        elif operator == 'in':
            return metadata_value in value if isinstance(value, list) else False
        elif operator == 'regex':
            return bool(re.search(value, str(metadata_value), re.IGNORECASE)) if metadata_value else False
        
        return False
    
    def _escalate_classification(self, current: DataClassification, new: DataClassification) -> DataClassification:
        """Return the higher of two classifications"""
        classification_levels = {
            DataClassification.PUBLIC: 0,
            DataClassification.INTERNAL: 1,
            DataClassification.CONFIDENTIAL: 2,
            DataClassification.RESTRICTED: 3,
            DataClassification.TOP_SECRET: 4
        }
        
        current_level = classification_levels[current]
        new_level = classification_levels[new]
        
        if new_level > current_level:
            return new
        return current
    
    def _apply_temporal_factors(self, metadata: Dict[str, Any], 
                              current_classification: DataClassification) -> DataClassification:
        """Apply temporal factors to classification"""
        release_date = metadata.get('release_date')
        
        if not release_date:
            return current_classification
        
        # Convert to datetime if string, normalizing to naive UTC so the
        # comparison with datetime.utcnow() below is always valid
        if isinstance(release_date, str):
            release_date = datetime.fromisoformat(release_date.replace('Z', '+00:00'))
        if release_date.tzinfo is not None:
            release_date = release_date.astimezone(timezone.utc).replace(tzinfo=None)
        
        now = datetime.utcnow()
        
        # If data is not yet released, increase classification
        if release_date > now:
            time_to_release = release_date - now
            
            if time_to_release <= timedelta(hours=24):
                # Within 24 hours of release - highly sensitive
                return self._escalate_classification(current_classification, DataClassification.RESTRICTED)
            elif time_to_release <= timedelta(days=7):
                # Within a week - moderately sensitive
                return self._escalate_classification(current_classification, DataClassification.CONFIDENTIAL)
        
        return current_classification
    
    def _determine_handling_requirements(self, classification: DataClassification,
                                       sensitivity_reasons: List[SensitivityReason]) -> List[str]:
        """Determine data handling requirements"""
        requirements = []
        
        # Classification-based requirements
        if classification in [DataClassification.RESTRICTED, DataClassification.TOP_SECRET]:
            requirements.extend([
                'encryption_at_rest_required',
                'encryption_in_transit_required',
                'access_logging_required',
                'multi_factor_authentication_required'
            ])
        
        if classification in [DataClassification.CONFIDENTIAL, DataClassification.RESTRICTED, DataClassification.TOP_SECRET]:
            requirements.extend([
                'background_check_required',
                'need_to_know_basis',
                'secure_disposal_required'
            ])
        
        # Sensitivity reason-based requirements
        if SensitivityReason.PERSONAL_DATA in sensitivity_reasons:
            requirements.extend([
                'gdpr_compliance_required',
                'data_subject_rights_enabled',
                'consent_tracking_required'
            ])
        
        if SensitivityReason.MARKET_MOVING in sensitivity_reasons:
            requirements.extend([
                'insider_trading_controls',
                'information_barrier_required',
                'trading_window_restrictions'
            ])
        
        return list(set(requirements))
    
    def reclassify_on_schedule(self):
        """Reclassify datasets that are due for review"""
        now = datetime.utcnow()
        
        # Iterate over a snapshot since classify_dataset updates the results dict
        for dataset_id, result in list(self.classification_results.items()):
            if result.review_date <= now:
                # Get fresh metadata for reclassification
                # In production, this would fetch current metadata
                metadata = {'dataset_id': dataset_id, 'review_triggered': True}
                
                new_result = self.classify_dataset(dataset_id, metadata)
                
                # Check if classification changed
                if new_result.classification != result.classification:
                    self._handle_classification_change(dataset_id, result, new_result)
    
    def _handle_classification_change(self, dataset_id: str, old_result: ClassificationResult,
                                    new_result: ClassificationResult):
        """Handle classification changes"""
        logging.info(f"Classification changed for {dataset_id}: {old_result.classification} -> {new_result.classification}")
        
        # This would trigger updates to:
        # - Access control systems
        # - Retention policies
        # - Security controls
        # - Notifications to data stewards
    
    def get_classification_summary(self) -> Dict[str, Any]:
        """Get summary of current classifications"""
        classification_counts = {}
        sensitivity_counts = {}
        
        for result in self.classification_results.values():
            # Count classifications
            classification = result.classification.value
            classification_counts[classification] = classification_counts.get(classification, 0) + 1
            
            # Count sensitivity reasons
            for reason in result.sensitivity_reasons:
                reason_val = reason.value
                sensitivity_counts[reason_val] = sensitivity_counts.get(reason_val, 0) + 1
        
        return {
            'total_classified_datasets': len(self.classification_results),
            'classification_distribution': classification_counts,
            'sensitivity_distribution': sensitivity_counts,
            'pending_reviews': len([
                r for r in self.classification_results.values()
                if r.review_date <= datetime.utcnow()
            ])
        }
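
A brief usage sketch shows the engine in action. The metadata values below are hypothetical and would normally come from a dataset catalog; note how the pre-release timestamp triggers the temporal escalation described above.

# Illustrative usage of the classification engine defined above.
engine = DataClassificationEngine()

result = engine.classify_dataset("fed-funds-forecast-2024", {
    'title': 'Federal Funds Rate Forecast',
    'description': 'Pre-release projection of monetary policy targets',
    'category': 'interest_rates',
    'release_date': (datetime.utcnow() + timedelta(hours=12)).isoformat()
})

print(result.classification.value)            # restricted (pre-release escalation)
print([r.value for r in result.sensitivity_reasons])
print(result.handling_requirements)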

International Data Compliance

Cross-border economic data sharing requires sophisticated compliance frameworks that navigate the complex landscape of international data protection regulations, economic sanctions, and bilateral data sharing agreements. The framework must understand how different jurisdictions classify economic data and implement appropriate controls for cross-border transfers.

The compliance system must track data residency requirements and implement geographic controls that prevent unauthorized data movement. Some economic data might be required to remain within specific jurisdictions due to national security considerations or data sovereignty laws. The system must maintain detailed records of data location and movement to demonstrate compliance with these requirements.
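One way to operationalize residency tracking is a small registry that records where each dataset is stored and validates proposed transfers against jurisdiction rules, reusing the shape of the geographic_regulations map from the classification engine above. The sketch below is illustrative; the rule fields and blocking logic are assumptions, not statements of actual law.

# A minimal residency-tracking sketch (illustrative; rule values are
# assumptions, not legal guidance). Reuses the jurisdiction-rule shape
# from DataClassificationEngine._initialize_geographic_regulations.
class DataResidencyTracker:
    def __init__(self, geographic_regulations: Dict[str, Dict[str, Any]]):
        self.regulations = geographic_regulations
        self.dataset_locations = {}   # dataset_id -> jurisdiction code
        self.transfer_log = []        # audit trail of transfer decisions
    
    def record_location(self, dataset_id: str, jurisdiction: str):
        """Record where a dataset is currently stored"""
        self.dataset_locations[dataset_id] = jurisdiction
    
    def validate_transfer(self, dataset_id: str, destination: str,
                          data_categories: List[str]) -> bool:
        """Check a proposed cross-border transfer against residency rules"""
        source = self.dataset_locations.get(dataset_id)
        rules = self.regulations.get(source, {})
        
        allowed = True
        if source and source != destination:
            # Residency requirement blocks movement of restricted categories
            if rules.get('data_residency_required') and any(
                cat in rules.get('cross_border_restrictions', [])
                for cat in data_categories
            ):
                allowed = False
        
        # Every decision is logged to demonstrate compliance after the fact
        self.transfer_log.append({
            'dataset_id': dataset_id, 'source': source,
            'destination': destination, 'allowed': allowed,
            'timestamp': datetime.utcnow()
        })
        return allowed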

Integration with sanctions screening and export control systems ensures that economic data sharing complies with international trade restrictions. The system must screen data recipients against sanctions lists and understand how economic data might be subject to export control regulations, particularly when the data relates to strategic industries or sensitive economic indicators.
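Recipient screening can be layered onto the same transfer workflow. The sketch below checks recipients against an in-memory denial list before a share is approved; in practice that list would be synchronized from official sanctions and export-control sources, an integration detail outside this example.

# Minimal recipient-screening sketch (illustrative). A real system would
# sync the denial list from official sanctions/export-control feeds and
# handle fuzzy name matching; here it is a simple in-memory set.
class SanctionsScreener:
    def __init__(self):
        self.denied_parties = set()       # sanctioned organizations
        self.restricted_countries = set() # embargoed destinations
    
    def load_denial_list(self, parties: List[str], countries: List[str]):
        self.denied_parties = {p.lower() for p in parties}
        self.restricted_countries = set(countries)
    
    def screen_recipient(self, recipient_name: str, recipient_country: str,
                         data_categories: List[str]) -> Dict[str, Any]:
        """Screen a proposed data recipient before approving a share"""
        hits = []
        if recipient_name.lower() in self.denied_parties:
            hits.append('denied_party_match')
        if recipient_country in self.restricted_countries:
            hits.append('restricted_destination')
        # Export-controlled categories require manual review even without a hit
        needs_review = 'export_controlled' in data_categories
        
        return {
            'approved': not hits and not needs_review,
            'hits': hits,
            'manual_review_required': needs_review,
            'screened_at': datetime.utcnow().isoformat()
        }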

The governance framework presented in this guide provides the foundation for the security controls discussed in Economic Data Security and Privacy and integrates with the cloud deployment strategies from Cloud Deployment Scaling Economic Data Systems. Together, these frameworks enable organizations to build economic data systems that meet the most stringent regulatory and compliance requirements while supporting the analytical capabilities needed for modern economic analysis.
