Economic Indicator Alerting and Monitoring Systems: Real-Time Event Detection

Introduction

Economic indicator monitoring requires sophisticated alerting systems that can distinguish between normal market fluctuations and significant economic events requiring immediate attention. Unlike typical application monitoring that focuses on system health metrics, economic alerting must understand the contextual significance of data changes, account for seasonal patterns, and adapt to changing economic conditions.

The challenge lies in creating alert systems that are sensitive enough to catch important economic developments early, yet robust enough to avoid alert fatigue from normal market volatility. Economic indicators exhibit complex temporal patterns, correlation structures, and regime changes that simple threshold-based alerting cannot effectively handle. Modern economic monitoring systems must incorporate statistical models, machine learning techniques, and domain expertise to provide actionable intelligence.

Regulatory and compliance requirements add another dimension to economic alerting systems. Financial institutions must monitor specific indicators for regulatory reporting, while central banks need real-time awareness of economic conditions that might influence policy decisions. The alerting system must provide audit trails, configurable escalation procedures, and integration with existing operational workflows.

This guide builds upon the data processing capabilities from Real-Time Data Processing Economic Indicators and leverages the storage patterns from Database Integration for Economic Data Storage. The monitoring patterns presented here complement the data quality frameworks from Data Quality Practices for Economic Datasets.

Alert Framework Architecture

A comprehensive economic alerting system requires a layered architecture that separates data ingestion, analysis, alert generation, and notification delivery. This separation enables different components to evolve independently and allows for sophisticated alert logic without impacting data collection or delivery performance.

The analysis layer represents the heart of the alerting system, where raw economic data is transformed into actionable insights. This layer must handle multiple types of analysis simultaneously - statistical anomaly detection, threshold monitoring, trend analysis, and cross-indicator correlation analysis. Each analysis type contributes to the overall assessment of economic conditions and potential alert generation.

The alert orchestration layer manages the complex decision-making process that determines when and how to escalate economic events. This layer must consider alert priorities, recipient preferences, escalation schedules, and the broader context of current economic conditions. During periods of high market volatility, the system might adjust alert thresholds or consolidate related alerts to prevent overwhelming recipients.

from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Callable, Union
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import asyncio
import logging
import pandas as pd
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import aiohttp
import websockets

class AlertSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class AlertType(Enum):
    THRESHOLD = "threshold"
    ANOMALY = "anomaly"
    TREND = "trend"
    CORRELATION = "correlation"
    COMPOSITE = "composite"
    DATA_QUALITY = "data_quality"

@dataclass
class AlertConfig:
    """Configuration for economic indicator alerts"""
    alert_id: str
    name: str
    description: str
    alert_type: AlertType
    severity: AlertSeverity
    indicator_codes: List[str]
    countries: Optional[List[str]] = None
    conditions: Dict[str, Any] = field(default_factory=dict)
    enabled: bool = True
    notification_channels: List[str] = field(default_factory=list)
    escalation_policy: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class AlertEvent:
    """Economic alert event"""
    alert_id: str
    event_id: str
    timestamp: datetime
    severity: AlertSeverity
    alert_type: AlertType
    title: str
    description: str
    affected_indicators: List[str]
    affected_countries: List[str]
    trigger_values: Dict[str, Any]
    context: Dict[str, Any]
    resolved: bool = False
    acknowledged: bool = False
    acknowledged_by: Optional[str] = None

class EconomicAlertFramework:
    """Comprehensive framework for economic indicator alerting"""
    
    def __init__(self):
        self.alert_configs = {}
        self.active_alerts = {}
        self.alert_history = []
        self.analyzers = self._initialize_analyzers()
        self.notification_manager = NotificationManager()
        self.escalation_manager = EscalationManager()
        self.alert_suppression = AlertSuppression()
        self.metrics_collector = AlertMetricsCollector()
        
    def _initialize_analyzers(self) -> Dict[AlertType, 'AlertAnalyzer']:
        """Initialize alert analyzers for different types"""
        return {
            AlertType.THRESHOLD: ThresholdAnalyzer(),
            AlertType.ANOMALY: AnomalyAnalyzer(),
            AlertType.TREND: TrendAnalyzer(),
            AlertType.CORRELATION: CorrelationAnalyzer(),
            AlertType.COMPOSITE: CompositeAnalyzer(),
            AlertType.DATA_QUALITY: DataQualityAnalyzer()
        }
    
    def register_alert(self, config: AlertConfig):
        """Register a new alert configuration"""
        self.alert_configs[config.alert_id] = config
        logging.info(f"Registered alert: {config.name} ({config.alert_id})")
    
    async def process_data_update(self, data: Dict[str, Any]):
        """Process incoming data and check for alerts"""
        alert_tasks = []
        
        for alert_id, config in self.alert_configs.items():
            if not config.enabled:
                continue
            
            # Check if this data update is relevant to the alert
            if self._is_relevant_data(data, config):
                analyzer = self.analyzers[config.alert_type]
                task = asyncio.create_task(
                    self._evaluate_alert(alert_id, config, analyzer, data)
                )
                alert_tasks.append(task)
        
        # Process all relevant alerts concurrently
        if alert_tasks:
            alert_results = await asyncio.gather(*alert_tasks, return_exceptions=True)
            
            # Handle alert results
            for i, result in enumerate(alert_results):
                if isinstance(result, Exception):
                    logging.error(f"Alert evaluation failed: {result}")
                elif result:  # Alert was triggered
                    await self._handle_alert_trigger(result)
    
    def _is_relevant_data(self, data: Dict[str, Any], config: AlertConfig) -> bool:
        """Check if data update is relevant to alert configuration"""
        data_indicator = data.get('indicator_code')
        data_country = data.get('country_code')
        
        # Check indicator relevance
        if data_indicator not in config.indicator_codes:
            return False
        
        # Check country relevance
        if config.countries and data_country not in config.countries:
            return False
        
        return True
    
    async def _evaluate_alert(self, alert_id: str, config: AlertConfig,
                            analyzer: 'AlertAnalyzer', data: Dict[str, Any]) -> Optional[AlertEvent]:
        """Evaluate a specific alert against new data"""
        try:
            # Check if alert is currently suppressed
            if self.alert_suppression.is_suppressed(alert_id):
                return None
            
            # Perform analysis
            analysis_result = await analyzer.analyze(config, data)
            
            if analysis_result.should_alert:
                # Create alert event
                alert_event = AlertEvent(
                    alert_id=alert_id,
                    event_id=f"{alert_id}_{datetime.utcnow().isoformat()}",
                    timestamp=datetime.utcnow(),
                    severity=config.severity,
                    alert_type=config.alert_type,
                    title=analysis_result.title,
                    description=analysis_result.description,
                    affected_indicators=analysis_result.affected_indicators,
                    affected_countries=analysis_result.affected_countries,
                    trigger_values=analysis_result.trigger_values,
                    context=analysis_result.context
                )
                
                return alert_event
            
            return None
            
        except Exception as e:
            logging.error(f"Alert evaluation failed for {alert_id}: {e}")
            return None
    
    async def _handle_alert_trigger(self, alert_event: AlertEvent):
        """Handle triggered alert event"""
        # Store alert
        self.active_alerts[alert_event.event_id] = alert_event
        self.alert_history.append(alert_event)
        
        # Record metrics
        self.metrics_collector.record_alert(alert_event)
        
        # Send notifications
        config = self.alert_configs[alert_event.alert_id]
        await self.notification_manager.send_notifications(alert_event, config)
        
        # Handle escalation if configured
        if config.escalation_policy:
            await self.escalation_manager.start_escalation(alert_event, config)
        
        logging.info(f"Alert triggered: {alert_event.title} ({alert_event.event_id})")
    
    async def acknowledge_alert(self, event_id: str, acknowledged_by: str) -> bool:
        """Acknowledge an active alert"""
        if event_id in self.active_alerts:
            alert = self.active_alerts[event_id]
            alert.acknowledged = True
            alert.acknowledged_by = acknowledged_by
            
            # Stop escalation
            config = self.alert_configs[alert.alert_id]
            if config.escalation_policy:
                await self.escalation_manager.stop_escalation(event_id)
            
            logging.info(f"Alert acknowledged: {event_id} by {acknowledged_by}")
            return True
        
        return False
    
    async def resolve_alert(self, event_id: str, resolved_by: str) -> bool:
        """Resolve an active alert"""
        if event_id in self.active_alerts:
            alert = self.active_alerts[event_id]
            alert.resolved = True
            
            # Remove from active alerts
            del self.active_alerts[event_id]
            
            # Stop escalation
            config = self.alert_configs[alert.alert_id]
            if config.escalation_policy:
                await self.escalation_manager.stop_escalation(event_id)
            
            # Record resolution metrics
            self.metrics_collector.record_resolution(alert, resolved_by)
            
            logging.info(f"Alert resolved: {event_id} by {resolved_by}")
            return True
        
        return False
    
    def get_alert_summary(self) -> Dict[str, Any]:
        """Get summary of current alert status"""
        active_by_severity = {}
        for severity in AlertSeverity:
            active_by_severity[severity.value] = len([
                alert for alert in self.active_alerts.values()
                if alert.severity == severity
            ])
        
        return {
            "total_active_alerts": len(self.active_alerts),
            "active_by_severity": active_by_severity,
            "total_configurations": len(self.alert_configs),
            "enabled_configurations": len([
                config for config in self.alert_configs.values() if config.enabled
            ]),
            "recent_alerts_24h": len([
                alert for alert in self.alert_history
                if alert.timestamp > datetime.utcnow() - timedelta(hours=24)
            ])
        }

@dataclass
class AnalysisResult:
    """Result of alert analysis"""
    should_alert: bool
    title: str
    description: str
    affected_indicators: List[str]
    affected_countries: List[str]
    trigger_values: Dict[str, Any]
    context: Dict[str, Any]

class AlertAnalyzer(ABC):
    """Abstract base class for alert analyzers"""
    
    @abstractmethod
    async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
        """Analyze data against alert configuration"""
        pass

class ThresholdAnalyzer(AlertAnalyzer):
    """Analyzer for threshold-based alerts"""
    
    async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
        """Analyze threshold conditions"""
        conditions = config.conditions
        value = data.get('value')
        indicator_code = data.get('indicator_code')
        country_code = data.get('country_code')
        
        should_alert = False
        trigger_values = {}
        
        # Check upper threshold
        if 'upper_threshold' in conditions and value > conditions['upper_threshold']:
            should_alert = True
            trigger_values['threshold_type'] = 'upper'
            trigger_values['threshold_value'] = conditions['upper_threshold']
            trigger_values['actual_value'] = value
        
        # Check lower threshold
        elif 'lower_threshold' in conditions and value < conditions['lower_threshold']:
            should_alert = True
            trigger_values['threshold_type'] = 'lower'
            trigger_values['threshold_value'] = conditions['lower_threshold']
            trigger_values['actual_value'] = value
        
        if should_alert:
            title = f"Threshold Alert: {indicator_code} exceeded {trigger_values['threshold_type']} threshold"
            description = (
                f"{indicator_code} for {country_code} reached {value}, "
                f"which exceeds the {trigger_values['threshold_type']} threshold of "
                f"{trigger_values['threshold_value']}"
            )
        else:
            title = ""
            description = ""
        
        return AnalysisResult(
            should_alert=should_alert,
            title=title,
            description=description,
            affected_indicators=[indicator_code],
            affected_countries=[country_code],
            trigger_values=trigger_values,
            context={'analysis_type': 'threshold'}
        )

class AnomalyAnalyzer(AlertAnalyzer):
    """Analyzer for statistical anomaly detection"""
    
    def __init__(self):
        self.historical_data_cache = {}
        self.model_cache = {}
    
    async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
        """Analyze for statistical anomalies"""
        indicator_code = data.get('indicator_code')
        country_code = data.get('country_code')
        value = data.get('value')
        timestamp = data.get('timestamp')
        
        # Get historical data for context
        historical_data = await self._get_historical_data(indicator_code, country_code)
        
        if len(historical_data) < 30:  # Need minimum data for anomaly detection
            return AnalysisResult(
                should_alert=False,
                title="",
                description="",
                affected_indicators=[indicator_code],
                affected_countries=[country_code],
                trigger_values={},
                context={'analysis_type': 'anomaly', 'insufficient_data': True}
            )
        
        # Calculate statistical measures
        mean = historical_data['value'].mean()
        std = historical_data['value'].std()
        z_score = abs(value - mean) / std if std > 0 else 0
        
        # Determine if anomalous
        anomaly_threshold = config.conditions.get('z_score_threshold', 3.0)
        should_alert = z_score > anomaly_threshold
        
        trigger_values = {
            'z_score': z_score,
            'historical_mean': mean,
            'historical_std': std,
            'anomaly_threshold': anomaly_threshold,
            'actual_value': value
        }
        
        if should_alert:
            title = f"Anomaly Alert: {indicator_code} shows unusual deviation"
            description = (
                f"{indicator_code} for {country_code} reached {value}, "
                f"which is {z_score:.2f} standard deviations from the historical mean "
                f"of {mean:.2f} (threshold: {anomaly_threshold})"
            )
        else:
            title = ""
            description = ""
        
        return AnalysisResult(
            should_alert=should_alert,
            title=title,
            description=description,
            affected_indicators=[indicator_code],
            affected_countries=[country_code],
            trigger_values=trigger_values,
            context={'analysis_type': 'anomaly'}
        )
    
    async def _get_historical_data(self, indicator_code: str, country_code: str) -> pd.DataFrame:
        """Get historical data for anomaly analysis"""
        # This would typically query the database
        # For demo purposes, return sample data
        cache_key = f"{indicator_code}_{country_code}"
        
        if cache_key not in self.historical_data_cache:
            # In production, this would query your database
            dates = pd.date_range(
                start=datetime.now() - timedelta(days=365),
                end=datetime.now(),
                freq='D'
            )
            values = np.random.normal(100, 10, len(dates))
            
            self.historical_data_cache[cache_key] = pd.DataFrame({
                'timestamp': dates,
                'value': values
            })
        
        return self.historical_data_cache[cache_key]

class TrendAnalyzer(AlertAnalyzer):
    """Analyzer for trend-based alerts"""
    
    async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
        """Analyze trend patterns"""
        indicator_code = data.get('indicator_code')
        country_code = data.get('country_code')
        
        # Get recent historical data for trend analysis
        historical_data = await self._get_recent_data(indicator_code, country_code)
        
        if len(historical_data) < 10:  # Need minimum data for trend analysis
            return AnalysisResult(
                should_alert=False,
                title="",
                description="",
                affected_indicators=[indicator_code],
                affected_countries=[country_code],
                trigger_values={},
                context={'analysis_type': 'trend', 'insufficient_data': True}
            )
        
        # Calculate trend statistics
        trend_stats = self._calculate_trend_statistics(historical_data)
        
        # Check trend conditions
        conditions = config.conditions
        should_alert = False
        trigger_values = trend_stats.copy()
        
        if 'consecutive_increases' in conditions:
            if trend_stats['consecutive_increases'] >= conditions['consecutive_increases']:
                should_alert = True
                trigger_values['trigger_type'] = 'consecutive_increases'
        
        elif 'consecutive_decreases' in conditions:
            if trend_stats['consecutive_decreases'] >= conditions['consecutive_decreases']:
                should_alert = True
                trigger_values['trigger_type'] = 'consecutive_decreases'
        
        elif 'trend_strength' in conditions:
            if abs(trend_stats['trend_slope']) >= conditions['trend_strength']:
                should_alert = True
                trigger_values['trigger_type'] = 'trend_strength'
        
        if should_alert:
            title = f"Trend Alert: {indicator_code} shows significant trend"
            description = (
                f"{indicator_code} for {country_code} shows "
                f"{trigger_values['trigger_type']}: {trigger_values.get('value', 'N/A')}"
            )
        else:
            title = ""
            description = ""
        
        return AnalysisResult(
            should_alert=should_alert,
            title=title,
            description=description,
            affected_indicators=[indicator_code],
            affected_countries=[country_code],
            trigger_values=trigger_values,
            context={'analysis_type': 'trend'}
        )
    
    async def _get_recent_data(self, indicator_code: str, country_code: str) -> pd.DataFrame:
        """Get recent data for trend analysis"""
        # Mock implementation - in production would query database
        dates = pd.date_range(
            start=datetime.now() - timedelta(days=30),
            end=datetime.now(),
            freq='D'
        )
        # Create trending data for demo
        trend = np.linspace(100, 110, len(dates))
        noise = np.random.normal(0, 1, len(dates))
        values = trend + noise
        
        return pd.DataFrame({
            'timestamp': dates,
            'value': values
        })
    
    def _calculate_trend_statistics(self, data: pd.DataFrame) -> Dict[str, Any]:
        """Calculate trend statistics"""
        values = data['value'].values
        
        # Calculate consecutive increases/decreases
        differences = np.diff(values)
        
        consecutive_increases = 0
        consecutive_decreases = 0
        current_inc_streak = 0
        current_dec_streak = 0
        
        for diff in differences:
            if diff > 0:
                current_inc_streak += 1
                current_dec_streak = 0
                consecutive_increases = max(consecutive_increases, current_inc_streak)
            elif diff < 0:
                current_dec_streak += 1
                current_inc_streak = 0
                consecutive_decreases = max(consecutive_decreases, current_dec_streak)
            else:
                current_inc_streak = 0
                current_dec_streak = 0
        
        # Calculate trend slope using linear regression
        x = np.arange(len(values))
        trend_slope = np.polyfit(x, values, 1)[0]
        
        return {
            'consecutive_increases': consecutive_increases,
            'consecutive_decreases': consecutive_decreases,
            'trend_slope': trend_slope,
            'total_change': values[-1] - values[0],
            'percent_change': ((values[-1] - values[0]) / values[0]) * 100 if values[0] != 0 else 0
        }

class CorrelationAnalyzer(AlertAnalyzer):
    """Analyzer for correlation-based alerts"""
    
    async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
        """Analyze correlation breakdowns"""
        # This would implement correlation analysis between indicators
        # For brevity, showing simplified structure
        
        return AnalysisResult(
            should_alert=False,
            title="",
            description="",
            affected_indicators=config.indicator_codes,
            affected_countries=config.countries or [],
            trigger_values={},
            context={'analysis_type': 'correlation'}
        )

class CompositeAnalyzer(AlertAnalyzer):
    """Analyzer for composite conditions"""
    
    async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
        """Analyze composite conditions"""
        # This would implement complex multi-condition analysis
        # For brevity, showing simplified structure
        
        return AnalysisResult(
            should_alert=False,
            title="",
            description="",
            affected_indicators=config.indicator_codes,
            affected_countries=config.countries or [],
            trigger_values={},
            context={'analysis_type': 'composite'}
        )

class DataQualityAnalyzer(AlertAnalyzer):
    """Analyzer for data quality issues"""
    
    async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
        """Analyze data quality issues"""
        indicator_code = data.get('indicator_code')
        country_code = data.get('country_code')
        value = data.get('value')
        
        quality_issues = []
        
        # Check for missing values
        if value is None:
            quality_issues.append('missing_value')
        
        # Check for unrealistic values
        if value is not None:
            conditions = config.conditions
            if 'realistic_range' in conditions:
                min_val, max_val = conditions['realistic_range']
                if value < min_val or value > max_val:
                    quality_issues.append('unrealistic_value')
        
        should_alert = len(quality_issues) > 0
        
        if should_alert:
            title = f"Data Quality Alert: {indicator_code} has quality issues"
            description = f"{indicator_code} for {country_code} has issues: {', '.join(quality_issues)}"
        else:
            title = ""
            description = ""
        
        return AnalysisResult(
            should_alert=should_alert,
            title=title,
            description=description,
            affected_indicators=[indicator_code],
            affected_countries=[country_code],
            trigger_values={'quality_issues': quality_issues, 'value': value},
            context={'analysis_type': 'data_quality'}
        )

Notification Management

Effective notification management ensures that economic alerts reach the right people at the right time through appropriate channels. The notification system must handle multiple delivery channels, respect user preferences, and adapt to different urgency levels and organizational structures.

The challenge lies in balancing immediate notification for critical economic events with avoiding alert fatigue from less important updates. The system should intelligently route notifications based on recipient roles, current availability, and the business impact of different types of economic events. During non-business hours, only the most critical alerts might trigger immediate notifications, while less urgent alerts could be batched for delivery during business hours.

Integration with existing communication systems becomes essential for organizational adoption. The notification system should support email, SMS, instant messaging platforms, and integration with incident management tools. Each channel has different characteristics in terms of delivery speed, message formatting, and user interaction capabilities that must be considered in the notification strategy.

from abc import ABC, abstractmethod
import smtplib
import ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import aiohttp
import asyncio
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging

@dataclass
class NotificationChannel:
    """Configuration for notification channels"""
    channel_id: str
    channel_type: str  # 'email', 'sms', 'slack', 'webhook', 'teams'
    configuration: Dict[str, Any]
    enabled: bool = True
    rate_limit: Optional[int] = None  # Max notifications per hour

@dataclass
class RecipientProfile:
    """Recipient notification preferences"""
    user_id: str
    name: str
    email: Optional[str] = None
    phone: Optional[str] = None
    slack_user_id: Optional[str] = None
    teams_user_id: Optional[str] = None
    notification_preferences: Dict[str, Any] = None
    timezone: str = "UTC"
    quiet_hours: Optional[Dict[str, str]] = None  # {"start": "22:00", "end": "06:00"}

class NotificationManager:
    """Manages multi-channel notification delivery"""
    
    def __init__(self):
        self.channels = {}
        self.recipients = {}
        self.notification_history = []
        self.rate_limiters = {}
        
    def register_channel(self, channel: NotificationChannel):
        """Register a notification channel"""
        self.channels[channel.channel_id] = channel
        if channel.rate_limit:
            self.rate_limiters[channel.channel_id] = RateLimiter(channel.rate_limit)
        logging.info(f"Registered notification channel: {channel.channel_id}")
    
    def register_recipient(self, recipient: RecipientProfile):
        """Register a notification recipient"""
        self.recipients[recipient.user_id] = recipient
        logging.info(f"Registered recipient: {recipient.name}")
    
    async def send_notifications(self, alert_event: AlertEvent, config: AlertConfig):
        """Send notifications for an alert event"""
        notification_tasks = []
        
        for channel_id in config.notification_channels:
            if channel_id in self.channels:
                channel = self.channels[channel_id]
                
                # Check rate limits
                if channel_id in self.rate_limiters:
                    if not self.rate_limiters[channel_id].can_send():
                        logging.warning(f"Rate limit exceeded for channel {channel_id}")
                        continue
                
                # Create notification task
                task = asyncio.create_task(
                    self._send_channel_notification(alert_event, channel)
                )
                notification_tasks.append(task)
        
        # Send all notifications concurrently
        if notification_tasks:
            results = await asyncio.gather(*notification_tasks, return_exceptions=True)
            
            # Log results
            for i, result in enumerate(results):
                channel_id = config.notification_channels[i]
                if isinstance(result, Exception):
                    logging.error(f"Notification failed for {channel_id}: {result}")
                else:
                    logging.info(f"Notification sent successfully to {channel_id}")
    
    async def _send_channel_notification(self, alert_event: AlertEvent, channel: NotificationChannel):
        """Send notification through a specific channel"""
        
        if channel.channel_type == 'email':
            await self._send_email_notification(alert_event, channel)
        elif channel.channel_type == 'slack':
            await self._send_slack_notification(alert_event, channel)
        elif channel.channel_type == 'webhook':
            await self._send_webhook_notification(alert_event, channel)
        elif channel.channel_type == 'sms':
            await self._send_sms_notification(alert_event, channel)
        else:
            logging.warning(f"Unsupported channel type: {channel.channel_type}")
    
    async def _send_email_notification(self, alert_event: AlertEvent, channel: NotificationChannel):
        """Send email notification"""
        config = channel.configuration
        
        # Create message
        message = MIMEMultipart("alternative")
        message["Subject"] = f"[{alert_event.severity.value.upper()}] {alert_event.title}"
        message["From"] = config['from_email']
        message["To"] = config['to_email']
        
        # Create HTML content
        html_content = self._create_email_html(alert_event)
        html_part = MIMEText(html_content, "html")
        message.attach(html_part)
        
        # Send email
        try:
            context = ssl.create_default_context()
            with smtplib.SMTP_SSL(config['smtp_server'], config['smtp_port'], context=context) as server:
                server.login(config['smtp_username'], config['smtp_password'])
                server.sendmail(config['from_email'], config['to_email'], message.as_string())
        except Exception as e:
            logging.error(f"Email sending failed: {e}")
            raise
    
    def _create_email_html(self, alert_event: AlertEvent) -> str:
        """Create HTML content for email notification"""
        severity_colors = {
            AlertSeverity.LOW: "#28a745",
            AlertSeverity.MEDIUM: "#ffc107", 
            AlertSeverity.HIGH: "#fd7e14",
            AlertSeverity.CRITICAL: "#dc3545"
        }
        
        color = severity_colors.get(alert_event.severity, "#6c757d")
        
        html = f"""
        <html>
        <body style="font-family: Arial, sans-serif; margin: 0; padding: 20px;">
            <div style="border-left: 4px solid {color}; padding: 20px; background-color: #f8f9fa;">
                <h2 style="color: {color}; margin-top: 0;">
                    {alert_event.severity.value.upper()} ALERT: {alert_event.title}
                </h2>
                
                <p><strong>Event ID:</strong> {alert_event.event_id}</p>
                <p><strong>Timestamp:</strong> {alert_event.timestamp.strftime('%Y-%m-%d %H:%M:%S UTC')}</p>
                <p><strong>Alert Type:</strong> {alert_event.alert_type.value}</p>
                
                <h3>Description</h3>
                <p>{alert_event.description}</p>
                
                <h3>Affected Indicators</h3>
                <ul>
                    {''.join(f'<li>{indicator}</li>' for indicator in alert_event.affected_indicators)}
                </ul>
                
                <h3>Affected Countries</h3>
                <ul>
                    {''.join(f'<li>{country}</li>' for country in alert_event.affected_countries)}
                </ul>
                
                <h3>Trigger Values</h3>
                <table style="border-collapse: collapse; width: 100%;">
                    {''.join(f'''
                    <tr>
                        <td style="border: 1px solid #ddd; padding: 8px; font-weight: bold;">{key}</td>
                        <td style="border: 1px solid #ddd; padding: 8px;">{value}</td>
                    </tr>
                    ''' for key, value in alert_event.trigger_values.items())}
                </table>
                
                <p style="margin-top: 20px; font-size: 12px; color: #6c757d;">
                    This is an automated alert from the Economic Monitoring System.
                </p>
            </div>
        </body>
        </html>
        """
        
        return html
    
    async def _send_slack_notification(self, alert_event: AlertEvent, channel: NotificationChannel):
        """Send Slack notification"""
        config = channel.configuration
        webhook_url = config['webhook_url']
        
        # Create Slack message
        severity_colors = {
            AlertSeverity.LOW: "good",
            AlertSeverity.MEDIUM: "warning",
            AlertSeverity.HIGH: "#fd7e14",
            AlertSeverity.CRITICAL: "danger"
        }
        
        color = severity_colors.get(alert_event.severity, "#6c757d")
        
        slack_message = {
            "text": f"Economic Alert: {alert_event.title}",
            "attachments": [
                {
                    "color": color,
                    "title": f"{alert_event.severity.value.upper()} ALERT",
                    "text": alert_event.description,
                    "fields": [
                        {
                            "title": "Event ID",
                            "value": alert_event.event_id,
                            "short": True
                        },
                        {
                            "title": "Alert Type",
                            "value": alert_event.alert_type.value,
                            "short": True
                        },
                        {
                            "title": "Affected Indicators",
                            "value": ", ".join(alert_event.affected_indicators),
                            "short": False
                        },
                        {
                            "title": "Affected Countries",
                            "value": ", ".join(alert_event.affected_countries),
                            "short": False
                        }
                    ],
                    "footer": "Economic Monitoring System",
                    "ts": int(alert_event.timestamp.timestamp())
                }
            ]
        }
        
        # Send to Slack
        async with aiohttp.ClientSession() as session:
            async with session.post(webhook_url, json=slack_message) as response:
                if response.status != 200:
                    raise Exception(f"Slack notification failed with status {response.status}")
    
    async def _send_webhook_notification(self, alert_event: AlertEvent, channel: NotificationChannel):
        """Send webhook notification"""
        config = channel.configuration
        
        # Create webhook payload
        payload = {
            "alert_id": alert_event.alert_id,
            "event_id": alert_event.event_id,
            "timestamp": alert_event.timestamp.isoformat(),
            "severity": alert_event.severity.value,
            "alert_type": alert_event.alert_type.value,
            "title": alert_event.title,
            "description": alert_event.description,
            "affected_indicators": alert_event.affected_indicators,
            "affected_countries": alert_event.affected_countries,
            "trigger_values": alert_event.trigger_values,
            "context": alert_event.context
        }
        
        # Send webhook
        headers = config.get('headers', {})
        headers['Content-Type'] = 'application/json'
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                config['url'], 
                json=payload, 
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                if response.status not in [200, 201, 202]:
                    raise Exception(f"Webhook notification failed with status {response.status}")
    
    async def _send_sms_notification(self, alert_event: AlertEvent, channel: NotificationChannel):
        """Send SMS notification"""
        config = channel.configuration
        
        # Create SMS message (keep it short)
        message = (
            f"ALERT [{alert_event.severity.value.upper()}]: "
            f"{alert_event.title[:80]}... "
            f"Event ID: {alert_event.event_id}"
        )
        
        # This would integrate with SMS providers like Twilio, AWS SNS, etc.
        # For demo purposes, just log the message
        logging.info(f"SMS would be sent to {config.get('phone_number')}: {message}")

class RateLimiter:
    """Simple rate limiter for notifications"""
    
    def __init__(self, max_per_hour: int):
        self.max_per_hour = max_per_hour
        self.notifications = []
    
    def can_send(self) -> bool:
        """Check if we can send another notification"""
        now = datetime.utcnow()
        hour_ago = now - timedelta(hours=1)
        
        # Remove old notifications
        self.notifications = [ts for ts in self.notifications if ts > hour_ago]
        
        # Check if under limit
        if len(self.notifications) < self.max_per_hour:
            self.notifications.append(now)
            return True
        
        return False

class EscalationManager:
    """Manages alert escalation procedures"""
    
    def __init__(self):
        self.active_escalations = {}
        self.escalation_policies = {}
    
    def register_escalation_policy(self, policy_id: str, policy: Dict[str, Any]):
        """Register an escalation policy"""
        self.escalation_policies[policy_id] = policy
    
    async def start_escalation(self, alert_event: AlertEvent, config: AlertConfig):
        """Start escalation procedure for an alert"""
        if not config.escalation_policy:
            return
        
        policy = self.escalation_policies.get(config.escalation_policy)
        if not policy:
            logging.warning(f"Escalation policy not found: {config.escalation_policy}")
            return
        
        escalation_task = asyncio.create_task(
            self._execute_escalation(alert_event, policy)
        )
        
        self.active_escalations[alert_event.event_id] = escalation_task
    
    async def stop_escalation(self, event_id: str):
        """Stop escalation for an alert"""
        if event_id in self.active_escalations:
            escalation_task = self.active_escalations[event_id]
            escalation_task.cancel()
            del self.active_escalations[event_id]
    
    async def _execute_escalation(self, alert_event: AlertEvent, policy: Dict[str, Any]):
        """Execute escalation steps"""
        for step in policy.get('steps', []):
            try:
                # Wait for the specified delay
                await asyncio.sleep(step['delay_minutes'] * 60)
                
                # Check if alert is still active (not acknowledged/resolved)
                if alert_event.acknowledged or alert_event.resolved:
                    break
                
                # Execute escalation step
                logging.info(f"Escalating alert {alert_event.event_id} to level {step['level']}")
                
                # This would typically send notifications to higher-level recipients
                # or integrate with incident management systems
                
            except asyncio.CancelledError:
                logging.info(f"Escalation cancelled for {alert_event.event_id}")
                break
            except Exception as e:
                logging.error(f"Escalation step failed: {e}")

class AlertSuppression:
    """Manages alert suppression rules"""
    
    def __init__(self):
        self.suppression_rules = {}
        self.suppressed_alerts = {}
    
    def add_suppression_rule(self, alert_id: str, suppression_duration: timedelta):
        """Add suppression rule for an alert"""
        self.suppression_rules[alert_id] = suppression_duration
    
    def suppress_alert(self, alert_id: str):
        """Suppress an alert"""
        if alert_id in self.suppression_rules:
            duration = self.suppression_rules[alert_id]
            self.suppressed_alerts[alert_id] = datetime.utcnow() + duration
    
    def is_suppressed(self, alert_id: str) -> bool:
        """Check if an alert is currently suppressed"""
        if alert_id in self.suppressed_alerts:
            if datetime.utcnow() < self.suppressed_alerts[alert_id]:
                return True
            else:
                # Suppression expired
                del self.suppressed_alerts[alert_id]
        
        return False

class AlertMetricsCollector:
    """Collects metrics about alert system performance"""
    
    def __init__(self):
        self.metrics = {
            'alerts_triggered': 0,
            'alerts_resolved': 0,
            'alerts_acknowledged': 0,
            'false_positives': 0,
            'mean_time_to_acknowledge': timedelta(),
            'mean_time_to_resolve': timedelta()
        }
        self.alert_timings = []
    
    def record_alert(self, alert_event: AlertEvent):
        """Record alert trigger"""
        self.metrics['alerts_triggered'] += 1
        self.alert_timings.append({
            'event_id': alert_event.event_id,
            'triggered_at': alert_event.timestamp,
            'acknowledged_at': None,
            'resolved_at': None
        })
    
    def record_acknowledgment(self, alert_event: AlertEvent):
        """Record alert acknowledgment"""
        self.metrics['alerts_acknowledged'] += 1
        
        # Find timing record and update
        for timing in self.alert_timings:
            if timing['event_id'] == alert_event.event_id:
                timing['acknowledged_at'] = datetime.utcnow()
                break
    
    def record_resolution(self, alert_event: AlertEvent, resolved_by: str):
        """Record alert resolution"""
        self.metrics['alerts_resolved'] += 1
        
        # Find timing record and update
        for timing in self.alert_timings:
            if timing['event_id'] == alert_event.event_id:
                timing['resolved_at'] = datetime.utcnow()
                break
        
        # Update mean times
        self._update_mean_times()
    
    def _update_mean_times(self):
        """Update mean time metrics"""
        ack_times = []
        resolve_times = []
        
        for timing in self.alert_timings:
            if timing['acknowledged_at']:
                ack_time = timing['acknowledged_at'] - timing['triggered_at']
                ack_times.append(ack_time.total_seconds())
            
            if timing['resolved_at']:
                resolve_time = timing['resolved_at'] - timing['triggered_at']
                resolve_times.append(resolve_time.total_seconds())
        
        if ack_times:
            self.metrics['mean_time_to_acknowledge'] = timedelta(
                seconds=sum(ack_times) / len(ack_times)
            )
        
        if resolve_times:
            self.metrics['mean_time_to_resolve'] = timedelta(
                seconds=sum(resolve_times) / len(resolve_times)
            )
    
    def get_metrics_summary(self) -> Dict[str, Any]:
        """Get summary of alert metrics"""
        return {
            'total_alerts_triggered': self.metrics['alerts_triggered'],
            'total_alerts_resolved': self.metrics['alerts_resolved'],
            'total_alerts_acknowledged': self.metrics['alerts_acknowledged'],
            'resolution_rate': (
                self.metrics['alerts_resolved'] / self.metrics['alerts_triggered']
                if self.metrics['alerts_triggered'] > 0 else 0
            ),
            'acknowledgment_rate': (
                self.metrics['alerts_acknowledged'] / self.metrics['alerts_triggered']
                if self.metrics['alerts_triggered'] > 0 else 0
            ),
            'mean_time_to_acknowledge_minutes': (
                self.metrics['mean_time_to_acknowledge'].total_seconds() / 60
            ),
            'mean_time_to_resolve_minutes': (
                self.metrics['mean_time_to_resolve'].total_seconds() / 60
            )
        }

Advanced Alert Strategies

Modern economic alerting systems benefit from advanced strategies that go beyond simple threshold monitoring to provide intelligent, context-aware notifications. These strategies incorporate machine learning techniques, economic domain knowledge, and adaptive algorithms that learn from historical patterns and user feedback.

Seasonal adjustment and economic calendar integration represent critical advances in economic alerting. The system should understand that certain types of economic volatility are expected around scheduled data releases, earnings seasons, or policy announcement periods. Alerts during these periods should be adjusted to account for elevated normal volatility while still detecting truly abnormal events.

Multi-indicator correlation analysis provides another advanced alerting capability. Economic indicators rarely move in isolation, and understanding the expected relationships between different indicators enables more sophisticated anomaly detection. When unemployment rises, certain types of consumer spending typically decline. If consumer spending indicators don’t show expected correlations with employment changes, this might signal important structural changes worth alerting about.

The integration of external data sources further enhances alerting capabilities. News sentiment analysis, social media monitoring, and alternative data sources can provide early warning signals that complement traditional economic indicators. A spike in negative economic sentiment on social media might precede official economic data releases by several days or weeks.

The alerting strategies presented in this guide integrate with the broader economic data architecture described in Cloud Deployment Scaling Economic Data Systems and provide the monitoring foundation for the visualization systems covered in Economic Data Visualization Dashboard Development. These systems work together to provide comprehensive economic monitoring capabilities that support both operational decision-making and strategic planning.

For comprehensive economic alerting system implementation, explore these complementary resources:

Recent Articles