アラートワークフローの設計
「リスク評価ができても、それが適切な人に適切なタイミングで届かなければ意味がない。」
田中VPoEが運用チームのSlackチャンネルを見せる。
「今のアラートは毎日200件以上飛んでいる。運用チームは疲弊して、本当に重要なアラートを見逃している。これを改善するのがアラートワークフローの設計だ。」
アラート疲れの問題
現状の課題
NetShop社の現行アラート状況:
├── 日間アラート数: 200件以上
├── うち本当の不正: 15件(7.5%)
├── アラート対応時間: 平均25分/件
├── 運用チーム: 3名
├── 1人あたり日間対応: 67件(物理的に不可能)
└── 結果: 重要アラートの見逃し、対応遅延
目指す姿
改善後のアラート設計:
├── 日間アラート数: 30件以下(85%削減)
├── うち本当の不正: 12件(40%以上の適合率)
├── 自動処理: LOW/MEDIUMは自動対応
├── 人間対応: HIGH/CRITICALのみ(約30件)
├── 優先度付き: 緊急度でソート
└── 結果: 重要案件に集中、対応品質向上
アラートの階層設計
3層アラートシステム
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
class AlertSeverity(Enum):
INFO = "info" # 情報のみ(人間対応不要)
WARNING = "warning" # 注意(キューに入れるが急がない)
CRITICAL = "critical" # 緊急(即時対応が必要)
class AlertChannel(Enum):
LOG_ONLY = "log_only" # ログ記録のみ
DASHBOARD = "dashboard" # ダッシュボード表示
QUEUE = "review_queue" # レビューキュー投入
SLACK = "slack" # Slack通知
PAGER = "pager" # PagerDuty(即時対応)
SMS = "sms" # SMS通知
@dataclass
class Alert:
id: str
transaction_id: str
severity: AlertSeverity
risk_score: float
risk_level: str
channels: list[AlertChannel]
title: str
description: str
recommended_action: str
created_at: datetime = field(default_factory=datetime.now)
assigned_to: Optional[str] = None
status: str = "open" # open, investigating, resolved
resolution: Optional[str] = None
resolved_at: Optional[datetime] = None
アラートルーティング
class AlertManager:
"""アラートの生成とルーティング"""
def create_alert(self, context, action) -> Alert:
"""リスク評価結果からアラートを生成しルーティングする"""
severity = self._determine_severity(context)
channels = self._determine_channels(severity, context)
alert = Alert(
id=self._generate_id(),
transaction_id=context.transaction_id,
severity=severity,
risk_score=context.risk_score,
risk_level=context.risk_level,
channels=channels,
title=self._generate_title(context),
description=context.explanation,
recommended_action=action.reason,
)
# 各チャネルにディスパッチ
for channel in channels:
self._dispatch(alert, channel)
return alert
def _determine_severity(self, context) -> AlertSeverity:
if context.risk_level == 'critical':
return AlertSeverity.CRITICAL
elif context.risk_level == 'high':
return AlertSeverity.WARNING
else:
return AlertSeverity.INFO
def _determine_channels(self, severity: AlertSeverity,
context) -> list[AlertChannel]:
"""重要度に応じた通知チャネルを決定"""
channel_map = {
AlertSeverity.INFO: [
AlertChannel.LOG_ONLY,
AlertChannel.DASHBOARD,
],
AlertSeverity.WARNING: [
AlertChannel.DASHBOARD,
AlertChannel.QUEUE,
AlertChannel.SLACK,
],
AlertSeverity.CRITICAL: [
AlertChannel.DASHBOARD,
AlertChannel.QUEUE,
AlertChannel.SLACK,
AlertChannel.PAGER,
],
}
channels = channel_map[severity]
# 高額取引は常にSMS通知を追加
if context.risk_score > 0.9 and context.amount > 100_000:
channels.append(AlertChannel.SMS)
return channels
def _generate_title(self, context) -> str:
level_labels = {
'low': 'LOW',
'medium': 'MEDIUM',
'high': 'HIGH',
'critical': 'CRITICAL',
}
label = level_labels.get(context.risk_level, 'UNKNOWN')
return (
f"[{label}] 不正疑い取引検知 "
f"(スコア: {context.risk_score:.2f})"
)
レビューキューの設計
優先度付きキュー
import heapq
from dataclasses import dataclass
@dataclass
class ReviewItem:
"""レビューキューのアイテム"""
priority: float # 低いほど優先(0が最高)
alert: Alert
estimated_review_time: int # 推定レビュー時間(分)
auto_escalation_at: datetime # 自動エスカレーション時刻
def __lt__(self, other):
return self.priority < other.priority
class ReviewQueue:
"""優先度付きレビューキュー"""
def __init__(self):
self.queue = []
self.stats = {
'total_added': 0,
'total_resolved': 0,
'avg_resolution_time': 0,
}
def add(self, alert: Alert):
"""アラートをキューに追加"""
priority = self._calculate_priority(alert)
# エスカレーション時間の設定
escalation_minutes = {
AlertSeverity.CRITICAL: 15, # 15分以内に対応
AlertSeverity.WARNING: 60, # 1時間以内に対応
AlertSeverity.INFO: 480, # 8時間以内
}
from datetime import timedelta
escalation_at = datetime.now() + timedelta(
minutes=escalation_minutes[alert.severity]
)
item = ReviewItem(
priority=priority,
alert=alert,
estimated_review_time=self._estimate_review_time(alert),
auto_escalation_at=escalation_at,
)
heapq.heappush(self.queue, item)
self.stats['total_added'] += 1
def get_next(self) -> ReviewItem | None:
"""次にレビューすべきアイテムを取得"""
if self.queue:
return heapq.heappop(self.queue)
return None
def _calculate_priority(self, alert: Alert) -> float:
"""優先度を計算(低いほど優先)"""
# 基本優先度(重要度ベース)
severity_priority = {
AlertSeverity.CRITICAL: 0.0,
AlertSeverity.WARNING: 0.5,
AlertSeverity.INFO: 1.0,
}
base = severity_priority[alert.severity]
# リスクスコアで微調整(高スコアほど優先)
score_adjustment = (1.0 - alert.risk_score) * 0.1
return base + score_adjustment
def _estimate_review_time(self, alert: Alert) -> int:
"""レビュー所要時間を推定(分)"""
if alert.severity == AlertSeverity.CRITICAL:
return 10 # 迅速判断
elif alert.severity == AlertSeverity.WARNING:
return 20 # 詳細確認
return 5 # 簡易確認
自動エスカレーション
class EscalationManager:
"""対応遅延時の自動エスカレーション"""
def __init__(self, alert_manager: AlertManager):
self.alert_manager = alert_manager
self.escalation_rules = [
{
'condition': lambda a: (
a.severity == AlertSeverity.CRITICAL
and a.status == 'open'
),
'timeout_minutes': 15,
'action': self._escalate_to_manager,
},
{
'condition': lambda a: (
a.severity == AlertSeverity.CRITICAL
and a.status == 'investigating'
),
'timeout_minutes': 30,
'action': self._escalate_to_director,
},
{
'condition': lambda a: (
a.severity == AlertSeverity.WARNING
and a.status == 'open'
),
'timeout_minutes': 60,
'action': self._auto_assign,
},
]
def check_escalations(self):
"""定期実行: 未対応アラートのエスカレーションチェック"""
open_alerts = self._get_open_alerts()
for alert in open_alerts:
for rule in self.escalation_rules:
if rule['condition'](alert):
elapsed = (
datetime.now() - alert.created_at
).total_seconds() / 60
if elapsed > rule['timeout_minutes']:
rule['action'](alert)
def _escalate_to_manager(self, alert: Alert):
"""マネージャーへエスカレーション"""
self.alert_manager._dispatch(
alert, AlertChannel.PAGER
)
alert.assigned_to = "manager@netshop.co.jp"
print(f"ESCALATION: Alert {alert.id} → マネージャー "
f"({alert.severity.value}, 15分未対応)")
def _escalate_to_director(self, alert: Alert):
"""ディレクターへエスカレーション"""
self.alert_manager._dispatch(
alert, AlertChannel.SMS
)
alert.assigned_to = "director@netshop.co.jp"
def _auto_assign(self, alert: Alert):
"""自動アサイン"""
available = self._get_available_reviewer()
if available:
alert.assigned_to = available
alert.status = "investigating"
アラートダッシュボードの設計
class AlertDashboard:
"""運用チーム向けダッシュボード"""
def get_summary(self) -> dict:
"""ダッシュボードのサマリー情報"""
return {
'realtime': {
'open_critical': self._count_by_status(
'critical', 'open'
),
'open_warning': self._count_by_status(
'warning', 'open'
),
'investigating': self._count_by_status(
None, 'investigating'
),
'avg_response_time_min': self._avg_response_time(),
},
'today': {
'total_alerts': self._count_today(),
'resolved': self._count_resolved_today(),
'true_positives': self._count_tp_today(),
'false_positives': self._count_fp_today(),
'precision': self._precision_today(),
},
'trends': {
'alerts_7d': self._alerts_trend(days=7),
'precision_7d': self._precision_trend(days=7),
'avg_resolution_time_7d': self._resolution_trend(days=7),
},
}
def get_review_queue(self, limit: int = 20) -> list:
"""レビューキューの上位N件"""
items = []
for item in self._get_queue_items(limit):
items.append({
'alert_id': item.alert.id,
'severity': item.alert.severity.value,
'risk_score': item.alert.risk_score,
'title': item.alert.title,
'recommended_action': item.alert.recommended_action,
'estimated_review_time': item.estimated_review_time,
'time_remaining': self._time_until_escalation(item),
'assigned_to': item.alert.assigned_to,
})
return items
Slack通知の設計
class SlackNotifier:
"""Slack連携通知"""
def send_alert(self, alert: Alert):
"""構造化されたSlack通知を送信"""
color_map = {
AlertSeverity.CRITICAL: '#FF0000', # 赤
AlertSeverity.WARNING: '#FFA500', # オレンジ
AlertSeverity.INFO: '#36A2EB', # 青
}
message = {
'channel': self._get_channel(alert.severity),
'attachments': [{
'color': color_map[alert.severity],
'title': alert.title,
'fields': [
{
'title': 'リスクスコア',
'value': f'{alert.risk_score:.3f}',
'short': True,
},
{
'title': 'リスクレベル',
'value': alert.risk_level.upper(),
'short': True,
},
{
'title': '推奨アクション',
'value': alert.recommended_action,
'short': False,
},
{
'title': '詳細',
'value': alert.description,
'short': False,
},
],
'actions': [
{
'type': 'button',
'text': '詳細を確認',
'url': f'https://dashboard.netshop.co.jp/alerts/{alert.id}',
},
{
'type': 'button',
'text': '対応開始',
'value': f'investigate:{alert.id}',
},
],
}],
}
self._post_message(message)
def _get_channel(self, severity: AlertSeverity) -> str:
return {
AlertSeverity.CRITICAL: '#fraud-critical',
AlertSeverity.WARNING: '#fraud-alerts',
AlertSeverity.INFO: '#fraud-log',
}[severity]
まとめ
| 項目 | ポイント |
|---|---|
| アラート削減 | 200件/日 → 30件/日に削減(85%減) |
| 3層設計 | INFO(ログ)/WARNING(キュー)/CRITICAL(即時) |
| 優先度キュー | リスクスコアと重要度で自動ソート |
| エスカレーション | 時間ベースの自動エスカレーション |
| ダッシュボード | リアルタイム状況把握と傾向分析 |
チェックリスト
- アラート疲れの問題と原因を説明できる
- 3層アラートシステムの設計を説明できる
- 優先度付きレビューキューの仕組みを理解した
- 自動エスカレーションの必要性を説明できる
次のステップへ
アラートワークフローの設計を理解したところで、次は演習で実際にエージェントを構築しよう。
推定読了時間: 30分