6.3: Intelligent Notifications
Overview
Intelligent notifications use AI to filter, route, and escalate alerts based on severity, context, and recipient preferences. For safety-critical embedded systems development, effective notification management is essential: teams must respond quickly to critical issues while avoiding the fatigue that alert floods cause.
This chapter covers AI-powered notification systems that learn from team behavior, reduce noise, and ensure the right information reaches the right people at the right time.
Key Terms
| Term | Definition |
|---|---|
| Notification Fatigue | Desensitization to alerts due to excessive volume, leading to missed critical issues |
| Smart Routing | AI-driven determination of optimal recipients based on expertise and availability |
| Escalation Path | Defined sequence of notifications when initial recipients don't respond |
| Digest | Batched summary of low-priority notifications delivered at scheduled intervals |
| MTTR | Mean Time To Resolve: average time from notification to resolution, a key metric for notification effectiveness |
The Notification Problem
Modern development teams face a barrage of notifications:
| Source | Typical Daily Volume | Signal-to-Noise |
|---|---|---|
| CI/CD Pipeline | 50-200 | Medium |
| Static Analysis | 10-50 | Low |
| Code Review | 20-100 | High |
| Security Scans | 5-30 | Medium |
| Monitoring/Alerts | 20-500 | Variable |
| Issue Tracking | 30-100 | Medium |
Result: at these volumes, engineers start ignoring notifications, including critical ones.
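To put the table in perspective, the ranges can simply be added up. The snippet below is a small worked calculation that uses only the figures from the table; the variable names are illustrative.

```python
# Daily volume ranges (low, high) copied from the table above
daily_volume = {
    "CI/CD Pipeline": (50, 200),
    "Static Analysis": (10, 50),
    "Code Review": (20, 100),
    "Security Scans": (5, 30),
    "Monitoring/Alerts": (20, 500),
    "Issue Tracking": (30, 100),
}

total_low = sum(low for low, _ in daily_volume.values())
total_high = sum(high for _, high in daily_volume.values())
print(f"{total_low}-{total_high} notifications per day")
```

If all of that traffic reached a single inbox, it would amount to roughly 17 to 122 notifications per hour over an eight-hour day, which is the volume the filtering and routing stages below are meant to cut down.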
Intelligent Notification Architecture
The following diagram shows how event sources (CI/CD, code review, requirements changes) feed into the AI-powered notification engine, which classifies severity and routes alerts to the appropriate stakeholders.
AI Severity Classification
Use machine learning to classify notification severity based on content analysis.
Training Data Structure
# notification_classifier.py
import pickle

from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer

# Sample training data (expand with real historical data)
training_data = [
    # (notification_text, context_features, severity_label)
    ("Build failed: segmentation fault in main.c",
     {"source": "ci", "branch": "main", "time_to_release": 2},
     "critical"),
    ("MISRA violation: Rule 11.3 in gpio.c:45",
     {"source": "static_analysis", "rule_type": "required", "file_changed": True},
     "high"),
    ("New comment on PR #123",
     {"source": "code_review", "is_blocking": False, "reviewer_count": 2},
     "low"),
    ("Security vulnerability: CVE-2024-1234 in libssl",
     {"source": "security", "cvss_score": 9.8, "exploitable": True},
     "critical"),
    ("Test coverage dropped below 80%",
     {"source": "ci", "branch": "feature/new-driver", "delta": -2.5},
     "medium"),
]


class NotificationClassifier:
    def __init__(self):
        self.vectorizer = TfidfVectorizer(max_features=1000)
        self.classifier = RandomForestClassifier(n_estimators=100)
        self.severity_levels = ['low', 'medium', 'high', 'critical']

    def train(self, data):
        """Train classifier on historical notification data."""
        # Only the text is vectorized; context features are applied later as rules
        texts = [d[0] for d in data]
        labels = [self.severity_levels.index(d[2]) for d in data]
        X = self.vectorizer.fit_transform(texts)
        self.classifier.fit(X, labels)

    def predict(self, notification_text: str, context: dict) -> dict:
        """Predict severity for new notification."""
        X = self.vectorizer.transform([notification_text])
        proba = self.classifier.predict_proba(X)[0]
        # predict_proba columns follow classifier.classes_, which omits any
        # severity level that never appeared in the training labels
        classes = [int(c) for c in self.classifier.classes_]
        best = int(proba.argmax())
        # Apply context-based adjustments
        severity = self.severity_levels[classes[best]]
        severity = self._apply_context_rules(severity, context)
        return {
            'severity': severity,
            # Confidence of the model's own prediction, before rule overrides
            'confidence': float(proba[best]),
            'probabilities': {self.severity_levels[c]: float(p)
                              for c, p in zip(classes, proba)}
        }

    def _apply_context_rules(self, severity: str, context: dict) -> str:
        """Apply business rules to adjust AI prediction."""
        # Main branch failures are always critical
        if context.get('branch') == 'main' and context.get('source') == 'ci':
            return 'critical'
        # High CVSS scores override AI prediction
        if context.get('cvss_score', 0) >= 9.0:
            return 'critical'
        # Close to release = escalate one level, capped at the highest level
        if context.get('time_to_release', 999) <= 3:
            idx = self.severity_levels.index(severity)
            return self.severity_levels[min(idx + 1, len(self.severity_levels) - 1)]
        return severity

    def save(self, path: str):
        with open(path, 'wb') as f:
            pickle.dump((self.vectorizer, self.classifier), f)

    def load(self, path: str):
        # pickle can execute arbitrary code: only load files you created yourself
        with open(path, 'rb') as f:
            self.vectorizer, self.classifier = pickle.load(f)
LLM-Based Classification
For complex notifications, use an LLM for nuanced understanding:
import json

from anthropic import Anthropic


def classify_with_llm(notification: dict) -> dict:
    """Use Claude to classify notification severity."""
    client = Anthropic()  # Reads the API key from the ANTHROPIC_API_KEY environment variable
    prompt = f"""Analyze this development notification and classify its severity.

**Source**: {notification['source']}
**Message**: {notification['message']}
**Context**:
- Branch: {notification.get('branch', 'unknown')}
- Time to release: {notification.get('days_to_release', 'unknown')} days
- Affected files: {notification.get('affected_files', [])}

Classify as one of: critical, high, medium, low

Respond with JSON:
{{
  "severity": "<level>",
  "reasoning": "<brief explanation>",
  "suggested_recipients": ["<role or name>"],
  "urgency_hours": <number>,
  "can_batch": <true/false>
}}"""
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=256,
        messages=[{"role": "user", "content": prompt}]
    )
    # Raises json.JSONDecodeError if the reply is not bare JSON
    return json.loads(response.content[0].text)
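Model replies are not guaranteed to be bare JSON, so the parsing step benefits from a defensive wrapper. The following is a minimal sketch, not part of the original design: the helper name `parse_llm_classification`, the `parsed` flag, and the choice of `high` as the conservative fallback are all assumptions made for illustration.

```python
import json

VALID_SEVERITIES = ("low", "medium", "high", "critical")


def parse_llm_classification(raw_text: str, fallback: str = "high") -> dict:
    """Parse the model's reply; fall back to a conservative severity on failure."""
    # Take the outermost {...} span so surrounding prose is ignored
    start, end = raw_text.find("{"), raw_text.rfind("}")
    try:
        if start == -1 or end <= start:
            raise ValueError("no JSON object in reply")
        result = json.loads(raw_text[start:end + 1])
        severity = str(result.get("severity", "")).lower()
        if severity not in VALID_SEVERITIES:
            raise ValueError(f"unknown severity: {severity!r}")
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return {"severity": fallback,
                "reasoning": "unparseable model reply",
                "parsed": False}
    result["severity"] = severity
    result["parsed"] = True
    return result


reply = 'Sure, here it is: {"severity": "Critical", "reasoning": "main is broken", "can_batch": false}'
print(parse_llm_classification(reply)["severity"])
print(parse_llm_classification("I cannot classify this.")["severity"])
```

Falling back to a higher severity rather than a lower one trades some extra noise for the guarantee that a parsing failure never silently downgrades an alert.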
Smart Routing Engine
Route notifications to the most appropriate recipients based on expertise, availability, and workload.
Expertise Mapping
# expertise_map.yaml
team_members:
  - name: Alice Chen
    email: alice@company.com
    slack: U123ABC
    expertise:
      - pattern: "*.c|*.h"
        domains: [drivers, hal, safety]
        weight: 0.9
      - pattern: "can_*.c"
        domains: [can, networking]
        weight: 1.0
    availability:
      timezone: America/Los_Angeles
      working_hours: "09:00-18:00"
      on_call_schedule: week_1
  - name: Bob Smith
    email: bob@company.com
    slack: U456DEF
    expertise:
      - pattern: "test_*.c|*_test.py"
        domains: [testing, verification]
        weight: 0.95
      - pattern: ".github/*|Jenkinsfile"
        domains: [ci_cd, devops]
        weight: 0.85
    availability:
      timezone: Europe/Berlin
      working_hours: "08:00-17:00"
      on_call_schedule: week_2

escalation_paths:
  critical:
    - level: 1
      timeout_minutes: 15
      recipients: [on_call_primary]
    - level: 2
      timeout_minutes: 30
      recipients: [on_call_secondary, tech_lead]
    - level: 3
      timeout_minutes: 60
      recipients: [engineering_manager, on_call_primary, on_call_secondary]
  high:
    - level: 1
      timeout_minutes: 60
      recipients: [file_owner, domain_expert]
    - level: 2
      timeout_minutes: 240
      recipients: [tech_lead]
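It can help to see when each escalation level would actually fire. The sketch below assumes the timeouts run one after another, with each timer starting when the previous one expires, which is how the escalation loop later in this section consumes the path. The function name `escalation_timeline` is illustrative, and the dictionary repeats only the `level` and `timeout_minutes` values from the configuration above.

```python
# Timeouts copied from expertise_map.yaml (recipients omitted for brevity)
escalation_paths = {
    "critical": [
        {"level": 1, "timeout_minutes": 15},
        {"level": 2, "timeout_minutes": 30},
        {"level": 3, "timeout_minutes": 60},
    ],
    "high": [
        {"level": 1, "timeout_minutes": 60},
        {"level": 2, "timeout_minutes": 240},
    ],
}


def escalation_timeline(path: list) -> list:
    """Return (level, minutes_after_creation) pairs for sequential timeouts."""
    elapsed = 0
    timeline = []
    for step in path:
        elapsed += step["timeout_minutes"]
        timeline.append((step["level"], elapsed))
    return timeline


for severity, path in escalation_paths.items():
    print(severity, escalation_timeline(path))
```

Under that assumption an unacknowledged critical alert reaches its third level 105 minutes after creation, and an unacknowledged high alert reaches the tech lead after five hours.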
Routing Implementation
import fnmatch
from datetime import datetime
from pathlib import PurePosixPath

import pytz
import yaml


class SmartRouter:
    MAX_WORKLOAD = 10  # Open notifications at which workload_score reaches 0

    def __init__(self, config_path: str):
        with open(config_path) as f:
            self.config = yaml.safe_load(f)
        self.workload_tracker = {}

    def route(self, notification: dict) -> list:
        """Determine optimal recipients for notification."""
        severity = notification['severity']
        affected_files = notification.get('affected_files', [])
        domain = notification.get('domain', 'general')
        candidates = []
        # Find experts for affected files
        for member in self.config['team_members']:
            score = self._calculate_expertise_score(member, affected_files, domain)
            if score > 0:
                availability = self._check_availability(member)
                workload = self._get_workload(member['email'])
                # Invert workload; clamp so an overloaded member never scores negative
                workload_score = max(0.0, 1.0 - workload / self.MAX_WORKLOAD)
                candidates.append({
                    'member': member,
                    'expertise_score': score,
                    'available': availability['available'],
                    'availability_score': availability['score'],
                    'workload_score': workload_score,
                    'combined_score': score * availability['score'] * workload_score
                })
        # Sort by combined score
        candidates.sort(key=lambda x: x['combined_score'], reverse=True)
        # Select top candidates based on severity
        if severity == 'critical':
            # Notify top 3 + on-call (skipping on-call if already selected)
            recipients = [c['member'] for c in candidates[:3]]
            on_call = self._get_on_call()
            if on_call not in recipients:
                recipients.append(on_call)
        elif severity == 'high':
            recipients = [c['member'] for c in candidates[:2]]
        else:
            recipients = [c['member'] for c in candidates[:1]]
        return recipients

    def _calculate_expertise_score(self, member: dict, files: list, domain: str) -> float:
        """Calculate expertise match score."""
        max_score = 0.0
        for expertise in member.get('expertise', []):
            # fnmatch has no alternation, so "a|b" is split into separate globs
            patterns = expertise['pattern'].split('|')
            domains = expertise.get('domains', [])
            weight = expertise.get('weight', 0.5)
            # Check file pattern match against the full path and the bare file name
            for file in files:
                name = PurePosixPath(file).name
                if any(fnmatch.fnmatch(file, p) or fnmatch.fnmatch(name, p)
                       for p in patterns):
                    max_score = max(max_score, weight)
            # Check domain match
            if domain in domains:
                max_score = max(max_score, weight * 0.8)
        return max_score

    def _check_availability(self, member: dict) -> dict:
        """Check if member is currently available."""
        avail = member.get('availability', {})
        tz = pytz.timezone(avail.get('timezone', 'UTC'))
        now = datetime.now(tz)
        # Parse working hours ("HH:MM-HH:MM")
        start_str, end_str = avail.get('working_hours', '00:00-23:59').split('-')
        start = datetime.strptime(start_str, '%H:%M').time()
        end = datetime.strptime(end_str, '%H:%M').time()
        is_working_hours = start <= now.time() < end
        is_weekday = now.weekday() < 5
        if is_working_hours and is_weekday:
            return {'available': True, 'score': 1.0}
        elif is_weekday:
            return {'available': False, 'score': 0.5}  # Same day, outside hours
        else:
            return {'available': False, 'score': 0.2}  # Weekend

    def _get_workload(self, email: str) -> int:
        """Get current notification workload for team member."""
        return self.workload_tracker.get(email, 0)

    def _get_on_call(self) -> dict:
        """Get current on-call engineer."""
        # Simplified - real implementation would check schedule
        for member in self.config['team_members']:
            if member.get('availability', {}).get('on_call_schedule') == 'week_1':
                return member
        return self.config['team_members'][0]
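A worked example makes the ranking formula easier to reason about. The sketch below reproduces the product of expertise, availability, and inverted workload used in `route`; the standalone function `combined_score`, the member names, and the input values are illustrative, and the clamp at zero is an assumption added so that a workload above ten cannot produce a negative score.

```python
def combined_score(expertise: float, availability: float,
                   workload: int, max_workload: int = 10) -> float:
    """Multiply the three factors; a higher workload lowers the score."""
    workload_score = max(0.0, 1.0 - workload / max_workload)
    return expertise * availability * workload_score


candidates = {
    # File-pattern match (weight 0.9), outside working hours, 2 open notifications
    "alice": combined_score(0.9, 0.5, 2),
    # Domain-only match (weight 0.85 * 0.8), in working hours, 5 open notifications
    "bob": combined_score(0.85 * 0.8, 1.0, 5),
}

ranking = sorted(candidates, key=candidates.get, reverse=True)
for name in ranking:
    print(f"{name}: {candidates[name]:.2f}")
```

Because the factors are multiplied, any single factor near zero removes a candidate from contention, so a fully loaded expert ranks below a less specialized colleague with spare capacity.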
Channel Selection
Choose the appropriate notification channel based on severity and urgency.
from datetime import datetime

import pytz


class ChannelSelector:
    def __init__(self):
        self.channel_priority = {
            'critical': ['pagerduty', 'sms', 'slack_dm', 'email'],
            'high': ['slack_dm', 'slack_channel', 'email'],
            'medium': ['slack_channel', 'email'],
            'low': ['digest', 'slack_channel']
        }

    def select_channels(self, notification: dict, recipient: dict) -> list:
        """Select notification channels based on context."""
        severity = notification['severity']
        channels = []
        # Get candidate channels for severity
        candidates = self.channel_priority.get(severity, ['email'])
        # Note: recipient['notification_preferences'] is not consulted here;
        # only quiet hours and configured contact fields filter the channels
        for channel in candidates:
            if self._channel_available(channel, recipient, notification):
                channels.append(channel)
                # For critical, use multiple channels; otherwise the first match wins
                if severity != 'critical':
                    break
        return channels if channels else ['email']

    def _channel_available(self, channel: str, recipient: dict, notification: dict) -> bool:
        """Check if channel is available and appropriate."""
        # Check quiet hours for non-critical
        if notification['severity'] not in ['critical', 'high']:
            if self._is_quiet_hours(recipient):
                if channel in ['sms', 'pagerduty', 'slack_dm']:
                    return False
        # Check if recipient has channel configured
        channel_map = {
            'slack_dm': 'slack',
            'slack_channel': 'slack',
            'email': 'email',
            'sms': 'phone',
            'pagerduty': 'pagerduty_id'
        }
        # Channels without a required field (e.g. 'digest') are always available
        required_field = channel_map.get(channel)
        return required_field is None or recipient.get(required_field) is not None

    def _is_quiet_hours(self, recipient: dict) -> bool:
        """Check if it's quiet hours for recipient."""
        tz_name = recipient.get('availability', {}).get('timezone', 'UTC')
        tz = pytz.timezone(tz_name)
        now = datetime.now(tz)
        quiet_start = 22  # 10 PM
        quiet_end = 7  # 7 AM
        # The window wraps past midnight, hence "or" rather than "and"
        return now.hour >= quiet_start or now.hour < quiet_end
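The quiet-hours check is easier to test when the current time is passed in rather than read from the clock. The following is a sketch under that assumption: the free function `is_quiet_hours` and its parameters are illustrative, and fixed UTC offsets stand in for real time zones so the example has no time-zone database dependency (a real deployment would pass a proper zone object such as the `pytz` zones used above).

```python
from datetime import datetime, timedelta, timezone, tzinfo


def is_quiet_hours(now: datetime, tz: tzinfo,
                   quiet_start: int = 22, quiet_end: int = 7) -> bool:
    """Return True if `now`, viewed in the recipient's zone, is in the quiet window."""
    local_hour = now.astimezone(tz).hour
    if quiet_start > quiet_end:
        # Window wraps past midnight, e.g. 22:00-07:00
        return local_hour >= quiet_start or local_hour < quiet_end
    return quiet_start <= local_hour < quiet_end


# One instant, two recipients: 06:30 UTC on a winter morning
instant = datetime(2024, 1, 15, 6, 30, tzinfo=timezone.utc)
berlin_winter = timezone(timedelta(hours=1))        # 07:30 local
los_angeles_winter = timezone(timedelta(hours=-8))  # 22:30 local, previous day

print(is_quiet_hours(instant, berlin_winter))
print(is_quiet_hours(instant, los_angeles_winter))
```

The same notification is therefore deliverable by direct message to the Berlin recipient but would be held back for the Los Angeles recipient, which is why the check has to be evaluated per recipient rather than once per notification.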
Escalation Engine
Automatically escalate unacknowledged notifications.
import asyncio
from datetime import datetime, timezone


class EscalationEngine:
    def __init__(self, router: 'SmartRouter', notifier):
        self.router = router
        self.notifier = notifier
        self.active_incidents = {}
        # Strong references so running tasks are not garbage-collected
        self._tasks = set()

    async def track_notification(self, notification_id: str, notification: dict):
        """Track notification and escalate if not acknowledged."""
        severity = notification['severity']
        escalation_path = self.router.config['escalation_paths'].get(severity, [])
        self.active_incidents[notification_id] = {
            'notification': notification,
            'created_at': datetime.now(timezone.utc),
            'acknowledged': False,
            'current_level': 0,
            'escalation_path': escalation_path
        }
        # Start escalation timer
        task = asyncio.create_task(self._escalation_loop(notification_id))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def acknowledge(self, notification_id: str, user: str):
        """Mark notification as acknowledged."""
        if notification_id in self.active_incidents:
            incident = self.active_incidents[notification_id]
            incident['acknowledged'] = True
            incident['acknowledged_by'] = user
            incident['acknowledged_at'] = datetime.now(timezone.utc)

    async def _escalation_loop(self, notification_id: str):
        """Background task to handle escalation."""
        incident = self.active_incidents[notification_id]
        try:
            for escalation in incident['escalation_path']:
                if incident['acknowledged']:
                    return
                # Wait for timeout
                await asyncio.sleep(escalation['timeout_minutes'] * 60)
                # Check again if acknowledged
                if incident['acknowledged']:
                    return
                # Escalate
                await self._escalate(notification_id, incident, escalation)
                incident['current_level'] += 1
            # Max escalation reached
            await self._notify_max_escalation(notification_id, incident)
        finally:
            # Clean up on every exit path
            self.active_incidents.pop(notification_id, None)

    async def _escalate(self, notification_id: str, incident: dict, escalation: dict):
        """Send escalated notification."""
        recipients = self._resolve_recipients(escalation['recipients'])
        unacknowledged = datetime.now(timezone.utc) - incident['created_at']
        for recipient in recipients:
            await self.notifier.send(
                recipient=recipient,
                notification={
                    **incident['notification'],
                    'escalation_level': escalation['level'],
                    'original_notification_id': notification_id,
                    # total_seconds() keeps counting past 24 hours; .seconds wraps
                    'time_unacknowledged': unacknowledged.total_seconds()
                },
                channel='slack_dm'  # Escalations always go to DM
            )

    async def _notify_max_escalation(self, notification_id: str, incident: dict):
        """Hook called when the whole path is exhausted without acknowledgment."""
        # Placeholder: wire this to the team's last-resort channel
        print(f"Notification {notification_id} exhausted its escalation path")

    def _resolve_recipients(self, recipient_refs: list) -> list:
        """Resolve recipient references to actual users."""
        resolved = []
        for ref in recipient_refs:
            if ref == 'on_call_primary':
                resolved.append(self.router._get_on_call())
                continue
            # Match a role (e.g. tech_lead) or a direct name reference; refs such
            # as on_call_secondary resolve only if listed under a member's roles
            for member in self.router.config['team_members']:
                if ref in member.get('roles', []) or member['name'] == ref:
                    resolved.append(member)
        return resolved
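One alternative to sleeping and then re-checking a flag is to wait on an `asyncio.Event`, so that an acknowledgment interrupts the timer immediately. The sketch below illustrates that idea only; the function `escalate_until_acknowledged`, the `notify` callback, and the `seconds_per_minute` scaling parameter (used here to shrink the timeouts for a quick demo) are all assumptions and not part of the engine above.

```python
import asyncio


async def escalate_until_acknowledged(path, acknowledged: asyncio.Event, notify,
                                      seconds_per_minute: float = 60.0) -> int:
    """Walk the escalation path; return how many levels were notified."""
    notified = 0
    for step in path:
        timeout = step["timeout_minutes"] * seconds_per_minute
        try:
            # Returns as soon as acknowledged.set() is called elsewhere
            await asyncio.wait_for(acknowledged.wait(), timeout=timeout)
            return notified
        except asyncio.TimeoutError:
            await notify(step)
            notified += 1
    return notified


async def demo():
    sent = []

    async def notify(step):
        sent.append(step["level"])

    path = [{"level": 1, "timeout_minutes": 15},
            {"level": 2, "timeout_minutes": 30}]

    # Nobody acknowledges: both levels fire (minutes scaled to milliseconds)
    unacked = await escalate_until_acknowledged(
        path, asyncio.Event(), notify, seconds_per_minute=0.001)

    # Already acknowledged: nothing is sent
    done = asyncio.Event()
    done.set()
    acked = await escalate_until_acknowledged(
        path, done, notify, seconds_per_minute=0.001)
    return sent, unacked, acked


print(asyncio.run(demo()))
```

With this structure the coroutine ends within the same event-loop iteration as the acknowledgment, so no incident lingers until its current timeout expires.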
Digest System
Batch low-priority notifications into periodic digests.
from collections import defaultdict
from datetime import datetime, time, timezone


class DigestManager:
    def __init__(self, notifier):
        self.notifier = notifier
        self.queues = defaultdict(list)  # recipient -> [notifications]
        self.digest_schedules = {
            'daily': time(9, 0),  # 9 AM
            'weekly': (0, time(9, 0))  # Monday 9 AM
        }

    def queue(self, recipient: str, notification: dict):
        """Add notification to digest queue."""
        self.queues[recipient].append({
            'notification': notification,
            'queued_at': datetime.now(timezone.utc)
        })

    async def send_digest(self, recipient: str, digest_type: str = 'daily'):
        """Send digest to recipient."""
        if recipient not in self.queues or not self.queues[recipient]:
            return
        # Take the queue before awaiting so items queued during the send are kept
        notifications = self.queues.pop(recipient)
        # Group by source
        by_source = defaultdict(list)
        for item in notifications:
            source = item['notification'].get('source', 'other')
            by_source[source].append(item['notification'])
        # Format digest
        digest = self._format_digest(by_source, digest_type)
        # Send
        try:
            await self.notifier.send_email(
                to=recipient,
                subject=f"Development Digest - {datetime.now().strftime('%Y-%m-%d')}",
                body=digest
            )
        except Exception:
            # Put the items back so a failed send does not drop them
            self.queues[recipient] = notifications + self.queues[recipient]
            raise

    def _format_digest(self, by_source: dict, digest_type: str) -> str:
        """Format digest content."""
        lines = [f"# {digest_type.title()} Development Digest\n"]
        total = sum(len(v) for v in by_source.values())
        lines.append(f"**{total} notifications** from the past "
                     f"{'24 hours' if digest_type == 'daily' else 'week'}\n")
        for source, notifications in sorted(by_source.items()):
            lines.append(f"\n## {source.replace('_', ' ').title()} ({len(notifications)})\n")
            # Show top 5 per source
            for n in notifications[:5]:
                severity = n.get('severity', 'info')
                emoji = {'critical': '🔴', 'high': '🟠', 'medium': '🟡', 'low': '🟢'}.get(severity, '⚪')
                lines.append(f"- {emoji} {n.get('message', 'No message')[:80]}")
            if len(notifications) > 5:
                lines.append(f"- ... and {len(notifications) - 5} more")
        lines.append("\n---\n*Manage preferences: [Settings](https://notifications.internal/settings)*")
        return "\n".join(lines)
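The `digest_schedules` entries say when digests are due but not how the next send time is derived. A minimal sketch of that calculation follows, assuming the daily entry is a `time` and the weekly entry is a `(weekday, time)` pair with Monday as 0, as in the class above. The helper names `next_daily_digest` and `next_weekly_digest` are illustrative.

```python
from datetime import datetime, time, timedelta


def next_daily_digest(now: datetime, send_at: time = time(9, 0)) -> datetime:
    """Return the next occurrence of send_at strictly after `now`."""
    candidate = datetime.combine(now.date(), send_at, tzinfo=now.tzinfo)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


def next_weekly_digest(now: datetime, weekday: int = 0,
                       send_at: time = time(9, 0)) -> datetime:
    """Return the next occurrence of weekday/send_at strictly after `now`."""
    days_ahead = (weekday - now.weekday()) % 7
    candidate = datetime.combine(now.date() + timedelta(days=days_ahead),
                                 send_at, tzinfo=now.tzinfo)
    if candidate <= now:
        candidate += timedelta(days=7)
    return candidate


wednesday = datetime(2024, 1, 17, 10, 30)  # a Wednesday, after 9 AM
print(next_daily_digest(wednesday))   # next morning
print(next_weekly_digest(wednesday))  # following Monday
```

A scheduler loop could sleep until the returned time and then call `send_digest` for each recipient with a non-empty queue.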
Slack Integration
Complete Slack notification implementation with interactive buttons.
import logging

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

logger = logging.getLogger(__name__)


class SlackNotifier:
    def __init__(self, token: str):
        self.client = WebClient(token=token)
        self.channel_map = {
            'ci_cd': '#ci-alerts',
            'security': '#security-alerts',
            'general': '#dev-notifications'
        }

    def send_notification(self, notification: dict, recipient: dict, channel_type: str):
        """Send notification to Slack."""
        severity = notification['severity']
        # Build message blocks
        blocks = self._build_blocks(notification)
        # Determine target
        if channel_type == 'slack_dm':
            target = recipient['slack']
            self._send_dm(target, blocks, severity)
        else:
            channel = self.channel_map.get(notification.get('domain', 'general'), '#dev-notifications')
            self._send_channel(channel, blocks, severity)

    def _build_blocks(self, notification: dict) -> list:
        """Build Slack block kit message."""
        severity = notification['severity']
        emoji = {'critical': ':red_circle:', 'high': ':large_orange_circle:',
                 'medium': ':large_yellow_circle:', 'low': ':white_circle:'}.get(severity, ':white_circle:')
        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{emoji} {severity.upper()}: {notification['source']}"
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": notification['message']
                }
            }
        ]
        # Add context
        if 'affected_files' in notification:
            files = notification['affected_files'][:3]
            blocks.append({
                "type": "context",
                "elements": [
                    {"type": "mrkdwn", "text": f"📁 Files: `{', '.join(files)}`"}
                ]
            })
        # Add action buttons for critical/high
        if severity in ['critical', 'high']:
            elements = [
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "✅ Acknowledge"},
                    "style": "primary",
                    "action_id": "acknowledge",
                    "value": notification['id']
                },
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "🔇 Snooze 1h"},
                    "action_id": "snooze",
                    "value": f"{notification['id']}:60"
                }
            ]
            # Only offer a link button when there is a real URL to open
            if notification.get('url'):
                elements.append({
                    "type": "button",
                    "text": {"type": "plain_text", "text": "🔗 View Details"},
                    "action_id": "view_details",
                    "url": notification['url']
                })
            blocks.append({
                "type": "actions",
                "block_id": f"actions_{notification['id']}",
                "elements": elements
            })
        return blocks

    def _send_dm(self, user_id: str, blocks: list, severity: str):
        """Send direct message."""
        try:
            # Open DM channel
            response = self.client.conversations_open(users=[user_id])
            channel_id = response['channel']['id']
            # Send message
            self.client.chat_postMessage(
                channel=channel_id,
                blocks=blocks,
                text=f"{severity.upper()} notification"  # Fallback
            )
        except SlackApiError as e:
            logger.error("Slack error: %s", e.response['error'])

    def _send_channel(self, channel: str, blocks: list, severity: str):
        """Send to channel."""
        try:
            self.client.chat_postMessage(
                channel=channel,
                blocks=blocks,
                text=f"{severity.upper()} notification"
            )
        except SlackApiError as e:
            logger.error("Slack error: %s", e.response['error'])
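The buttons only become useful once their clicks are handled. The sketch below shows one way to decode the interaction payload that Slack posts when a button is pressed, using the `action_id` and `value` conventions from `_build_blocks`. The function name `parse_interaction` and the shape of the returned dictionary are assumptions for illustration; the payload is assumed to be already decoded from the request into a Python dict.

```python
def parse_interaction(payload: dict) -> dict:
    """Turn a Slack block_actions payload into a simple command dict."""
    action = payload["actions"][0]
    user_id = payload["user"]["id"]
    action_id = action["action_id"]

    if action_id == "acknowledge":
        return {"type": "acknowledge",
                "notification_id": action["value"],
                "user": user_id}

    if action_id == "snooze":
        # Value is "<notification id>:<minutes>"; split on the last colon
        # so notification ids that contain colons still parse
        notification_id, _, minutes = action["value"].rpartition(":")
        return {"type": "snooze",
                "notification_id": notification_id,
                "minutes": int(minutes),
                "user": user_id}

    # Link buttons such as view_details need no server-side handling
    return {"type": "ignored", "user": user_id}


example = {
    "type": "block_actions",
    "user": {"id": "U123ABC"},
    "actions": [{"action_id": "snooze", "value": "build-42:60"}],
}
print(parse_interaction(example))
```

An `acknowledge` result would typically be passed on to the escalation engine's `acknowledge` method so that the escalation timer stops.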
Metrics and Feedback Loop
Track notification effectiveness to continuously improve.
Key Metrics
| Metric | Target | Description |
|---|---|---|
| MTTA (Acknowledge) | < 15 min (critical) | Time from notification to acknowledgment |
| MTTR (Resolve) | < 2 hours (critical) | Time from notification to resolution |
| False Positive Rate | < 10% | Notifications marked "not relevant" |
| Notification Volume | Decreasing | Total notifications per engineer per day |
| Escalation Rate | < 5% | Percentage requiring escalation |
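As a rough illustration of how three of these metrics could be computed, the sketch below derives MTTA, false positive rate, and escalation rate from a list of notification records. The record fields (`sent_at`, `acked_at`, `false_positive`, `escalated`), the function name `summarize`, and the sample data are all hypothetical.

```python
from datetime import datetime, timedelta
from statistics import mean


def summarize(records: list) -> dict:
    """Compute MTTA (minutes) and rates over a batch of notification records."""
    total = len(records)
    if total == 0:
        return {"mtta_minutes": None, "false_positive_rate": 0.0, "escalation_rate": 0.0}
    # Only acknowledged notifications contribute to MTTA
    ack_minutes = [
        (r["acked_at"] - r["sent_at"]).total_seconds() / 60
        for r in records if r["acked_at"] is not None
    ]
    return {
        "mtta_minutes": mean(ack_minutes) if ack_minutes else None,
        "false_positive_rate": sum(r["false_positive"] for r in records) / total,
        "escalation_rate": sum(r["escalated"] for r in records) / total,
    }


t0 = datetime(2024, 1, 15, 9, 0)
records = [
    {"sent_at": t0, "acked_at": t0 + timedelta(minutes=5), "false_positive": False, "escalated": False},
    {"sent_at": t0, "acked_at": t0 + timedelta(minutes=10), "false_positive": True, "escalated": False},
    {"sent_at": t0, "acked_at": t0 + timedelta(minutes=15), "false_positive": False, "escalated": False},
    {"sent_at": t0, "acked_at": None, "false_positive": False, "escalated": True},
]
summary = summarize(records)
print(summary)
```

In this made-up sample the MTTA of 10 minutes meets the critical target, while the 25% false positive and escalation rates are well above the 10% and 5% targets in the table.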
Feedback Collection
import json
from datetime import datetime, timezone
from typing import Optional


class NotificationFeedback:
    # The private helpers called below (_get_recent_fp_count, _count_by_action,
    # and so on) are storage-specific queries and are not shown here.

    def __init__(self, db):
        self.db = db

    async def record_action(self, notification_id: str, action: str, user: str,
                            metadata: Optional[dict] = None):
        """Record user action on notification."""
        await self.db.execute("""
            INSERT INTO notification_actions
            (notification_id, action, user_id, timestamp, metadata)
            VALUES (?, ?, ?, ?, ?)
        """, (notification_id, action, user, datetime.now(timezone.utc),
              json.dumps(metadata or {})))

    async def mark_false_positive(self, notification_id: str, user: str, reason: str):
        """Mark notification as false positive for model training."""
        await self.record_action(notification_id, 'false_positive', user, {'reason': reason})
        # Trigger model retraining if threshold reached
        fp_count = await self._get_recent_fp_count()
        if fp_count >= 100:
            await self._trigger_model_retrain()

    async def get_metrics(self, days: int = 7) -> dict:
        """Get notification metrics for dashboard."""
        return {
            'total_sent': await self._count_notifications(days),
            'acknowledged': await self._count_by_action('acknowledge', days),
            'escalated': await self._count_by_action('escalate', days),
            'false_positives': await self._count_by_action('false_positive', days),
            'avg_mtta_minutes': await self._avg_time_to_action('acknowledge', days),
            'by_severity': await self._group_by_severity(days),
            'by_source': await self._group_by_source(days)
        }
n8n Workflow Integration
Create notification workflows in n8n for low-code configuration.
{
"name": "Intelligent CI Notification",
"nodes": [
{
"name": "Webhook Trigger",
"type": "n8n-nodes-base.webhook",
"parameters": {
"path": "ci-notification",
"method": "POST"
}
},
{
"name": "Classify Severity",
"type": "n8n-nodes-base.httpRequest",
"parameters": {
"url": "http://classifier:8080/classify",
"method": "POST",
"body": "={{ JSON.stringify($json) }}"
}
},
{
"name": "Route Decision",
"type": "n8n-nodes-base.switch",
"parameters": {
"rules": [
{
"value": "critical",
"output": 0
},
{
"value": "high",
"output": 1
},
{
"default": 2
}
],
"dataPropertyName": "severity"
}
},
{
"name": "PagerDuty Alert",
"type": "n8n-nodes-base.pagerDuty",
"parameters": {
"operation": "create",
"title": "={{ $json.message }}",
"severity": "critical"
}
},
{
"name": "Slack DM",
"type": "n8n-nodes-base.slack",
"parameters": {
"operation": "sendMessage",
"channel": "={{ $json.recipient.slack }}",
"text": "={{ $json.message }}"
}
},
{
"name": "Queue for Digest",
"type": "n8n-nodes-base.redis",
"parameters": {
"operation": "push",
"list": "=digest:{{ $json.recipient.email }}"
}
}
]
}
Best Practices Summary
- Classify Before Routing: Use AI to determine severity and urgency before selecting recipients
- Respect Quiet Hours: Medium- and low-severity notifications should wait for working hours; critical and high alerts still go out immediately
- Enable Self-Service: Let engineers customize their notification preferences
- Provide Actionable Buttons: Acknowledge, snooze, and view details should be one click
- Batch When Possible: Aggregate low-priority notifications into digests
- Track and Tune: Monitor MTTA/MTTR and false positive rates, retrain models regularly
- Escalate Automatically: Don't rely on humans to remember to escalate
- Preserve Context: Include enough information to act without clicking through
Summary
Intelligent notifications transform alert chaos into actionable information:
| Feature | Benefit |
|---|---|
| AI Severity Classification | Prioritize what matters |
| Smart Routing | Right person, right time |
| Channel Selection | Appropriate urgency |
| Escalation Engine | Nothing falls through the cracks |
| Digest System | Reduce notification fatigue |
| Feedback Loop | Continuous improvement |
Effective notification systems enable fast response to critical issues while preserving engineer focus and wellbeing.