Introduction
Economic indicator monitoring requires sophisticated alerting systems that can distinguish between normal market fluctuations and significant economic events requiring immediate attention. Unlike typical application monitoring that focuses on system health metrics, economic alerting must understand the contextual significance of data changes, account for seasonal patterns, and adapt to changing economic conditions.
The challenge lies in creating alert systems that are sensitive enough to catch important economic developments early, yet robust enough to avoid alert fatigue from normal market volatility. Economic indicators exhibit complex temporal patterns, correlation structures, and regime changes that simple threshold-based alerting cannot effectively handle. Modern economic monitoring systems must incorporate statistical models, machine learning techniques, and domain expertise to provide actionable intelligence.
Regulatory and compliance requirements add another dimension to economic alerting systems. Financial institutions must monitor specific indicators for regulatory reporting, while central banks need real-time awareness of economic conditions that might influence policy decisions. The alerting system must provide audit trails, configurable escalation procedures, and integration with existing operational workflows.
This guide builds upon the data processing capabilities from Real-Time Data Processing Economic Indicators and leverages the storage patterns from Database Integration for Economic Data Storage. The monitoring patterns presented here complement the data quality frameworks from Data Quality Practices for Economic Datasets.
Alert Framework Architecture
A comprehensive economic alerting system requires a layered architecture that separates data ingestion, analysis, alert generation, and notification delivery. This separation enables different components to evolve independently and allows for sophisticated alert logic without impacting data collection or delivery performance.
The analysis layer represents the heart of the alerting system, where raw economic data is transformed into actionable insights. This layer must handle multiple types of analysis simultaneously - statistical anomaly detection, threshold monitoring, trend analysis, and cross-indicator correlation analysis. Each analysis type contributes to the overall assessment of economic conditions and potential alert generation.
The alert orchestration layer manages the complex decision-making process that determines when and how to escalate economic events. This layer must consider alert priorities, recipient preferences, escalation schedules, and the broader context of current economic conditions. During periods of high market volatility, the system might adjust alert thresholds or consolidate related alerts to prevent overwhelming recipients.
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import asyncio
import logging
import pandas as pd
import numpy as np
class AlertSeverity(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AlertType(Enum):
THRESHOLD = "threshold"
ANOMALY = "anomaly"
TREND = "trend"
CORRELATION = "correlation"
COMPOSITE = "composite"
DATA_QUALITY = "data_quality"
@dataclass
class AlertConfig:
"""Configuration for economic indicator alerts"""
alert_id: str
name: str
description: str
alert_type: AlertType
severity: AlertSeverity
indicator_codes: List[str]
countries: Optional[List[str]] = None
conditions: Dict[str, Any] = field(default_factory=dict)
enabled: bool = True
notification_channels: List[str] = field(default_factory=list)
escalation_policy: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class AlertEvent:
"""Economic alert event"""
alert_id: str
event_id: str
timestamp: datetime
severity: AlertSeverity
alert_type: AlertType
title: str
description: str
affected_indicators: List[str]
affected_countries: List[str]
trigger_values: Dict[str, Any]
context: Dict[str, Any]
resolved: bool = False
acknowledged: bool = False
acknowledged_by: Optional[str] = None
class EconomicAlertFramework:
"""Comprehensive framework for economic indicator alerting"""
def __init__(self):
self.alert_configs = {}
self.active_alerts = {}
self.alert_history = []
self.analyzers = self._initialize_analyzers()
self.notification_manager = NotificationManager()
self.escalation_manager = EscalationManager()
self.alert_suppression = AlertSuppression()
self.metrics_collector = AlertMetricsCollector()
def _initialize_analyzers(self) -> Dict[AlertType, 'AlertAnalyzer']:
"""Initialize alert analyzers for different types"""
return {
AlertType.THRESHOLD: ThresholdAnalyzer(),
AlertType.ANOMALY: AnomalyAnalyzer(),
AlertType.TREND: TrendAnalyzer(),
AlertType.CORRELATION: CorrelationAnalyzer(),
AlertType.COMPOSITE: CompositeAnalyzer(),
AlertType.DATA_QUALITY: DataQualityAnalyzer()
}
def register_alert(self, config: AlertConfig):
"""Register a new alert configuration"""
self.alert_configs[config.alert_id] = config
logging.info(f"Registered alert: {config.name} ({config.alert_id})")
async def process_data_update(self, data: Dict[str, Any]):
"""Process incoming data and check for alerts"""
alert_tasks = []
for alert_id, config in self.alert_configs.items():
if not config.enabled:
continue
# Check if this data update is relevant to the alert
if self._is_relevant_data(data, config):
analyzer = self.analyzers[config.alert_type]
task = asyncio.create_task(
self._evaluate_alert(alert_id, config, analyzer, data)
)
alert_tasks.append(task)
# Process all relevant alerts concurrently
if alert_tasks:
alert_results = await asyncio.gather(*alert_tasks, return_exceptions=True)
# Handle alert results
            for result in alert_results:
if isinstance(result, Exception):
logging.error(f"Alert evaluation failed: {result}")
elif result: # Alert was triggered
await self._handle_alert_trigger(result)
def _is_relevant_data(self, data: Dict[str, Any], config: AlertConfig) -> bool:
"""Check if data update is relevant to alert configuration"""
data_indicator = data.get('indicator_code')
data_country = data.get('country_code')
# Check indicator relevance
if data_indicator not in config.indicator_codes:
return False
# Check country relevance
if config.countries and data_country not in config.countries:
return False
return True
async def _evaluate_alert(self, alert_id: str, config: AlertConfig,
analyzer: 'AlertAnalyzer', data: Dict[str, Any]) -> Optional[AlertEvent]:
"""Evaluate a specific alert against new data"""
try:
# Check if alert is currently suppressed
if self.alert_suppression.is_suppressed(alert_id):
return None
# Perform analysis
analysis_result = await analyzer.analyze(config, data)
if analysis_result.should_alert:
# Create alert event
alert_event = AlertEvent(
alert_id=alert_id,
event_id=f"{alert_id}_{datetime.utcnow().isoformat()}",
timestamp=datetime.utcnow(),
severity=config.severity,
alert_type=config.alert_type,
title=analysis_result.title,
description=analysis_result.description,
affected_indicators=analysis_result.affected_indicators,
affected_countries=analysis_result.affected_countries,
trigger_values=analysis_result.trigger_values,
context=analysis_result.context
)
return alert_event
return None
except Exception as e:
logging.error(f"Alert evaluation failed for {alert_id}: {e}")
return None
async def _handle_alert_trigger(self, alert_event: AlertEvent):
"""Handle triggered alert event"""
# Store alert
self.active_alerts[alert_event.event_id] = alert_event
self.alert_history.append(alert_event)
# Record metrics
self.metrics_collector.record_alert(alert_event)
# Send notifications
config = self.alert_configs[alert_event.alert_id]
await self.notification_manager.send_notifications(alert_event, config)
# Handle escalation if configured
if config.escalation_policy:
await self.escalation_manager.start_escalation(alert_event, config)
logging.info(f"Alert triggered: {alert_event.title} ({alert_event.event_id})")
async def acknowledge_alert(self, event_id: str, acknowledged_by: str) -> bool:
"""Acknowledge an active alert"""
if event_id in self.active_alerts:
alert = self.active_alerts[event_id]
            alert.acknowledged = True
            alert.acknowledged_by = acknowledged_by
            # Record acknowledgment metrics
            self.metrics_collector.record_acknowledgment(alert)
# Stop escalation
config = self.alert_configs[alert.alert_id]
if config.escalation_policy:
await self.escalation_manager.stop_escalation(event_id)
logging.info(f"Alert acknowledged: {event_id} by {acknowledged_by}")
return True
return False
async def resolve_alert(self, event_id: str, resolved_by: str) -> bool:
"""Resolve an active alert"""
if event_id in self.active_alerts:
alert = self.active_alerts[event_id]
alert.resolved = True
# Remove from active alerts
del self.active_alerts[event_id]
# Stop escalation
config = self.alert_configs[alert.alert_id]
if config.escalation_policy:
await self.escalation_manager.stop_escalation(event_id)
# Record resolution metrics
self.metrics_collector.record_resolution(alert, resolved_by)
logging.info(f"Alert resolved: {event_id} by {resolved_by}")
return True
return False
def get_alert_summary(self) -> Dict[str, Any]:
"""Get summary of current alert status"""
active_by_severity = {}
for severity in AlertSeverity:
active_by_severity[severity.value] = len([
alert for alert in self.active_alerts.values()
if alert.severity == severity
])
return {
"total_active_alerts": len(self.active_alerts),
"active_by_severity": active_by_severity,
"total_configurations": len(self.alert_configs),
"enabled_configurations": len([
config for config in self.alert_configs.values() if config.enabled
]),
"recent_alerts_24h": len([
alert for alert in self.alert_history
if alert.timestamp > datetime.utcnow() - timedelta(hours=24)
])
}
@dataclass
class AnalysisResult:
"""Result of alert analysis"""
should_alert: bool
title: str
description: str
affected_indicators: List[str]
affected_countries: List[str]
trigger_values: Dict[str, Any]
context: Dict[str, Any]
class AlertAnalyzer(ABC):
"""Abstract base class for alert analyzers"""
@abstractmethod
async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
"""Analyze data against alert configuration"""
pass
class ThresholdAnalyzer(AlertAnalyzer):
"""Analyzer for threshold-based alerts"""
async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
"""Analyze threshold conditions"""
conditions = config.conditions
value = data.get('value')
indicator_code = data.get('indicator_code')
country_code = data.get('country_code')
should_alert = False
trigger_values = {}
# Check upper threshold
        if value is not None and 'upper_threshold' in conditions and value > conditions['upper_threshold']:
should_alert = True
trigger_values['threshold_type'] = 'upper'
trigger_values['threshold_value'] = conditions['upper_threshold']
trigger_values['actual_value'] = value
# Check lower threshold
        elif value is not None and 'lower_threshold' in conditions and value < conditions['lower_threshold']:
should_alert = True
trigger_values['threshold_type'] = 'lower'
trigger_values['threshold_value'] = conditions['lower_threshold']
trigger_values['actual_value'] = value
if should_alert:
title = f"Threshold Alert: {indicator_code} exceeded {trigger_values['threshold_type']} threshold"
description = (
f"{indicator_code} for {country_code} reached {value}, "
f"which exceeds the {trigger_values['threshold_type']} threshold of "
f"{trigger_values['threshold_value']}"
)
else:
title = ""
description = ""
return AnalysisResult(
should_alert=should_alert,
title=title,
description=description,
affected_indicators=[indicator_code],
affected_countries=[country_code],
trigger_values=trigger_values,
context={'analysis_type': 'threshold'}
)
class AnomalyAnalyzer(AlertAnalyzer):
"""Analyzer for statistical anomaly detection"""
def __init__(self):
self.historical_data_cache = {}
self.model_cache = {}
async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
"""Analyze for statistical anomalies"""
indicator_code = data.get('indicator_code')
country_code = data.get('country_code')
value = data.get('value')
timestamp = data.get('timestamp')
# Get historical data for context
historical_data = await self._get_historical_data(indicator_code, country_code)
if len(historical_data) < 30: # Need minimum data for anomaly detection
return AnalysisResult(
should_alert=False,
title="",
description="",
affected_indicators=[indicator_code],
affected_countries=[country_code],
trigger_values={},
context={'analysis_type': 'anomaly', 'insufficient_data': True}
)
# Calculate statistical measures
mean = historical_data['value'].mean()
std = historical_data['value'].std()
        z_score = abs(value - mean) / std if value is not None and std > 0 else 0
# Determine if anomalous
anomaly_threshold = config.conditions.get('z_score_threshold', 3.0)
should_alert = z_score > anomaly_threshold
trigger_values = {
'z_score': z_score,
'historical_mean': mean,
'historical_std': std,
'anomaly_threshold': anomaly_threshold,
'actual_value': value
}
if should_alert:
title = f"Anomaly Alert: {indicator_code} shows unusual deviation"
description = (
f"{indicator_code} for {country_code} reached {value}, "
f"which is {z_score:.2f} standard deviations from the historical mean "
f"of {mean:.2f} (threshold: {anomaly_threshold})"
)
else:
title = ""
description = ""
return AnalysisResult(
should_alert=should_alert,
title=title,
description=description,
affected_indicators=[indicator_code],
affected_countries=[country_code],
trigger_values=trigger_values,
context={'analysis_type': 'anomaly'}
)
async def _get_historical_data(self, indicator_code: str, country_code: str) -> pd.DataFrame:
"""Get historical data for anomaly analysis"""
# This would typically query the database
# For demo purposes, return sample data
cache_key = f"{indicator_code}_{country_code}"
if cache_key not in self.historical_data_cache:
# In production, this would query your database
dates = pd.date_range(
start=datetime.now() - timedelta(days=365),
end=datetime.now(),
freq='D'
)
values = np.random.normal(100, 10, len(dates))
self.historical_data_cache[cache_key] = pd.DataFrame({
'timestamp': dates,
'value': values
})
return self.historical_data_cache[cache_key]
class TrendAnalyzer(AlertAnalyzer):
"""Analyzer for trend-based alerts"""
async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
"""Analyze trend patterns"""
indicator_code = data.get('indicator_code')
country_code = data.get('country_code')
# Get recent historical data for trend analysis
historical_data = await self._get_recent_data(indicator_code, country_code)
if len(historical_data) < 10: # Need minimum data for trend analysis
return AnalysisResult(
should_alert=False,
title="",
description="",
affected_indicators=[indicator_code],
affected_countries=[country_code],
trigger_values={},
context={'analysis_type': 'trend', 'insufficient_data': True}
)
# Calculate trend statistics
trend_stats = self._calculate_trend_statistics(historical_data)
# Check trend conditions
conditions = config.conditions
should_alert = False
trigger_values = trend_stats.copy()
if 'consecutive_increases' in conditions:
if trend_stats['consecutive_increases'] >= conditions['consecutive_increases']:
should_alert = True
trigger_values['trigger_type'] = 'consecutive_increases'
elif 'consecutive_decreases' in conditions:
if trend_stats['consecutive_decreases'] >= conditions['consecutive_decreases']:
should_alert = True
trigger_values['trigger_type'] = 'consecutive_decreases'
elif 'trend_strength' in conditions:
if abs(trend_stats['trend_slope']) >= conditions['trend_strength']:
should_alert = True
trigger_values['trigger_type'] = 'trend_strength'
        if should_alert:
            title = f"Trend Alert: {indicator_code} shows significant trend"
            description = (
                f"{indicator_code} for {country_code} triggered "
                f"{trigger_values['trigger_type']} "
                f"(slope: {trend_stats['trend_slope']:.4f}, "
                f"change over window: {trend_stats['percent_change']:.2f}%)"
            )
else:
title = ""
description = ""
return AnalysisResult(
should_alert=should_alert,
title=title,
description=description,
affected_indicators=[indicator_code],
affected_countries=[country_code],
trigger_values=trigger_values,
context={'analysis_type': 'trend'}
)
async def _get_recent_data(self, indicator_code: str, country_code: str) -> pd.DataFrame:
"""Get recent data for trend analysis"""
# Mock implementation - in production would query database
dates = pd.date_range(
start=datetime.now() - timedelta(days=30),
end=datetime.now(),
freq='D'
)
# Create trending data for demo
trend = np.linspace(100, 110, len(dates))
noise = np.random.normal(0, 1, len(dates))
values = trend + noise
return pd.DataFrame({
'timestamp': dates,
'value': values
})
def _calculate_trend_statistics(self, data: pd.DataFrame) -> Dict[str, Any]:
"""Calculate trend statistics"""
values = data['value'].values
# Calculate consecutive increases/decreases
differences = np.diff(values)
consecutive_increases = 0
consecutive_decreases = 0
current_inc_streak = 0
current_dec_streak = 0
for diff in differences:
if diff > 0:
current_inc_streak += 1
current_dec_streak = 0
consecutive_increases = max(consecutive_increases, current_inc_streak)
elif diff < 0:
current_dec_streak += 1
current_inc_streak = 0
consecutive_decreases = max(consecutive_decreases, current_dec_streak)
else:
current_inc_streak = 0
current_dec_streak = 0
# Calculate trend slope using linear regression
x = np.arange(len(values))
trend_slope = np.polyfit(x, values, 1)[0]
return {
'consecutive_increases': consecutive_increases,
'consecutive_decreases': consecutive_decreases,
'trend_slope': trend_slope,
'total_change': values[-1] - values[0],
'percent_change': ((values[-1] - values[0]) / values[0]) * 100 if values[0] != 0 else 0
}
class CorrelationAnalyzer(AlertAnalyzer):
"""Analyzer for correlation-based alerts"""
async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
"""Analyze correlation breakdowns"""
# This would implement correlation analysis between indicators
# For brevity, showing simplified structure
return AnalysisResult(
should_alert=False,
title="",
description="",
affected_indicators=config.indicator_codes,
affected_countries=config.countries or [],
trigger_values={},
context={'analysis_type': 'correlation'}
)
class CompositeAnalyzer(AlertAnalyzer):
"""Analyzer for composite conditions"""
async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
"""Analyze composite conditions"""
# This would implement complex multi-condition analysis
# For brevity, showing simplified structure
return AnalysisResult(
should_alert=False,
title="",
description="",
affected_indicators=config.indicator_codes,
affected_countries=config.countries or [],
trigger_values={},
context={'analysis_type': 'composite'}
)
class DataQualityAnalyzer(AlertAnalyzer):
"""Analyzer for data quality issues"""
async def analyze(self, config: AlertConfig, data: Dict[str, Any]) -> AnalysisResult:
"""Analyze data quality issues"""
indicator_code = data.get('indicator_code')
country_code = data.get('country_code')
value = data.get('value')
quality_issues = []
# Check for missing values
if value is None:
quality_issues.append('missing_value')
# Check for unrealistic values
if value is not None:
conditions = config.conditions
if 'realistic_range' in conditions:
min_val, max_val = conditions['realistic_range']
if value < min_val or value > max_val:
quality_issues.append('unrealistic_value')
should_alert = len(quality_issues) > 0
if should_alert:
title = f"Data Quality Alert: {indicator_code} has quality issues"
description = f"{indicator_code} for {country_code} has issues: {', '.join(quality_issues)}"
else:
title = ""
description = ""
return AnalysisResult(
should_alert=should_alert,
title=title,
description=description,
affected_indicators=[indicator_code],
affected_countries=[country_code],
trigger_values={'quality_issues': quality_issues, 'value': value},
context={'analysis_type': 'data_quality'}
)
Notification Management
Effective notification management ensures that economic alerts reach the right people at the right time through appropriate channels. The notification system must handle multiple delivery channels, respect user preferences, and adapt to different urgency levels and organizational structures.
The challenge lies in balancing immediate notification for critical economic events with avoiding alert fatigue from less important updates. The system should intelligently route notifications based on recipient roles, current availability, and the business impact of different types of economic events. During non-business hours, only the most critical alerts might trigger immediate notifications, while less urgent alerts could be batched for delivery during business hours.
Integration with existing communication systems becomes essential for organizational adoption. The notification system should support email, SMS, instant messaging platforms, and integration with incident management tools. Each channel has different characteristics in terms of delivery speed, message formatting, and user interaction capabilities that must be considered in the notification strategy.
import smtplib
import ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import aiohttp
import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
@dataclass
class NotificationChannel:
"""Configuration for notification channels"""
channel_id: str
channel_type: str # 'email', 'sms', 'slack', 'webhook', 'teams'
configuration: Dict[str, Any]
enabled: bool = True
rate_limit: Optional[int] = None # Max notifications per hour
@dataclass
class RecipientProfile:
"""Recipient notification preferences"""
user_id: str
name: str
email: Optional[str] = None
phone: Optional[str] = None
slack_user_id: Optional[str] = None
teams_user_id: Optional[str] = None
    notification_preferences: Optional[Dict[str, Any]] = None
timezone: str = "UTC"
quiet_hours: Optional[Dict[str, str]] = None # {"start": "22:00", "end": "06:00"}
class NotificationManager:
"""Manages multi-channel notification delivery"""
def __init__(self):
self.channels = {}
self.recipients = {}
self.notification_history = []
self.rate_limiters = {}
def register_channel(self, channel: NotificationChannel):
"""Register a notification channel"""
self.channels[channel.channel_id] = channel
if channel.rate_limit:
self.rate_limiters[channel.channel_id] = RateLimiter(channel.rate_limit)
logging.info(f"Registered notification channel: {channel.channel_id}")
def register_recipient(self, recipient: RecipientProfile):
"""Register a notification recipient"""
self.recipients[recipient.user_id] = recipient
logging.info(f"Registered recipient: {recipient.name}")
async def send_notifications(self, alert_event: AlertEvent, config: AlertConfig):
"""Send notifications for an alert event"""
        notification_tasks = []
        dispatched_channel_ids = []
        for channel_id in config.notification_channels:
            if channel_id not in self.channels:
                logging.warning(f"Unknown notification channel: {channel_id}")
                continue
            channel = self.channels[channel_id]
            # Check rate limits before dispatching
            if channel_id in self.rate_limiters and not self.rate_limiters[channel_id].can_send():
                logging.warning(f"Rate limit exceeded for channel {channel_id}")
                continue
            # Create notification task and remember which channel it belongs to
            task = asyncio.create_task(
                self._send_channel_notification(alert_event, channel)
            )
            notification_tasks.append(task)
            dispatched_channel_ids.append(channel_id)
        # Send all notifications concurrently
        if notification_tasks:
            results = await asyncio.gather(*notification_tasks, return_exceptions=True)
            # Log results against the channels that were actually dispatched
            for channel_id, result in zip(dispatched_channel_ids, results):
                if isinstance(result, Exception):
                    logging.error(f"Notification failed for {channel_id}: {result}")
                else:
                    logging.info(f"Notification sent successfully to {channel_id}")
async def _send_channel_notification(self, alert_event: AlertEvent, channel: NotificationChannel):
"""Send notification through a specific channel"""
if channel.channel_type == 'email':
await self._send_email_notification(alert_event, channel)
elif channel.channel_type == 'slack':
await self._send_slack_notification(alert_event, channel)
elif channel.channel_type == 'webhook':
await self._send_webhook_notification(alert_event, channel)
elif channel.channel_type == 'sms':
await self._send_sms_notification(alert_event, channel)
else:
logging.warning(f"Unsupported channel type: {channel.channel_type}")
async def _send_email_notification(self, alert_event: AlertEvent, channel: NotificationChannel):
"""Send email notification"""
config = channel.configuration
# Create message
message = MIMEMultipart("alternative")
message["Subject"] = f"[{alert_event.severity.value.upper()}] {alert_event.title}"
message["From"] = config['from_email']
message["To"] = config['to_email']
# Create HTML content
html_content = self._create_email_html(alert_event)
html_part = MIMEText(html_content, "html")
message.attach(html_part)
        # Send email (note: smtplib is blocking; in production consider running
        # this in a thread executor or using an async SMTP client)
        try:
context = ssl.create_default_context()
with smtplib.SMTP_SSL(config['smtp_server'], config['smtp_port'], context=context) as server:
server.login(config['smtp_username'], config['smtp_password'])
server.sendmail(config['from_email'], config['to_email'], message.as_string())
except Exception as e:
logging.error(f"Email sending failed: {e}")
raise
def _create_email_html(self, alert_event: AlertEvent) -> str:
"""Create HTML content for email notification"""
severity_colors = {
AlertSeverity.LOW: "#28a745",
AlertSeverity.MEDIUM: "#ffc107",
AlertSeverity.HIGH: "#fd7e14",
AlertSeverity.CRITICAL: "#dc3545"
}
color = severity_colors.get(alert_event.severity, "#6c757d")
html = f"""
<html>
<body style="font-family: Arial, sans-serif; margin: 0; padding: 20px;">
<div style="border-left: 4px solid {color}; padding: 20px; background-color: #f8f9fa;">
<h2 style="color: {color}; margin-top: 0;">
{alert_event.severity.value.upper()} ALERT: {alert_event.title}
</h2>
<p><strong>Event ID:</strong> {alert_event.event_id}</p>
<p><strong>Timestamp:</strong> {alert_event.timestamp.strftime('%Y-%m-%d %H:%M:%S UTC')}</p>
<p><strong>Alert Type:</strong> {alert_event.alert_type.value}</p>
<h3>Description</h3>
<p>{alert_event.description}</p>
<h3>Affected Indicators</h3>
<ul>
{''.join(f'<li>{indicator}</li>' for indicator in alert_event.affected_indicators)}
</ul>
<h3>Affected Countries</h3>
<ul>
{''.join(f'<li>{country}</li>' for country in alert_event.affected_countries)}
</ul>
<h3>Trigger Values</h3>
<table style="border-collapse: collapse; width: 100%;">
{''.join(f'''
<tr>
<td style="border: 1px solid #ddd; padding: 8px; font-weight: bold;">{key}</td>
<td style="border: 1px solid #ddd; padding: 8px;">{value}</td>
</tr>
''' for key, value in alert_event.trigger_values.items())}
</table>
<p style="margin-top: 20px; font-size: 12px; color: #6c757d;">
This is an automated alert from the Economic Monitoring System.
</p>
</div>
</body>
</html>
"""
return html
async def _send_slack_notification(self, alert_event: AlertEvent, channel: NotificationChannel):
"""Send Slack notification"""
config = channel.configuration
webhook_url = config['webhook_url']
# Create Slack message
severity_colors = {
AlertSeverity.LOW: "good",
AlertSeverity.MEDIUM: "warning",
AlertSeverity.HIGH: "#fd7e14",
AlertSeverity.CRITICAL: "danger"
}
color = severity_colors.get(alert_event.severity, "#6c757d")
slack_message = {
"text": f"Economic Alert: {alert_event.title}",
"attachments": [
{
"color": color,
"title": f"{alert_event.severity.value.upper()} ALERT",
"text": alert_event.description,
"fields": [
{
"title": "Event ID",
"value": alert_event.event_id,
"short": True
},
{
"title": "Alert Type",
"value": alert_event.alert_type.value,
"short": True
},
{
"title": "Affected Indicators",
"value": ", ".join(alert_event.affected_indicators),
"short": False
},
{
"title": "Affected Countries",
"value": ", ".join(alert_event.affected_countries),
"short": False
}
],
"footer": "Economic Monitoring System",
"ts": int(alert_event.timestamp.timestamp())
}
]
}
# Send to Slack
async with aiohttp.ClientSession() as session:
async with session.post(webhook_url, json=slack_message) as response:
if response.status != 200:
raise Exception(f"Slack notification failed with status {response.status}")
async def _send_webhook_notification(self, alert_event: AlertEvent, channel: NotificationChannel):
"""Send webhook notification"""
config = channel.configuration
# Create webhook payload
payload = {
"alert_id": alert_event.alert_id,
"event_id": alert_event.event_id,
"timestamp": alert_event.timestamp.isoformat(),
"severity": alert_event.severity.value,
"alert_type": alert_event.alert_type.value,
"title": alert_event.title,
"description": alert_event.description,
"affected_indicators": alert_event.affected_indicators,
"affected_countries": alert_event.affected_countries,
"trigger_values": alert_event.trigger_values,
"context": alert_event.context
}
# Send webhook
headers = config.get('headers', {})
headers['Content-Type'] = 'application/json'
async with aiohttp.ClientSession() as session:
async with session.post(
config['url'],
json=payload,
headers=headers,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status not in [200, 201, 202]:
raise Exception(f"Webhook notification failed with status {response.status}")
async def _send_sms_notification(self, alert_event: AlertEvent, channel: NotificationChannel):
"""Send SMS notification"""
config = channel.configuration
# Create SMS message (keep it short)
message = (
f"ALERT [{alert_event.severity.value.upper()}]: "
f"{alert_event.title[:80]}... "
f"Event ID: {alert_event.event_id}"
)
# This would integrate with SMS providers like Twilio, AWS SNS, etc.
# For demo purposes, just log the message
logging.info(f"SMS would be sent to {config.get('phone_number')}: {message}")
class RateLimiter:
"""Simple rate limiter for notifications"""
def __init__(self, max_per_hour: int):
self.max_per_hour = max_per_hour
self.notifications = []
def can_send(self) -> bool:
"""Check if we can send another notification"""
now = datetime.utcnow()
hour_ago = now - timedelta(hours=1)
# Remove old notifications
self.notifications = [ts for ts in self.notifications if ts > hour_ago]
# Check if under limit
if len(self.notifications) < self.max_per_hour:
self.notifications.append(now)
return True
return False
class EscalationManager:
"""Manages alert escalation procedures"""
def __init__(self):
self.active_escalations = {}
self.escalation_policies = {}
def register_escalation_policy(self, policy_id: str, policy: Dict[str, Any]):
"""Register an escalation policy"""
self.escalation_policies[policy_id] = policy
async def start_escalation(self, alert_event: AlertEvent, config: AlertConfig):
"""Start escalation procedure for an alert"""
if not config.escalation_policy:
return
policy = self.escalation_policies.get(config.escalation_policy)
if not policy:
logging.warning(f"Escalation policy not found: {config.escalation_policy}")
return
escalation_task = asyncio.create_task(
self._execute_escalation(alert_event, policy)
)
self.active_escalations[alert_event.event_id] = escalation_task
async def stop_escalation(self, event_id: str):
"""Stop escalation for an alert"""
if event_id in self.active_escalations:
escalation_task = self.active_escalations[event_id]
escalation_task.cancel()
del self.active_escalations[event_id]
async def _execute_escalation(self, alert_event: AlertEvent, policy: Dict[str, Any]):
"""Execute escalation steps"""
for step in policy.get('steps', []):
try:
# Wait for the specified delay
await asyncio.sleep(step['delay_minutes'] * 60)
# Check if alert is still active (not acknowledged/resolved)
if alert_event.acknowledged or alert_event.resolved:
break
# Execute escalation step
logging.info(f"Escalating alert {alert_event.event_id} to level {step['level']}")
# This would typically send notifications to higher-level recipients
# or integrate with incident management systems
except asyncio.CancelledError:
logging.info(f"Escalation cancelled for {alert_event.event_id}")
break
except Exception as e:
logging.error(f"Escalation step failed: {e}")
class AlertSuppression:
"""Manages alert suppression rules"""
def __init__(self):
self.suppression_rules = {}
self.suppressed_alerts = {}
def add_suppression_rule(self, alert_id: str, suppression_duration: timedelta):
"""Add suppression rule for an alert"""
self.suppression_rules[alert_id] = suppression_duration
def suppress_alert(self, alert_id: str):
"""Suppress an alert"""
if alert_id in self.suppression_rules:
duration = self.suppression_rules[alert_id]
self.suppressed_alerts[alert_id] = datetime.utcnow() + duration
def is_suppressed(self, alert_id: str) -> bool:
"""Check if an alert is currently suppressed"""
if alert_id in self.suppressed_alerts:
if datetime.utcnow() < self.suppressed_alerts[alert_id]:
return True
else:
# Suppression expired
del self.suppressed_alerts[alert_id]
return False
class AlertMetricsCollector:
"""Collects metrics about alert system performance"""
def __init__(self):
self.metrics = {
'alerts_triggered': 0,
'alerts_resolved': 0,
'alerts_acknowledged': 0,
'false_positives': 0,
'mean_time_to_acknowledge': timedelta(),
'mean_time_to_resolve': timedelta()
}
self.alert_timings = []
def record_alert(self, alert_event: AlertEvent):
"""Record alert trigger"""
self.metrics['alerts_triggered'] += 1
self.alert_timings.append({
'event_id': alert_event.event_id,
'triggered_at': alert_event.timestamp,
'acknowledged_at': None,
'resolved_at': None
})
def record_acknowledgment(self, alert_event: AlertEvent):
"""Record alert acknowledgment"""
self.metrics['alerts_acknowledged'] += 1
# Find timing record and update
for timing in self.alert_timings:
if timing['event_id'] == alert_event.event_id:
timing['acknowledged_at'] = datetime.utcnow()
break
def record_resolution(self, alert_event: AlertEvent, resolved_by: str):
"""Record alert resolution"""
self.metrics['alerts_resolved'] += 1
# Find timing record and update
for timing in self.alert_timings:
if timing['event_id'] == alert_event.event_id:
timing['resolved_at'] = datetime.utcnow()
break
# Update mean times
self._update_mean_times()
def _update_mean_times(self):
"""Update mean time metrics"""
ack_times = []
resolve_times = []
for timing in self.alert_timings:
if timing['acknowledged_at']:
ack_time = timing['acknowledged_at'] - timing['triggered_at']
ack_times.append(ack_time.total_seconds())
if timing['resolved_at']:
resolve_time = timing['resolved_at'] - timing['triggered_at']
resolve_times.append(resolve_time.total_seconds())
if ack_times:
self.metrics['mean_time_to_acknowledge'] = timedelta(
seconds=sum(ack_times) / len(ack_times)
)
if resolve_times:
self.metrics['mean_time_to_resolve'] = timedelta(
seconds=sum(resolve_times) / len(resolve_times)
)
def get_metrics_summary(self) -> Dict[str, Any]:
"""Get summary of alert metrics"""
return {
'total_alerts_triggered': self.metrics['alerts_triggered'],
'total_alerts_resolved': self.metrics['alerts_resolved'],
'total_alerts_acknowledged': self.metrics['alerts_acknowledged'],
'resolution_rate': (
self.metrics['alerts_resolved'] / self.metrics['alerts_triggered']
if self.metrics['alerts_triggered'] > 0 else 0
),
'acknowledgment_rate': (
self.metrics['alerts_acknowledged'] / self.metrics['alerts_triggered']
if self.metrics['alerts_triggered'] > 0 else 0
),
'mean_time_to_acknowledge_minutes': (
self.metrics['mean_time_to_acknowledge'].total_seconds() / 60
),
'mean_time_to_resolve_minutes': (
self.metrics['mean_time_to_resolve'].total_seconds() / 60
)
}
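The RecipientProfile defined above carries a quiet_hours field that the delivery code never consults. The sketch below shows how channels and recipients might be registered, and how a caller could defer non-critical alerts during quiet hours as discussed earlier in this section. The channel URL, recipient details, and the in_quiet_hours helper are illustrative assumptions, not part of the framework itself.

from datetime import datetime, time

manager = NotificationManager()

# Register a webhook channel capped at 20 notifications per hour
manager.register_channel(NotificationChannel(
    channel_id="ops_webhook",
    channel_type="webhook",
    configuration={"url": "https://example.internal/economic-alerts"},
    rate_limit=20
))

# Register an analyst who should not be paged overnight for non-critical alerts
manager.register_recipient(RecipientProfile(
    user_id="analyst_1",
    name="Duty Analyst",
    email="analyst@example.org",
    quiet_hours={"start": "22:00", "end": "06:00"}
))

def in_quiet_hours(recipient: RecipientProfile, now: datetime) -> bool:
    """Hypothetical helper: True if 'now' falls inside the recipient's quiet hours."""
    if not recipient.quiet_hours:
        return False
    start = time.fromisoformat(recipient.quiet_hours["start"])
    end = time.fromisoformat(recipient.quiet_hours["end"])
    current = now.time()
    if start <= end:
        return start <= current <= end
    # Window wraps past midnight (e.g. 22:00-06:00)
    return current >= start or current <= end

# A caller could consult this helper before immediate delivery, deferring
# anything below CRITICAL severity and batching it for the next business window.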
Advanced Alert Strategies
Modern economic alerting systems benefit from advanced strategies that go beyond simple threshold monitoring to provide intelligent, context-aware notifications. These strategies incorporate machine learning techniques, economic domain knowledge, and adaptive algorithms that learn from historical patterns and user feedback.
Seasonal adjustment and economic calendar integration represent critical advances in economic alerting. The system should understand that certain types of economic volatility are expected around scheduled data releases, earnings seasons, or policy announcement periods. Alerts during these periods should be adjusted to account for elevated normal volatility while still detecting truly abnormal events.
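One simple way to express this calendar awareness is to widen the anomaly threshold around scheduled release dates before passing it to the z-score check used by the AnomalyAnalyzer above. The helper below is a minimal sketch under that assumption; the function name, window size, and widening factor are illustrative and would normally be calibrated from historical release-day volatility.

from datetime import date
from typing import Set

def adjusted_z_threshold(base_threshold: float,
                         today: date,
                         release_dates: Set[date],
                         window_days: int = 2,
                         widening_factor: float = 1.5) -> float:
    """Widen the anomaly threshold near scheduled data releases.

    Elevated volatility is expected inside the release window, so the z-score
    threshold is scaled up there and left unchanged otherwise.
    """
    near_release = any(abs((today - d).days) <= window_days for d in release_dates)
    return base_threshold * widening_factor if near_release else base_threshold

# Example: a release on the 10th widens the threshold from 3.0 to 4.5
# for data arriving within two days of that date.
releases = {date(2024, 1, 10)}
print(adjusted_z_threshold(3.0, date(2024, 1, 9), releases))   # 4.5
print(adjusted_z_threshold(3.0, date(2024, 1, 20), releases))  # 3.0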
Multi-indicator correlation analysis provides another advanced alerting capability. Economic indicators rarely move in isolation, and understanding the expected relationships between different indicators enables more sophisticated anomaly detection. When unemployment rises, certain types of consumer spending typically decline. If consumer spending indicators don’t show expected correlations with employment changes, this might signal important structural changes worth alerting about.
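The stubbed CorrelationAnalyzer above could host logic along these lines: compare the recent rolling correlation between two indicators against a historical baseline and flag a breakdown when they diverge. This is a sketch only; the window length, baseline correlation, and tolerance are illustrative placeholders that would be estimated from historical data in practice.

import pandas as pd

def correlation_breakdown(series_a: pd.Series,
                          series_b: pd.Series,
                          window: int = 24,
                          baseline_corr: float = -0.6,
                          tolerance: float = 0.4) -> bool:
    """Return True if the recent correlation has drifted far from its baseline,
    e.g. unemployment vs. a consumer spending indicator."""
    aligned = pd.concat([series_a, series_b], axis=1).dropna()
    if len(aligned) < window:
        return False  # not enough overlapping observations
    recent_corr = aligned.iloc[-window:].corr().iloc[0, 1]
    return abs(recent_corr - baseline_corr) > tolerance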
The integration of external data sources further enhances alerting capabilities. News sentiment analysis, social media monitoring, and alternative data sources can provide early warning signals that complement traditional economic indicators. A spike in negative economic sentiment on social media might precede official economic data releases by several days or weeks.
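A blended signal of this kind is one way the stubbed CompositeAnalyzer above might be fleshed out. The scoring function below is purely illustrative: it assumes an indicator anomaly z-score and a sentiment z-score are already available, and the weighting would need to be tuned against historical episodes.

def composite_warning_score(indicator_z: float,
                            sentiment_z: float,
                            indicator_weight: float = 0.7) -> float:
    """Blend an indicator anomaly z-score with a news/social sentiment z-score."""
    return indicator_weight * abs(indicator_z) + (1 - indicator_weight) * abs(sentiment_z)

# A caller could alert when the blended score crosses a configured threshold,
# giving sentiment-driven early warnings a voice without letting them dominate.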
The alerting strategies presented in this guide integrate with the broader economic data architecture described in Cloud Deployment Scaling Economic Data Systems and provide the monitoring foundation for the visualization systems covered in Economic Data Visualization Dashboard Development. These systems work together to provide comprehensive economic monitoring capabilities that support both operational decision-making and strategic planning.
Related Guides
For comprehensive economic alerting system implementation, explore these complementary resources:
- Real-Time Data Processing Economic Indicators - Build streaming systems that feed alerting frameworks
- Database Integration for Economic Data Storage - Store historical data needed for alert analysis
- Data Quality Practices for Economic Datasets - Implement quality alerts and monitoring
- Machine Learning Applications Economic Data Analysis - Use ML techniques for advanced anomaly detection
- Economic Data Visualization Dashboard Development - Visualize alert data and system metrics
- Cloud Deployment Scaling Economic Data Systems - Deploy alerting systems at scale
- API Integration for Economic Data Sources - Integrate data sources with alerting systems