Economic Data Security and Privacy: Comprehensive Protection Framework

Introduction

Economic data security is a critical part of financial technology infrastructure because economic indicators are sensitive, can move markets, and are subject to strict regulatory protection requirements. Typical business data security focuses primarily on preventing unauthorized access. Economic data security must also address market manipulation risks, insider trading concerns, and the international regulations that govern cross-border sharing of economic data.

The challenge lies in balancing robust security controls with the analytical access that economic research and policymaking require. Economic data systems must give authorized analysts real-time access while maintaining comprehensive audit trails, applying dynamic access controls based on data sensitivity and market conditions, and complying with evolving privacy regulations across multiple jurisdictions.

Privacy requirements for economic data extend beyond traditional personal data protection to include institutional privacy, protection of market-sensitive information, and the particular problem of aggregated data that might still reveal sensitive economic patterns. The privacy framework must address both explicit privacy regulations such as GDPR and the implicit privacy requirements that come with handling competitively sensitive economic information.

This guide builds upon the governance foundations from Economic Data Governance and Compliance and provides the security implementation that supports the container orchestration patterns discussed in Container Orchestration for Economic Data Systems. The security controls presented here integrate with the monitoring capabilities from Economic Indicator Alerting and Monitoring Systems.

Data Classification and Protection Levels

Economic data security requires sophisticated classification schemes that account for multiple dimensions of sensitivity including regulatory requirements, market impact potential, competitive sensitivity, and temporal factors. The classification system must dynamically adjust protection levels based on changing market conditions, data aggregation levels, and the specific context in which data is being accessed.
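One way to picture a multi-dimensional scheme is to rate each dimension separately, take the most restrictive rating, and then adjust for context. The sketch below is illustrative only: `Level`, `SensitivityProfile`, `classify`, and the one-step adjustment rules are assumptions made for this example and are not part of the framework code later in this guide.

```python
from dataclasses import dataclass
from enum import IntEnum


class Level(IntEnum):
    """Hypothetical ordered protection levels (higher = more restrictive)."""
    PUBLIC = 0
    INTERNAL = 1
    CONFIDENTIAL = 2
    RESTRICTED = 3


@dataclass(frozen=True)
class SensitivityProfile:
    """Per-dimension sensitivity ratings for one dataset (illustrative)."""
    regulatory: Level
    market_impact: Level
    competitive: Level


def classify(profile: SensitivityProfile,
             aggregated: bool = False,
             market_stress: bool = False) -> Level:
    """Return the most restrictive rating across dimensions, then adjust for context."""
    level = max(profile.regulatory, profile.market_impact, profile.competitive)

    # Assumption: aggregation lowers sensitivity by one step,
    # but never below the regulatory floor.
    if aggregated:
        level = max(Level(max(level - 1, 0)), profile.regulatory)

    # Assumption: under market stress, market-sensitive data moves up one step.
    if market_stress and profile.market_impact >= Level.CONFIDENTIAL:
        level = Level(min(level + 1, Level.RESTRICTED))

    return level


profile = SensitivityProfile(
    regulatory=Level.INTERNAL,
    market_impact=Level.CONFIDENTIAL,
    competitive=Level.INTERNAL,
)
print(classify(profile).name)
print(classify(profile, aggregated=True).name)
print(classify(profile, market_stress=True).name)
```

Taking the maximum across dimensions keeps the rule conservative: a dataset that is harmless on two dimensions but market-moving on the third is still handled at the market-moving level.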

The foundational classification framework distinguishes between different types of economic sensitivity: market-moving indicators that could affect trading decisions, competitive intelligence that provides strategic advantages, regulatory data that must meet specific compliance requirements, and personal economic data that falls under privacy protection laws. Each classification level requires distinct security controls and access procedures.
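The idea that each sensitivity type carries its own controls can be sketched as a lookup table whose entries are combined when data has more than one type. Everything named here (`ControlSet`, `CONTROLS_BY_TYPE`, `required_controls`, and the specific control choices) is a hypothetical illustration; the actual control mix is a policy decision.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass(frozen=True)
class ControlSet:
    """Hypothetical bundle of controls required for a piece of data."""
    encrypt: bool = False
    approval: bool = False
    mask_identifiers: bool = False
    audit_reads: bool = False


# Fail-closed default applied to any type the table does not know about.
ALL_CONTROLS = ControlSet(True, True, True, True)

# Assumed mapping from sensitivity type to controls (values are examples only).
CONTROLS_BY_TYPE: Dict[str, ControlSet] = {
    "market-moving": ControlSet(encrypt=True, approval=True, audit_reads=True),
    "competitive": ControlSet(encrypt=True, audit_reads=True),
    "regulatory": ControlSet(encrypt=True, approval=True, audit_reads=True),
    "personal-data": ControlSet(encrypt=True, mask_identifiers=True, audit_reads=True),
}


def required_controls(sensitivity_types: Iterable[str]) -> ControlSet:
    """Combine controls across types; a control is on if any type needs it."""
    selected = [CONTROLS_BY_TYPE.get(t, ALL_CONTROLS) for t in sensitivity_types]
    return ControlSet(
        encrypt=any(c.encrypt for c in selected),
        approval=any(c.approval for c in selected),
        mask_identifiers=any(c.mask_identifiers for c in selected),
        audit_reads=any(c.audit_reads for c in selected),
    )


combined = required_controls({"competitive", "personal-data"})
print(combined)
```

Unknown types fall back to every control being required, so a mislabeled dataset is over-protected instead of under-protected.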

Temporal sensitivity adds complexity to economic data classification because the same data might have different sensitivity levels at different times. Preliminary economic indicators might be highly sensitive before official release but become public information afterward. The security system must automatically adjust protection levels based on release schedules, market hours, and other temporal factors.
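The release-time behavior described above can be sketched as a function that computes the sensitivity in effect at a given moment. `Sensitivity`, `effective_sensitivity`, and the release timestamp are made up for this example; a real system would also need to account for market hours and other temporal rules.

```python
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class Sensitivity(Enum):
    """Hypothetical two-state label used only in this sketch."""
    PUBLIC = "public"
    RESTRICTED = "restricted"


def effective_sensitivity(base: Sensitivity,
                          release_at: Optional[datetime],
                          now: datetime) -> Sensitivity:
    """Return the sensitivity that applies at `now` given a scheduled release."""
    if release_at is None:
        return base                  # no scheduled release: keep the base level
    if now >= release_at:
        return Sensitivity.PUBLIC    # officially released: treat as public
    return base                      # still under embargo


# Made-up release time; timezone-aware values avoid naive/aware comparison errors.
release = datetime(2024, 1, 5, 13, 30, tzinfo=timezone.utc)
before = datetime(2024, 1, 5, 13, 29, tzinfo=timezone.utc)
after = datetime(2024, 1, 5, 13, 30, tzinfo=timezone.utc)

print(effective_sensitivity(Sensitivity.RESTRICTED, release, before).value)
print(effective_sensitivity(Sensitivity.RESTRICTED, release, after).value)
```

Computing the effective level on each access, instead of rewriting stored labels at release time, means a delayed or rescheduled release only requires changing `release_at`.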

from enum import Enum
from typing import Dict, List, Any, Optional, Set, Union
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import hashlib
import json
import logging
from abc import ABC, abstractmethod
import cryptography
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import secrets
import base64

class DataClassification(Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
    TOP_SECRET = "top_secret"

class SecurityLevel(Enum):
    BASIC = "basic"
    ENHANCED = "enhanced"
    HIGH = "high"
    MAXIMUM = "maximum"

class AccessContext(Enum):
    RESEARCH = "research"
    TRADING = "trading"
    REGULATORY = "regulatory"
    AUDIT = "audit"
    EMERGENCY = "emergency"

@dataclass
class SecurityPolicy:
    """Security policy for economic data"""
    policy_id: str
    classification: DataClassification
    security_level: SecurityLevel
    encryption_required: bool
    access_logging: bool
    data_masking: bool
    geographic_restrictions: List[str]
    time_restrictions: Optional[Dict[str, Any]] = None
    access_approval_required: bool = False
    multi_factor_auth_required: bool = True
    background_check_required: bool = False
    information_barriers: List[str] = field(default_factory=list)

@dataclass
class DataElement:
    """Individual data element with security metadata"""
    element_id: str
    content: Union[str, bytes, Dict[str, Any]]
    classification: DataClassification
    sensitivity_tags: Set[str]
    created_at: datetime
    expires_at: Optional[datetime] = None
    owner: str = ""
    source_system: str = ""
    encryption_key_id: Optional[str] = None
    access_history: List[Dict[str, Any]] = field(default_factory=list)

class EconomicDataSecurityFramework:
    """Comprehensive security framework for economic data"""
    
    def __init__(self):
        self.security_policies = {}
        self.encryption_manager = EncryptionManager()
        self.access_control = AccessControlSystem()
        self.privacy_engine = PrivacyProtectionEngine()
        self.audit_logger = SecurityAuditLogger()
        self.threat_detector = ThreatDetectionSystem()
        
    def register_security_policy(self, policy: SecurityPolicy):
        """Register a security policy for data classification"""
        self.security_policies[policy.classification] = policy
        logging.info(f"Registered security policy for {policy.classification.value}")
    
    def secure_data_element(self, element: DataElement) -> DataElement:
        """Apply security controls to data element"""
        policy = self.security_policies.get(element.classification)
        if not policy:
            raise ValueError(f"No security policy found for classification {element.classification}")
        
        secured_element = element
        
        # Apply data masking first, while the content is still plaintext;
        # masking ciphertext would have no meaningful effect
        if policy.data_masking:
            secured_element = self.privacy_engine.apply_data_masking(secured_element)
        
        # Encrypt the (possibly masked) content if required
        if policy.encryption_required:
            secured_element = self.encryption_manager.encrypt_element(secured_element)
        
        # Log security application
        self.audit_logger.log_security_event({
            'event_type': 'data_secured',
            'element_id': element.element_id,
            'classification': element.classification.value,
            'security_controls_applied': {
                'encryption': policy.encryption_required,
                'masking': policy.data_masking,
                'access_logging': policy.access_logging
            },
            'timestamp': datetime.utcnow()
        })
        
        return secured_element
    
    def authorize_data_access(self, user_id: str, element_id: str, 
                            access_context: AccessContext) -> Dict[str, Any]:
        """Authorize access to secured data element"""
        
        # Get data element
        element = self._get_data_element(element_id)
        if not element:
            return {'authorized': False, 'reason': 'Element not found'}
        
        # Get security policy
        policy = self.security_policies.get(element.classification)
        if not policy:
            return {'authorized': False, 'reason': 'No security policy found'}
        
        # Check access controls
        access_result = self.access_control.check_access(
            user_id, element, access_context, policy
        )
        
        if not access_result['authorized']:
            # Log access denial
            self.audit_logger.log_security_event({
                'event_type': 'access_denied',
                'user_id': user_id,
                'element_id': element_id,
                'classification': element.classification.value,
                'reason': access_result['reason'],
                'timestamp': datetime.utcnow()
            })
            return access_result
        
        # Check for suspicious access patterns
        threat_assessment = self.threat_detector.assess_access_request(
            user_id, element, access_context
        )
        
        if threat_assessment['risk_level'] == 'high':
            # Require additional authorization for high-risk access
            additional_auth_result = self._require_additional_authorization(
                user_id, element_id, threat_assessment
            )
            if not additional_auth_result['authorized']:
                return additional_auth_result
        
        # Log successful access authorization
        self.audit_logger.log_security_event({
            'event_type': 'access_authorized',
            'user_id': user_id,
            'element_id': element_id,
            'classification': element.classification.value,
            'access_context': access_context.value,
            'risk_level': threat_assessment['risk_level'],
            'timestamp': datetime.utcnow()
        })
        
        return {
            'authorized': True,
            'access_token': self._generate_access_token(user_id, element_id),
            'restrictions': access_result.get('restrictions', []),
            'expires_at': datetime.utcnow() + timedelta(hours=1)
        }
    
    def retrieve_secured_data(self, access_token: str) -> Dict[str, Any]:
        """Retrieve data using authorized access token"""
        
        # Validate access token
        token_validation = self._validate_access_token(access_token)
        if not token_validation['valid']:
            return {'success': False, 'reason': 'Invalid access token'}
        
        user_id = token_validation['user_id']
        element_id = token_validation['element_id']
        
        # Get secured element
        element = self._get_data_element(element_id)
        if not element:
            return {'success': False, 'reason': 'Element not found'}
        
        # Decrypt if necessary
        if element.encryption_key_id:
            decrypted_element = self.encryption_manager.decrypt_element(element)
        else:
            decrypted_element = element
        
        # Apply privacy protection based on user context
        protected_data = self.privacy_engine.apply_privacy_protection(
            decrypted_element, user_id
        )
        
        # Log data access
        self.audit_logger.log_security_event({
            'event_type': 'data_accessed',
            'user_id': user_id,
            'element_id': element_id,
            'classification': element.classification.value,
            'access_token': access_token[:8] + '...',  # Partial token for audit
            'timestamp': datetime.utcnow()
        })
        
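        # Update access history (store only a token prefix: the full token
        # is a bearer credential and should not be persisted)
        element.access_history.append({
            'user_id': user_id,
            'accessed_at': datetime.utcnow(),
            'access_token': access_token[:8] + '...'
        })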
        
        return {
            'success': True,
            'data': protected_data,
            'classification': element.classification.value,
            'restrictions': token_validation.get('restrictions', [])
        }
    
    def _get_data_element(self, element_id: str) -> Optional[DataElement]:
        """Get data element by ID (mock implementation)"""
        # In production, this would query the actual data store
        return DataElement(
            element_id=element_id,
            content={'sample': 'economic data'},
            classification=DataClassification.CONFIDENTIAL,
            sensitivity_tags={'market-moving', 'preliminary'},
            created_at=datetime.utcnow(),
            owner='economic-data-system'
        )
    
    def _require_additional_authorization(self, user_id: str, element_id: str,
                                       threat_assessment: Dict[str, Any]) -> Dict[str, Any]:
        """Require additional authorization for high-risk access"""
        # In production, this would integrate with approval workflows
        return {
            'authorized': True,
            'additional_auth_method': 'supervisor_approval',
            'auth_reference': 'AUTH-12345'
        }
    
    def _generate_access_token(self, user_id: str, element_id: str) -> str:
        """Generate secure access token"""
        token_data = {
            'user_id': user_id,
            'element_id': element_id,
            'issued_at': datetime.utcnow().isoformat(),
            'nonce': secrets.token_hex(16)
        }
        
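        # WARNING: this token is only base64-encoded JSON. It is neither signed
        # nor encrypted, so anyone can read or forge one. In production, use a
        # signed format such as JWT and verify the signature on validation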
        token_string = json.dumps(token_data)
        return base64.b64encode(token_string.encode()).decode()
    
    def _validate_access_token(self, access_token: str) -> Dict[str, Any]:
        """Validate access token"""
        try:
            token_string = base64.b64decode(access_token.encode()).decode()
            token_data = json.loads(token_string)
            
            # Check token expiration (1 hour)
            issued_at = datetime.fromisoformat(token_data['issued_at'])
            if datetime.utcnow() - issued_at > timedelta(hours=1):
                return {'valid': False, 'reason': 'Token expired'}
            
            return {
                'valid': True,
                'user_id': token_data['user_id'],
                'element_id': token_data['element_id']
            }
        except Exception as e:
            return {'valid': False, 'reason': f'Token validation failed: {e}'}

class EncryptionManager:
    """Manages encryption for economic data"""
    
    def __init__(self):
        self.encryption_keys = {}
        self.key_rotation_schedule = {}
        self._initialize_master_keys()
    
    def _initialize_master_keys(self):
        """Initialize master encryption keys"""
        # Generate master key for symmetric encryption
        self.master_key = Fernet.generate_key()
        self.symmetric_cipher = Fernet(self.master_key)
        
        # Generate RSA key pair for asymmetric encryption
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=4096
        )
        self.public_key = self.private_key.public_key()
    
    def encrypt_element(self, element: DataElement) -> DataElement:
        """Encrypt data element based on classification"""
        
        if element.classification in [DataClassification.RESTRICTED, DataClassification.TOP_SECRET]:
            # Use asymmetric encryption for highest security
            encrypted_content = self._encrypt_asymmetric(element.content)
            encryption_method = 'rsa-4096'
        else:
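            # Use symmetric encryption (Fernet) for other classifications.
            # NOTE: Fernet is AES-128-CBC with an HMAC-SHA256 tag, not AES-256-GCM;
            # the string below is only the identifier that decrypt_element matches on
            encrypted_content = self._encrypt_symmetric(element.content)
            encryption_method = 'aes-256-gcm'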
        
        # Generate key ID for tracking
        key_id = hashlib.sha256(f"{element.element_id}{datetime.utcnow()}".encode()).hexdigest()[:16]
        
        # Store encryption metadata
        self.encryption_keys[key_id] = {
            'element_id': element.element_id,
            'encryption_method': encryption_method,
            'created_at': datetime.utcnow(),
            'classification': element.classification.value
        }
        
        # Create encrypted element
        encrypted_element = DataElement(
            element_id=element.element_id,
            content=encrypted_content,
            classification=element.classification,
            sensitivity_tags=element.sensitivity_tags,
            created_at=element.created_at,
            expires_at=element.expires_at,
            owner=element.owner,
            source_system=element.source_system,
            encryption_key_id=key_id,
            access_history=element.access_history
        )
        
        return encrypted_element
    
    def decrypt_element(self, element: DataElement) -> DataElement:
        """Decrypt data element"""
        
        if not element.encryption_key_id:
            return element  # Not encrypted
        
        key_info = self.encryption_keys.get(element.encryption_key_id)
        if not key_info:
            raise ValueError(f"Encryption key not found: {element.encryption_key_id}")
        
        if key_info['encryption_method'] == 'rsa-4096':
            decrypted_content = self._decrypt_asymmetric(element.content)
        elif key_info['encryption_method'] == 'aes-256-gcm':
            decrypted_content = self._decrypt_symmetric(element.content)
        else:
            raise ValueError(f"Unknown encryption method: {key_info['encryption_method']}")
        
        # Create decrypted element
        decrypted_element = DataElement(
            element_id=element.element_id,
            content=decrypted_content,
            classification=element.classification,
            sensitivity_tags=element.sensitivity_tags,
            created_at=element.created_at,
            expires_at=element.expires_at,
            owner=element.owner,
            source_system=element.source_system,
            encryption_key_id=None,  # Remove encryption metadata
            access_history=element.access_history
        )
        
        return decrypted_element
    
    def _encrypt_symmetric(self, content: Any) -> bytes:
        """Encrypt content using symmetric encryption"""
        content_bytes = json.dumps(content).encode() if not isinstance(content, bytes) else content
        return self.symmetric_cipher.encrypt(content_bytes)
    
    def _decrypt_symmetric(self, encrypted_content: bytes) -> Any:
        """Decrypt content using symmetric encryption"""
        decrypted_bytes = self.symmetric_cipher.decrypt(encrypted_content)
        try:
            return json.loads(decrypted_bytes.decode())
        except json.JSONDecodeError:
            return decrypted_bytes
    
    def _encrypt_asymmetric(self, content: Any) -> bytes:
        """Encrypt content using asymmetric encryption"""
        content_bytes = json.dumps(content).encode() if not isinstance(content, bytes) else content
        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
        
        # RSA has size limitations, so use hybrid encryption for large content
        if len(content_bytes) > 446:  # RSA-4096 with OAEP/SHA-256 encrypts at most 446 bytes
            # Generate a random AES-256 key and a fresh 96-bit GCM nonce
            aes_key = secrets.token_bytes(32)
            nonce = secrets.token_bytes(12)
            
            # Encrypt content with AES-GCM
            cipher = Cipher(algorithms.AES(aes_key), modes.GCM(nonce))
            encryptor = cipher.encryptor()
            ciphertext = encryptor.update(content_bytes) + encryptor.finalize()
            
            # Encrypt AES key with RSA
            encrypted_aes_key = self.public_key.encrypt(aes_key, oaep)
            
            # Layout: RSA-wrapped key (512) | nonce (12) | GCM tag (16) | ciphertext.
            # The nonce must travel with the ciphertext or decryption is impossible
            return encrypted_aes_key + nonce + encryptor.tag + ciphertext
        
        # Direct RSA encryption for small content (output is exactly 512 bytes)
        return self.public_key.encrypt(content_bytes, oaep)
    
    def _decrypt_asymmetric(self, encrypted_content: bytes) -> Any:
        """Decrypt content using asymmetric encryption"""
        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
        
        if len(encrypted_content) == 512:
            # Direct RSA: the ciphertext is exactly one RSA-4096 block
            decrypted_bytes = self.private_key.decrypt(encrypted_content, oaep)
        else:
            # Hybrid: split the payload using the layout written by _encrypt_asymmetric
            encrypted_aes_key = encrypted_content[:512]
            nonce = encrypted_content[512:524]
            tag = encrypted_content[524:540]  # GCM tag is 16 bytes
            ciphertext = encrypted_content[540:]
            
            # Decrypt AES key with RSA
            aes_key = self.private_key.decrypt(encrypted_aes_key, oaep)
            
            # Decrypt content with AES-GCM; finalize() raises if the tag does not verify
            cipher = Cipher(algorithms.AES(aes_key), modes.GCM(nonce, tag))
            decryptor = cipher.decryptor()
            decrypted_bytes = decryptor.update(ciphertext) + decryptor.finalize()
        
        # Mirror _decrypt_symmetric: content that is not JSON is returned as raw bytes
        try:
            return json.loads(decrypted_bytes.decode())
        except (UnicodeDecodeError, json.JSONDecodeError):
            return decrypted_bytes
    
    def rotate_encryption_keys(self):
        """Rotate encryption keys for enhanced security"""
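        # Keep the old keys so existing data can be re-encrypted later.
        # NOTE: decrypt_element only uses the current keys, so anything encrypted
        # before rotation is unreadable until it has been re-encrypted; a production
        # system would look keys up by key ID (or use MultiFernet for the symmetric key)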
        old_master_key = self.master_key
        old_private_key = self.private_key
        
        # Generate new keys
        self.master_key = Fernet.generate_key()
        self.symmetric_cipher = Fernet(self.master_key)
        
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=4096
        )
        self.public_key = self.private_key.public_key()
        
        # Schedule re-encryption of existing data
        self.key_rotation_schedule[datetime.utcnow()] = {
            'old_master_key': old_master_key,
            'old_private_key': old_private_key,
            'rotation_reason': 'scheduled_rotation'
        }
        
        logging.info("Encryption keys rotated successfully")

class AccessControlSystem:
    """Advanced access control for economic data"""
    
    def __init__(self):
        self.access_policies = {}
        self.user_attributes = {}
        self.information_barriers = {}
        self.temporal_restrictions = {}
        
    def check_access(self, user_id: str, element: DataElement, 
                    context: AccessContext, policy: SecurityPolicy) -> Dict[str, Any]:
        """Check if user has access to data element"""
        
        # Check basic authorization
        basic_auth = self._check_basic_authorization(user_id, element, context)
        if not basic_auth['authorized']:
            return basic_auth
        
        # Check information barriers
        barrier_check = self._check_information_barriers(user_id, element, policy)
        if not barrier_check['authorized']:
            return barrier_check
        
        # Check temporal restrictions
        temporal_check = self._check_temporal_restrictions(user_id, element, policy)
        if not temporal_check['authorized']:
            return temporal_check
        
        # Check geographic restrictions
        geo_check = self._check_geographic_restrictions(user_id, element, policy)
        if not geo_check['authorized']:
            return geo_check
        
        # Determine access restrictions
        restrictions = self._determine_access_restrictions(user_id, element, context, policy)
        
        return {
            'authorized': True,
            'restrictions': restrictions,
            'access_level': self._determine_access_level(user_id, element, context)
        }
    
    def _check_basic_authorization(self, user_id: str, element: DataElement, 
                                 context: AccessContext) -> Dict[str, Any]:
        """Check basic user authorization"""
        user_attrs = self.user_attributes.get(user_id, {})
        
        # Check user clearance level
        user_clearance = user_attrs.get('clearance_level', 'public')
        required_clearance = self._map_classification_to_clearance(element.classification)
        
        if not self._clearance_sufficient(user_clearance, required_clearance):
            return {
                'authorized': False,
                'reason': f"Insufficient clearance: {user_clearance} < {required_clearance}"
            }
        
        # Check context-specific permissions
        user_contexts = user_attrs.get('authorized_contexts', [])
        if context.value not in user_contexts:
            return {
                'authorized': False,
                'reason': f"User not authorized for context: {context.value}"
            }
        
        return {'authorized': True}
    
    def _check_information_barriers(self, user_id: str, element: DataElement,
                                  policy: SecurityPolicy) -> Dict[str, Any]:
        """Check information barrier restrictions"""
        
        if not policy.information_barriers:
            return {'authorized': True}
        
        user_attrs = self.user_attributes.get(user_id, {})
        user_barriers = user_attrs.get('information_barriers', [])
        
        # Check if user is subject to any barriers that would prevent access
        for barrier in policy.information_barriers:
            if barrier in user_barriers:
                # Check if this specific data conflicts with user's barrier
                if self._barrier_blocks_access(barrier, element):
                    return {
                        'authorized': False,
                        'reason': f"Information barrier violation: {barrier}"
                    }
        
        return {'authorized': True}
    
    def _check_temporal_restrictions(self, user_id: str, element: DataElement,
                                   policy: SecurityPolicy) -> Dict[str, Any]:
        """Check temporal access restrictions"""
        
        if not policy.time_restrictions:
            return {'authorized': True}
        
        current_time = datetime.utcnow()
        restrictions = policy.time_restrictions
        
        # Check market hours restriction
        if 'market_hours_only' in restrictions and restrictions['market_hours_only']:
            if not self._is_market_hours(current_time):
                return {
                    'authorized': False,
                    'reason': 'Access restricted to market hours'
                }
        
        # Check embargo periods
        if 'embargo_until' in restrictions:
            embargo_until = datetime.fromisoformat(restrictions['embargo_until'])
            if current_time < embargo_until:
                return {
                    'authorized': False,
                    'reason': f"Data embargoed until {embargo_until}"
                }
        
        return {'authorized': True}
    
    def _check_geographic_restrictions(self, user_id: str, element: DataElement,
                                     policy: SecurityPolicy) -> Dict[str, Any]:
        """Check geographic access restrictions"""
        
        if not policy.geographic_restrictions:
            return {'authorized': True}
        
        user_attrs = self.user_attributes.get(user_id, {})
        user_location = user_attrs.get('location', 'unknown')
        
        # Fail closed: deny when the location is unknown or explicitly restricted
        if user_location == 'unknown' or user_location in policy.geographic_restrictions:
            return {
                'authorized': False,
                'reason': f"Access restricted from location: {user_location}"
            }
        
        return {'authorized': True}
    
    def _map_classification_to_clearance(self, classification: DataClassification) -> str:
        """Map data classification to required clearance level"""
        mapping = {
            DataClassification.PUBLIC: 'public',
            DataClassification.INTERNAL: 'internal',
            DataClassification.CONFIDENTIAL: 'confidential',
            DataClassification.RESTRICTED: 'restricted',
            DataClassification.TOP_SECRET: 'top_secret'
        }
        return mapping.get(classification, 'top_secret')
    
    def _clearance_sufficient(self, user_clearance: str, required_clearance: str) -> bool:
        """Check if user clearance is sufficient"""
        clearance_levels = ['public', 'internal', 'confidential', 'restricted', 'top_secret']
        
        user_level = clearance_levels.index(user_clearance) if user_clearance in clearance_levels else -1
        required_level = clearance_levels.index(required_clearance) if required_clearance in clearance_levels else len(clearance_levels)
        
        return user_level >= required_level
    
    def _barrier_blocks_access(self, barrier: str, element: DataElement) -> bool:
        """Check if information barrier blocks access to element"""
        # Simplified barrier logic - in production would be more sophisticated
        barrier_config = self.information_barriers.get(barrier, {})
        blocked_tags = barrier_config.get('blocked_tags', [])
        
        return any(tag in element.sensitivity_tags for tag in blocked_tags)
    
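    def _is_market_hours(self, timestamp: datetime) -> bool:
        """Check if timestamp is within market hours"""
        # Local imports keep this method self-contained (zoneinfo needs Python 3.9+)
        from datetime import timezone
        from zoneinfo import ZoneInfo
        
        # Simplified market hours check (9:30 AM - 4:00 PM ET, Monday-Friday;
        # holidays are ignored). Naive timestamps are treated as UTC, matching
        # the datetime.utcnow() values used elsewhere in this module
        if timestamp.tzinfo is None:
            timestamp = timestamp.replace(tzinfo=timezone.utc)
        eastern = timestamp.astimezone(ZoneInfo("America/New_York"))
        
        if eastern.weekday() >= 5:  # Weekend (0 = Monday, 6 = Sunday)
            return False
        
        minutes = eastern.hour * 60 + eastern.minute
        return 9 * 60 + 30 <= minutes < 16 * 60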
    
    def _determine_access_restrictions(self, user_id: str, element: DataElement,
                                     context: AccessContext, policy: SecurityPolicy) -> List[str]:
        """Determine specific access restrictions for user"""
        restrictions = []
        
        user_attrs = self.user_attributes.get(user_id, {})
        
        # Add context-specific restrictions
        if context == AccessContext.RESEARCH:
            restrictions.append('no_trading_decisions')
        elif context == AccessContext.TRADING:
            restrictions.append('trading_compliance_required')
        
        # Add classification-specific restrictions
        if element.classification in [DataClassification.RESTRICTED, DataClassification.TOP_SECRET]:
            restrictions.extend(['no_external_sharing', 'screen_recording_disabled'])
        
        # Add user-specific restrictions
        if user_attrs.get('probationary', False):
            restrictions.append('supervisor_notification')
        
        return restrictions
    
    def _determine_access_level(self, user_id: str, element: DataElement, 
                              context: AccessContext) -> str:
        """Determine the level of access granted"""
        user_attrs = self.user_attributes.get(user_id, {})
        
        if context == AccessContext.AUDIT:
            return 'full_access'
        elif element.classification == DataClassification.PUBLIC:
            return 'full_access'
        elif user_attrs.get('senior_analyst', False):
            return 'full_access'
        else:
            return 'restricted_access'

class PrivacyProtectionEngine:
    """Privacy protection and data anonymization for economic data"""
    
    def __init__(self):
        self.anonymization_techniques = {}
        self.privacy_policies = {}
        self.consent_records = {}
        
    def apply_privacy_protection(self, element: DataElement, user_id: str) -> Any:
        """Apply privacy protection based on user context and data sensitivity"""
        
        # Determine required privacy protection level
        protection_level = self._determine_protection_level(element, user_id)
        
        if protection_level == 'none':
            return element.content
        elif protection_level == 'basic':
            return self._apply_basic_masking(element.content)
        elif protection_level == 'differential':
            return self._apply_differential_privacy(element.content)
        elif protection_level == 'synthetic':
            return self._generate_synthetic_data(element.content)
        else:
            return self._apply_full_anonymization(element.content)
    
    def apply_data_masking(self, element: DataElement) -> DataElement:
        """Apply data masking for sensitive elements"""
        
        if 'personal-data' in element.sensitivity_tags:
            masked_content = self._mask_personal_identifiers(element.content)
        elif 'financial' in element.sensitivity_tags:
            masked_content = self._mask_financial_data(element.content)
        else:
            masked_content = self._apply_generic_masking(element.content)
        
        # Create masked element
        masked_element = DataElement(
            element_id=element.element_id,
            content=masked_content,
            classification=element.classification,
            sensitivity_tags=element.sensitivity_tags | {'masked'},
            created_at=element.created_at,
            expires_at=element.expires_at,
            owner=element.owner,
            source_system=element.source_system,
            encryption_key_id=element.encryption_key_id,
            access_history=element.access_history
        )
        
        return masked_element
    
    def _determine_protection_level(self, element: DataElement, user_id: str) -> str:
        """Determine required privacy protection level"""
        
        # Check if user has special privacy permissions
        user_privacy_role = self._get_user_privacy_role(user_id)
        
        if user_privacy_role == 'privacy_officer':
            return 'none'  # Privacy officers can see unprotected data
        elif 'personal-data' in element.sensitivity_tags:
            return 'differential'  # Personal data requires differential privacy
        elif element.classification == DataClassification.RESTRICTED:
            return 'synthetic'  # Generate synthetic data for restricted content
        elif 'preliminary' in element.sensitivity_tags:
            return 'basic'  # Basic masking for preliminary data
        else:
            return 'none'
    
    def _apply_basic_masking(self, content: Any) -> Any:
        """Apply basic data masking"""
        if isinstance(content, dict):
            masked = content.copy()
            
            # Mask specific sensitive fields
            sensitive_fields = ['ssn', 'account_number', 'phone', 'email']
            for field in sensitive_fields:
                if field in masked:
                    masked[field] = self._mask_string(str(masked[field]))
            
            return masked
        elif isinstance(content, str):
            return self._mask_string(content)
        else:
            return content
    
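    def _apply_differential_privacy(self, content: Any, sensitivity: float = 1.0,
                                    epsilon: float = 1.0) -> Any:
        """Apply the Laplace mechanism to numerical values.

        The noise scale is sensitivity / epsilon. The defaults give a scale
        of 1.0 and are placeholders; both should be set per dataset.
        """
        if isinstance(content, dict):
            protected = content.copy()
            scale = sensitivity / epsilon

            # Add Laplace noise to numerical values
            # (bool is a subclass of int, so it is excluded explicitly)
            for key, value in protected.items():
                if isinstance(value, (int, float)) and not isinstance(value, bool):
                    protected[key] = value + np.random.laplace(0, scale)

            return protected
        else:
            return content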
    
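    def _generate_synthetic_data(self, content: Any) -> Any:
        """Generate synthetic stand-in values by perturbing each field.

        This perturbs individual values only; it does not model the joint
        distribution of the underlying dataset.
        """
        if isinstance(content, dict):
            import hashlib  # local import; can be moved to module level
            synthetic = {}

            for key, value in content.items():
                if isinstance(value, bool):
                    # bool is a subclass of int; pass flags through unchanged
                    synthetic[key] = value
                elif isinstance(value, (int, float)):
                    # Draw around the original value with a 10% standard deviation
                    mean = value
                    std = abs(value * 0.1)
                    synthetic[key] = np.random.normal(mean, std)
                elif isinstance(value, str):
                    # Built-in hash() is salted per process for strings, so use a
                    # digest to keep synthetic labels stable across runs
                    digest = hashlib.sha256(value.encode('utf-8')).hexdigest()
                    synthetic[key] = f"synthetic_{int(digest, 16) % 1000}"
                else:
                    synthetic[key] = value

            return synthetic
        else:
            return content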
    
    def _apply_full_anonymization(self, content: Any) -> Any:
        """Apply full anonymization removing all identifying information"""
        if isinstance(content, dict):
            anonymized = {}
            
            # Remove or generalize identifying fields
            safe_fields = ['category', 'type', 'region', 'date_range']
            for key, value in content.items():
                if key in safe_fields:
                    anonymized[key] = value
                elif isinstance(value, (int, float)):
                    # Generalize numerical values
                    anonymized[key] = round(value, -1)  # Round to nearest 10
            
            return anonymized
        else:
            return "anonymized_content"
    
    def _mask_personal_identifiers(self, content: Any) -> Any:
        """Mask personal identifiers in content"""
        if isinstance(content, dict):
            masked = content.copy()
            
            # Mask SSNs
            if 'ssn' in masked:
                ssn = str(masked['ssn'])
                masked['ssn'] = f"***-**-{ssn[-4:]}" if len(ssn) >= 4 else "***-**-****"
            
            # Mask phone numbers
            if 'phone' in masked:
                phone = str(masked['phone'])
                masked['phone'] = f"***-***-{phone[-4:]}" if len(phone) >= 4 else "***-***-****"
            
            return masked
        else:
            return content
    
    def _mask_financial_data(self, content: Any) -> Any:
        """Mask sensitive financial information"""
        if isinstance(content, dict):
            masked = content.copy()
            
            # Mask account numbers
            for field in ['account_number', 'routing_number']:
                if field in masked:
                    account = str(masked[field])
                    masked[field] = f"****{account[-4:]}" if len(account) >= 4 else "****"
            
            # Generalize amounts above threshold
            for field in ['balance', 'amount', 'limit']:
                if field in masked and isinstance(masked[field], (int, float)):
                    if masked[field] > 10000:
                        masked[field] = ">$10,000"
            
            return masked
        else:
            return content
    
    def _apply_generic_masking(self, content: Any) -> Any:
        """Apply generic masking for unspecified sensitive content"""
        if isinstance(content, str):
            return self._mask_string(content)
        elif isinstance(content, dict):
            return {k: self._mask_string(v) if isinstance(v, str) else v 
                   for k, v in content.items()}
        else:
            return content
    
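    def _mask_string(self, text: str) -> str:
        """Mask a string, keeping at most two characters at each end.

        Strings of 1-2 characters are fully masked, 3-4 keep the first and
        last character, and longer strings keep the first and last two.
        """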
        if len(text) <= 2:
            return '*' * len(text)
        elif len(text) <= 4:
            return text[0] + '*' * (len(text) - 2) + text[-1]
        else:
            return text[:2] + '*' * (len(text) - 4) + text[-2:]
    
    def _get_user_privacy_role(self, user_id: str) -> str:
        """Get user's privacy role"""
        # In production, this would query user management system
        return 'standard_user'

class SecurityAuditLogger:
    """Comprehensive audit logging for security events"""
    
    def __init__(self):
        self.audit_logs = []
        self.log_retention_days = 2555  # 7 years for regulatory compliance
        
    def log_security_event(self, event: Dict[str, Any]):
        """Log security event with comprehensive metadata"""
        
        audit_entry = {
            'event_id': secrets.token_hex(16),
            'timestamp': event.get('timestamp', datetime.utcnow()),
            'event_type': event['event_type'],
            'severity': self._determine_event_severity(event),
            'user_id': event.get('user_id'),
            'resource_id': event.get('element_id'),
            'classification': event.get('classification'),
            'source_ip': event.get('source_ip', 'internal'),
            'user_agent': event.get('user_agent', 'system'),
            'session_id': event.get('session_id'),
            'details': event,
            'compliance_tags': self._generate_compliance_tags(event)
        }
        
        self.audit_logs.append(audit_entry)
        
        # In production, would also send to external SIEM
        self._send_to_siem(audit_entry)
        
        logging.info("Security event logged: %s for user %s",
                     event['event_type'], event.get('user_id', 'unknown'))
    
    def _determine_event_severity(self, event: Dict[str, Any]) -> str:
        """Determine event severity level"""
        event_type = event['event_type']
        
        if event_type in ['access_denied', 'unauthorized_access_attempt']:
            return 'medium'
        elif event_type in ['data_breach', 'privilege_escalation']:
            return 'critical'
        elif event_type in ['data_accessed', 'data_secured']:
            return 'low'
        else:
            return 'medium'
    
    def _generate_compliance_tags(self, event: Dict[str, Any]) -> List[str]:
        """Generate compliance tags for audit entry"""
        tags = []
        
        classification = event.get('classification')
        if classification in ['restricted', 'top_secret']:
            tags.append('high_security')
        
        if event.get('event_type') == 'data_accessed':
            tags.append('data_access')
        
        # Search the whole raw event: it is the raw event itself that gets
        # stored under 'details' in the audit entry
        if 'personal-data' in str(event):
            tags.append('privacy_relevant')
        
        return tags
    
    def _send_to_siem(self, audit_entry: Dict[str, Any]):
        """Send audit entry to external SIEM system"""
        # In production, would integrate with SIEM
        pass
    
    def generate_compliance_report(self, start_date: datetime, end_date: datetime) -> Dict[str, Any]:
        """Generate compliance report for audit period"""
        
        relevant_logs = [
            log for log in self.audit_logs
            if start_date <= log['timestamp'] <= end_date
        ]
        
        return {
            'report_period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat()
            },
            'total_events': len(relevant_logs),
            'events_by_type': self._count_events_by_type(relevant_logs),
            'events_by_severity': self._count_events_by_severity(relevant_logs),
            'access_patterns': self._analyze_access_patterns(relevant_logs),
            'security_incidents': self._identify_security_incidents(relevant_logs)
        }
    
    def _count_events_by_type(self, logs: List[Dict[str, Any]]) -> Dict[str, int]:
        """Count events by type"""
        counts = {}
        for log in logs:
            event_type = log['event_type']
            counts[event_type] = counts.get(event_type, 0) + 1
        return counts
    
    def _count_events_by_severity(self, logs: List[Dict[str, Any]]) -> Dict[str, int]:
        """Count events by severity"""
        counts = {}
        for log in logs:
            severity = log['severity']
            counts[severity] = counts.get(severity, 0) + 1
        return counts
    
    def _analyze_access_patterns(self, logs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze data access patterns"""
        access_logs = [log for log in logs if log['event_type'] == 'data_accessed']
        
        user_access_counts = {}
        resource_access_counts = {}
        
        for log in access_logs:
            user_id = log.get('user_id')
            resource_id = log.get('resource_id')
            
            if user_id:
                user_access_counts[user_id] = user_access_counts.get(user_id, 0) + 1
            if resource_id:
                resource_access_counts[resource_id] = resource_access_counts.get(resource_id, 0) + 1
        
        return {
            'total_accesses': len(access_logs),
            'unique_users': len(user_access_counts),
            'unique_resources': len(resource_access_counts),
            'top_users': sorted(user_access_counts.items(), key=lambda x: x[1], reverse=True)[:10],
            'top_resources': sorted(resource_access_counts.items(), key=lambda x: x[1], reverse=True)[:10]
        }
    
    def _identify_security_incidents(self, logs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Identify potential security incidents"""
        incidents = []
        
        # Look for suspicious patterns
        failed_access_attempts = {}
        for log in logs:
            if log['event_type'] == 'access_denied':
                user_id = log.get('user_id')
                if user_id:
                    failed_access_attempts[user_id] = failed_access_attempts.get(user_id, 0) + 1
        
        # Flag users with excessive failed attempts
        for user_id, attempts in failed_access_attempts.items():
            if attempts > 10:  # Threshold for suspicious activity
                incidents.append({
                    'type': 'excessive_failed_access',
                    'user_id': user_id,
                    'attempt_count': attempts,
                    'severity': 'medium'
                })
        
        return incidents

class ThreatDetectionSystem:
    """Advanced threat detection for economic data access"""
    
    def __init__(self):
        self.user_behavior_baselines = {}
        self.suspicious_patterns = {}
        self.threat_intelligence = {}
        
    def assess_access_request(self, user_id: str, element: DataElement, 
                            context: AccessContext) -> Dict[str, Any]:
        """Assess risk level of access request"""
        
        risk_factors = []
        risk_score = 0
        
        # Behavioral analysis
        behavioral_risk = self._analyze_user_behavior(user_id, element, context)
        risk_score += behavioral_risk['risk_score']
        risk_factors.extend(behavioral_risk['factors'])
        
        # Temporal analysis
        temporal_risk = self._analyze_temporal_patterns(user_id, element)
        risk_score += temporal_risk['risk_score']
        risk_factors.extend(temporal_risk['factors'])
        
        # Content sensitivity analysis
        content_risk = self._analyze_content_sensitivity(element, context)
        risk_score += content_risk['risk_score']
        risk_factors.extend(content_risk['factors'])
        
        # Determine overall risk level
        if risk_score >= 8:
            risk_level = 'high'
        elif risk_score >= 5:
            risk_level = 'medium'
        else:
            risk_level = 'low'
        
        return {
            'risk_level': risk_level,
            'risk_score': risk_score,
            'risk_factors': risk_factors,
            'requires_additional_auth': risk_level == 'high'
        }
    
    def _analyze_user_behavior(self, user_id: str, element: DataElement, 
                             context: AccessContext) -> Dict[str, Any]:
        """Analyze user behavior for anomalies"""
        
        baseline = self.user_behavior_baselines.get(user_id, {})
        risk_factors = []
        risk_score = 0
        
        # Check access time patterns
        current_hour = datetime.utcnow().hour
        typical_hours = baseline.get('typical_access_hours', [])
        
        if typical_hours and current_hour not in typical_hours:
            risk_factors.append('unusual_access_time')
            risk_score += 2
        
        # Check access frequency
        recent_accesses = baseline.get('recent_access_count', 0)
        typical_daily_accesses = baseline.get('avg_daily_accesses', 10)
        
        if recent_accesses > typical_daily_accesses * 3:
            risk_factors.append('unusual_access_frequency')
            risk_score += 3
        
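        # Check data classification pattern (skipped when no baseline exists,
        # matching the access-hours check above)
        typical_classifications = baseline.get('typical_classifications', [])
        if typical_classifications and element.classification.value not in typical_classifications: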
            risk_factors.append('unusual_data_classification')
            risk_score += 2
        
        return {
            'risk_score': risk_score,
            'factors': risk_factors
        }
    
    def _analyze_temporal_patterns(self, user_id: str, element: DataElement) -> Dict[str, Any]:
        """Analyze temporal access patterns"""
        
        risk_factors = []
        risk_score = 0
        
        current_time = datetime.utcnow()
        
        # Check for weekend access (holidays are not detected here; timestamps are UTC)
        if current_time.weekday() >= 5:  # Weekend
            risk_factors.append('weekend_access')
            risk_score += 1
        
        # Check for after-hours access
        if current_time.hour < 6 or current_time.hour > 22:
            risk_factors.append('after_hours_access')
            risk_score += 2
        
        # Check proximity to earnings/announcement periods
        if self._is_earnings_season(current_time):
            risk_factors.append('earnings_season_access')
            risk_score += 1
        
        return {
            'risk_score': risk_score,
            'factors': risk_factors
        }
    
    def _analyze_content_sensitivity(self, element: DataElement, 
                                   context: AccessContext) -> Dict[str, Any]:
        """Analyze content sensitivity for risk assessment"""
        
        risk_factors = []
        risk_score = 0
        
        # High-classification data increases risk
        if element.classification in [DataClassification.RESTRICTED, DataClassification.TOP_SECRET]:
            risk_factors.append('high_classification_data')
            risk_score += 3
        
        # Market-moving data increases risk
        if 'market-moving' in element.sensitivity_tags:
            risk_factors.append('market_moving_data')
            risk_score += 2
        
        # Preliminary data increases risk
        if 'preliminary' in element.sensitivity_tags:
            risk_factors.append('preliminary_data')
            risk_score += 1
        
        # Trading context with sensitive data
        if context == AccessContext.TRADING and 'market-moving' in element.sensitivity_tags:
            risk_factors.append('trading_access_to_market_data')
            risk_score += 3
        
        return {
            'risk_score': risk_score,
            'factors': risk_factors
        }
    
    def _is_earnings_season(self, timestamp: datetime) -> bool:
        """Check if timestamp falls during earnings season"""
        # Simplified earnings season detection
        month = timestamp.month
        return month in [1, 4, 7, 10]  # Quarterly earnings months
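
The `_apply_differential_privacy` method above adds Laplace noise to numeric fields. In the Laplace mechanism the noise scale is the query sensitivity divided by the privacy parameter epsilon, so a smaller epsilon means more noise. The following is a minimal stand-alone sketch of that relationship, not the framework's implementation: the function names and the `sensitivity` and `epsilon` parameters are assumptions introduced here, and it uses only the standard library (`np.random.laplace(0, scale)` would be the NumPy equivalent).

```python
import random
from typing import Any, Dict, Optional


def laplace_noise(scale: float, rng: random.Random) -> float:
    """Draw one sample from Laplace(0, scale).

    The difference of two independent exponential variables with mean
    `scale` follows a Laplace distribution with location 0 and that scale.
    """
    return rng.expovariate(1.0 / scale) - rng.expovariate(1.0 / scale)


def add_laplace_noise(record: Dict[str, Any], sensitivity: float, epsilon: float,
                      rng: Optional[random.Random] = None) -> Dict[str, Any]:
    """Return a copy of `record` with Laplace noise added to numeric values."""
    if sensitivity <= 0 or epsilon <= 0:
        raise ValueError("sensitivity and epsilon must be positive")
    rng = rng or random.Random()
    scale = sensitivity / epsilon  # Laplace mechanism: b = sensitivity / epsilon
    protected = dict(record)
    for key, value in record.items():
        # bool is a subclass of int, so exclude it explicitly
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            protected[key] = value + laplace_noise(scale, rng)
    return protected


if __name__ == "__main__":
    sample = {"regional_output": 1250.0, "region": "north", "final": True}
    print(add_laplace_noise(sample, sensitivity=1.0, epsilon=0.5))
```

Each noised value spends its own epsilon, so under basic sequential composition a record with several numeric fields consumes the sum of the per-field budgets. Appropriate sensitivity and epsilon values depend on the dataset and the query and would need to be chosen per use case.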

Compliance and Regulatory Security

Economic data systems must meet stringent regulatory security requirements that vary by jurisdiction, data type, and organizational context. The compliance framework must address sector-specific regulations like Basel III for banking, Solvency II for insurance, and various securities regulations while maintaining operational flexibility for legitimate business activities.

Cross-border data protection depends on knowing which international regulations apply to each transfer of economic data. The framework must implement dynamic controls that adjust to the data's destination, the user's location, and the regulatory requirements attached to each type of economic information.
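
One way to picture such dynamic controls is a rule lookup keyed on where the data originates and what kind of data it is. The sketch below is illustrative only: the `TransferRequest` type, the jurisdiction codes, the data categories, the rule table, and the control names are all hypothetical placeholders, not statements of what any regulation permits. Real entries would come from legal review.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List, Tuple


@dataclass(frozen=True)
class TransferRequest:
    data_category: str   # e.g. "aggregate", "institution_level", "personal"
    origin: str          # jurisdiction where the data is held
    destination: str     # jurisdiction the data would be sent to
    user_location: str   # jurisdiction the requesting user is in


# Hypothetical rule table: (origin, data_category) -> jurisdictions that
# need no extra safeguards. Placeholder values, not legal guidance.
UNRESTRICTED: Dict[Tuple[str, str], FrozenSet[str]] = {
    ("EU", "personal"): frozenset({"EU"}),
    ("EU", "institution_level"): frozenset({"EU", "UK"}),
    ("US", "institution_level"): frozenset({"US", "UK", "EU"}),
}


def evaluate_transfer(req: TransferRequest) -> Tuple[bool, List[str]]:
    """Return (allowed, required_controls) for a transfer request."""
    if req.data_category == "aggregate":
        # Published aggregates are treated as unrestricted in this sketch
        return True, []

    allowed_places = UNRESTRICTED.get((req.origin, req.data_category))
    if allowed_places is None:
        # No rule on file: deny by default and route to manual review
        return False, ["legal_review_required"]

    controls: List[str] = []
    if req.destination not in allowed_places:
        controls += ["contractual_safeguards", "field_level_encryption"]
    if req.user_location not in allowed_places:
        controls.append("masked_view_only")
    return True, controls


if __name__ == "__main__":
    request = TransferRequest("personal", "EU", "US", "EU")
    print(evaluate_transfer(request))
```

The default-deny branch is the main design choice here: a combination with no rule on file is blocked and escalated rather than silently allowed.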

Incident response and breach notification procedures must account for the unique characteristics of economic data breaches, including potential market impact, regulatory notification requirements, and the specialized forensic techniques needed to investigate economic data security incidents. The response framework must integrate with existing business continuity plans while addressing the specific recovery requirements for economic data systems.
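
Notification duties can be tracked as deadlines derived from the moment an incident is detected. The sketch below assumes the `personal-data` and `market-moving` sensitivity tags used earlier in this guide. The 72-hour window reflects the GDPR requirement to notify the supervisory authority of a personal data breach, where feasible, within 72 hours of becoming aware of it. The other windows, the recipient names, and the `plan_notifications` function are hypothetical placeholders.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Set


@dataclass(frozen=True)
class NotificationTask:
    recipient: str
    deadline: datetime
    reason: str


GDPR_WINDOW = timedelta(hours=72)                # GDPR personal data breach window
MARKET_REGULATOR_WINDOW = timedelta(hours=24)    # placeholder, not a real rule
INTERNAL_ESCALATION_WINDOW = timedelta(hours=1)  # placeholder, not a real rule


def plan_notifications(detected_at: datetime,
                       sensitivity_tags: Set[str]) -> List[NotificationTask]:
    """Build the list of notification deadlines for one incident."""
    # Every incident is escalated internally first
    tasks = [NotificationTask("security_lead",
                              detected_at + INTERNAL_ESCALATION_WINDOW,
                              "all incidents")]
    if "personal-data" in sensitivity_tags:
        tasks.append(NotificationTask("data_protection_authority",
                                      detected_at + GDPR_WINDOW,
                                      "personal data involved"))
    if "market-moving" in sensitivity_tags:
        tasks.append(NotificationTask("market_regulator",
                                      detected_at + MARKET_REGULATOR_WINDOW,
                                      "market-sensitive data involved"))
    # Earliest deadline first so responders see the most urgent task on top
    return sorted(tasks, key=lambda task: task.deadline)


if __name__ == "__main__":
    detected = datetime(2024, 1, 1, 9, 0)
    for task in plan_notifications(detected, {"personal-data", "market-moving"}):
        print(task.deadline.isoformat(), task.recipient, "-", task.reason)
```

Keeping the windows as named constants makes it straightforward to replace the placeholders with the deadlines that apply in a given jurisdiction.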

The security framework presented in this guide provides the foundation for implementing comprehensive protection for economic data systems. These security controls integrate with the governance frameworks from Economic Data Governance and Compliance and support the operational requirements discussed in Container Orchestration for Economic Data Systems. Together, these frameworks enable organizations to build economic data systems that meet the highest security and compliance standards while supporting the analytical capabilities needed for modern economic analysis.

