LESSON

アラートワークフローの設計

「リスク評価ができても、それが適切な人に適切なタイミングで届かなければ意味がない。」

田中VPoEが運用チームのSlackチャンネルを見せる。

「今のアラートは毎日200件以上飛んでいる。運用チームは疲弊して、本当に重要なアラートを見逃している。これを改善するのがアラートワークフローの設計だ。」

アラート疲れの問題

現状の課題

NetShop社の現行アラート状況:
├── 日間アラート数: 200件以上
├── うち本当の不正: 15件(7.5%)
├── アラート対応時間: 平均25分/件
├── 運用チーム: 3名
├── 1人あたり日間対応: 67件(物理的に不可能)
└── 結果: 重要アラートの見逃し、対応遅延

目指す姿

改善後のアラート設計:
├── 日間アラート数: 30件以下(85%削減)
├── うち本当の不正: 12件(40%以上の適合率)
├── 自動処理: LOW/MEDIUMは自動対応
├── 人間対応: HIGH/CRITICALのみ(約30件)
├── 優先度付き: 緊急度でソート
└── 結果: 重要案件に集中、対応品質向上

アラートの階層設計

3層アラートシステム

from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

class AlertSeverity(Enum):
    INFO = "info"           # 情報のみ(人間対応不要)
    WARNING = "warning"     # 注意(キューに入れるが急がない)
    CRITICAL = "critical"   # 緊急(即時対応が必要)

class AlertChannel(Enum):
    LOG_ONLY = "log_only"            # ログ記録のみ
    DASHBOARD = "dashboard"          # ダッシュボード表示
    QUEUE = "review_queue"           # レビューキュー投入
    SLACK = "slack"                  # Slack通知
    PAGER = "pager"                  # PagerDuty(即時対応)
    SMS = "sms"                      # SMS通知

@dataclass
class Alert:
    id: str
    transaction_id: str
    severity: AlertSeverity
    risk_score: float
    risk_level: str
    channels: list[AlertChannel]
    title: str
    description: str
    recommended_action: str
    created_at: datetime = field(default_factory=datetime.now)
    assigned_to: Optional[str] = None
    status: str = "open"             # open, investigating, resolved
    resolution: Optional[str] = None
    resolved_at: Optional[datetime] = None

アラートルーティング

class AlertManager:
    """アラートの生成とルーティング"""

    def create_alert(self, context, action) -> Alert:
        """リスク評価結果からアラートを生成しルーティングする"""
        severity = self._determine_severity(context)
        channels = self._determine_channels(severity, context)

        alert = Alert(
            id=self._generate_id(),
            transaction_id=context.transaction_id,
            severity=severity,
            risk_score=context.risk_score,
            risk_level=context.risk_level,
            channels=channels,
            title=self._generate_title(context),
            description=context.explanation,
            recommended_action=action.reason,
        )

        # 各チャネルにディスパッチ
        for channel in channels:
            self._dispatch(alert, channel)

        return alert

    def _determine_severity(self, context) -> AlertSeverity:
        if context.risk_level == 'critical':
            return AlertSeverity.CRITICAL
        elif context.risk_level == 'high':
            return AlertSeverity.WARNING
        else:
            return AlertSeverity.INFO

    def _determine_channels(self, severity: AlertSeverity,
                             context) -> list[AlertChannel]:
        """重要度に応じた通知チャネルを決定"""
        channel_map = {
            AlertSeverity.INFO: [
                AlertChannel.LOG_ONLY,
                AlertChannel.DASHBOARD,
            ],
            AlertSeverity.WARNING: [
                AlertChannel.DASHBOARD,
                AlertChannel.QUEUE,
                AlertChannel.SLACK,
            ],
            AlertSeverity.CRITICAL: [
                AlertChannel.DASHBOARD,
                AlertChannel.QUEUE,
                AlertChannel.SLACK,
                AlertChannel.PAGER,
            ],
        }

        channels = channel_map[severity]

        # 高額取引は常にSMS通知を追加
        if context.risk_score > 0.9 and context.amount > 100_000:
            channels.append(AlertChannel.SMS)

        return channels

    def _generate_title(self, context) -> str:
        level_labels = {
            'low': 'LOW',
            'medium': 'MEDIUM',
            'high': 'HIGH',
            'critical': 'CRITICAL',
        }
        label = level_labels.get(context.risk_level, 'UNKNOWN')
        return (
            f"[{label}] 不正疑い取引検知 "
            f"(スコア: {context.risk_score:.2f})"
        )

レビューキューの設計

優先度付きキュー

import heapq
from dataclasses import dataclass

@dataclass
class ReviewItem:
    """レビューキューのアイテム"""
    priority: float          # 低いほど優先(0が最高)
    alert: Alert
    estimated_review_time: int  # 推定レビュー時間(分)
    auto_escalation_at: datetime  # 自動エスカレーション時刻

    def __lt__(self, other):
        return self.priority < other.priority


class ReviewQueue:
    """優先度付きレビューキュー"""

    def __init__(self):
        self.queue = []
        self.stats = {
            'total_added': 0,
            'total_resolved': 0,
            'avg_resolution_time': 0,
        }

    def add(self, alert: Alert):
        """アラートをキューに追加"""
        priority = self._calculate_priority(alert)

        # エスカレーション時間の設定
        escalation_minutes = {
            AlertSeverity.CRITICAL: 15,    # 15分以内に対応
            AlertSeverity.WARNING: 60,     # 1時間以内に対応
            AlertSeverity.INFO: 480,       # 8時間以内
        }

        from datetime import timedelta
        escalation_at = datetime.now() + timedelta(
            minutes=escalation_minutes[alert.severity]
        )

        item = ReviewItem(
            priority=priority,
            alert=alert,
            estimated_review_time=self._estimate_review_time(alert),
            auto_escalation_at=escalation_at,
        )

        heapq.heappush(self.queue, item)
        self.stats['total_added'] += 1

    def get_next(self) -> ReviewItem | None:
        """次にレビューすべきアイテムを取得"""
        if self.queue:
            return heapq.heappop(self.queue)
        return None

    def _calculate_priority(self, alert: Alert) -> float:
        """優先度を計算(低いほど優先)"""
        # 基本優先度(重要度ベース)
        severity_priority = {
            AlertSeverity.CRITICAL: 0.0,
            AlertSeverity.WARNING: 0.5,
            AlertSeverity.INFO: 1.0,
        }
        base = severity_priority[alert.severity]

        # リスクスコアで微調整(高スコアほど優先)
        score_adjustment = (1.0 - alert.risk_score) * 0.1

        return base + score_adjustment

    def _estimate_review_time(self, alert: Alert) -> int:
        """レビュー所要時間を推定(分)"""
        if alert.severity == AlertSeverity.CRITICAL:
            return 10   # 迅速判断
        elif alert.severity == AlertSeverity.WARNING:
            return 20   # 詳細確認
        return 5         # 簡易確認

自動エスカレーション

class EscalationManager:
    """対応遅延時の自動エスカレーション"""

    def __init__(self, alert_manager: AlertManager):
        self.alert_manager = alert_manager
        self.escalation_rules = [
            {
                'condition': lambda a: (
                    a.severity == AlertSeverity.CRITICAL
                    and a.status == 'open'
                ),
                'timeout_minutes': 15,
                'action': self._escalate_to_manager,
            },
            {
                'condition': lambda a: (
                    a.severity == AlertSeverity.CRITICAL
                    and a.status == 'investigating'
                ),
                'timeout_minutes': 30,
                'action': self._escalate_to_director,
            },
            {
                'condition': lambda a: (
                    a.severity == AlertSeverity.WARNING
                    and a.status == 'open'
                ),
                'timeout_minutes': 60,
                'action': self._auto_assign,
            },
        ]

    def check_escalations(self):
        """定期実行: 未対応アラートのエスカレーションチェック"""
        open_alerts = self._get_open_alerts()

        for alert in open_alerts:
            for rule in self.escalation_rules:
                if rule['condition'](alert):
                    elapsed = (
                        datetime.now() - alert.created_at
                    ).total_seconds() / 60
                    if elapsed > rule['timeout_minutes']:
                        rule['action'](alert)

    def _escalate_to_manager(self, alert: Alert):
        """マネージャーへエスカレーション"""
        self.alert_manager._dispatch(
            alert, AlertChannel.PAGER
        )
        alert.assigned_to = "manager@netshop.co.jp"
        print(f"ESCALATION: Alert {alert.id} → マネージャー "
              f"({alert.severity.value}, 15分未対応)")

    def _escalate_to_director(self, alert: Alert):
        """ディレクターへエスカレーション"""
        self.alert_manager._dispatch(
            alert, AlertChannel.SMS
        )
        alert.assigned_to = "director@netshop.co.jp"

    def _auto_assign(self, alert: Alert):
        """自動アサイン"""
        available = self._get_available_reviewer()
        if available:
            alert.assigned_to = available
            alert.status = "investigating"

アラートダッシュボードの設計

class AlertDashboard:
    """運用チーム向けダッシュボード"""

    def get_summary(self) -> dict:
        """ダッシュボードのサマリー情報"""
        return {
            'realtime': {
                'open_critical': self._count_by_status(
                    'critical', 'open'
                ),
                'open_warning': self._count_by_status(
                    'warning', 'open'
                ),
                'investigating': self._count_by_status(
                    None, 'investigating'
                ),
                'avg_response_time_min': self._avg_response_time(),
            },
            'today': {
                'total_alerts': self._count_today(),
                'resolved': self._count_resolved_today(),
                'true_positives': self._count_tp_today(),
                'false_positives': self._count_fp_today(),
                'precision': self._precision_today(),
            },
            'trends': {
                'alerts_7d': self._alerts_trend(days=7),
                'precision_7d': self._precision_trend(days=7),
                'avg_resolution_time_7d': self._resolution_trend(days=7),
            },
        }

    def get_review_queue(self, limit: int = 20) -> list:
        """レビューキューの上位N件"""
        items = []
        for item in self._get_queue_items(limit):
            items.append({
                'alert_id': item.alert.id,
                'severity': item.alert.severity.value,
                'risk_score': item.alert.risk_score,
                'title': item.alert.title,
                'recommended_action': item.alert.recommended_action,
                'estimated_review_time': item.estimated_review_time,
                'time_remaining': self._time_until_escalation(item),
                'assigned_to': item.alert.assigned_to,
            })
        return items

Slack通知の設計

class SlackNotifier:
    """Slack連携通知"""

    def send_alert(self, alert: Alert):
        """構造化されたSlack通知を送信"""
        color_map = {
            AlertSeverity.CRITICAL: '#FF0000',   # 赤
            AlertSeverity.WARNING: '#FFA500',     # オレンジ
            AlertSeverity.INFO: '#36A2EB',        # 青
        }

        message = {
            'channel': self._get_channel(alert.severity),
            'attachments': [{
                'color': color_map[alert.severity],
                'title': alert.title,
                'fields': [
                    {
                        'title': 'リスクスコア',
                        'value': f'{alert.risk_score:.3f}',
                        'short': True,
                    },
                    {
                        'title': 'リスクレベル',
                        'value': alert.risk_level.upper(),
                        'short': True,
                    },
                    {
                        'title': '推奨アクション',
                        'value': alert.recommended_action,
                        'short': False,
                    },
                    {
                        'title': '詳細',
                        'value': alert.description,
                        'short': False,
                    },
                ],
                'actions': [
                    {
                        'type': 'button',
                        'text': '詳細を確認',
                        'url': f'https://dashboard.netshop.co.jp/alerts/{alert.id}',
                    },
                    {
                        'type': 'button',
                        'text': '対応開始',
                        'value': f'investigate:{alert.id}',
                    },
                ],
            }],
        }

        self._post_message(message)

    def _get_channel(self, severity: AlertSeverity) -> str:
        return {
            AlertSeverity.CRITICAL: '#fraud-critical',
            AlertSeverity.WARNING: '#fraud-alerts',
            AlertSeverity.INFO: '#fraud-log',
        }[severity]

まとめ

項目ポイント
アラート削減200件/日 → 30件/日に削減(85%減)
3層設計INFO(ログ)/WARNING(キュー)/CRITICAL(即時)
優先度キューリスクスコアと重要度で自動ソート
エスカレーション時間ベースの自動エスカレーション
ダッシュボードリアルタイム状況把握と傾向分析

チェックリスト

  • アラート疲れの問題と原因を説明できる
  • 3層アラートシステムの設計を説明できる
  • 優先度付きレビューキューの仕組みを理解した
  • 自動エスカレーションの必要性を説明できる

次のステップへ

アラートワークフローの設計を理解したところで、次は演習で実際にエージェントを構築しよう。

推定読了時間: 30分