LESSON 40分

ストーリー

高橋アーキテクト
100万台のセンサーが毎秒データを送信してくる。温度、湿度、振動、位置情報…1日で860億データポイントだ
高橋アーキテクト
このデータをリアルタイムで処理し、異常を検知し、ダッシュボードに表示する。さらにデバイスへのコマンド送信も必要だ。IoTは双方向のシステムなんだ

要件の整理

interface IoTPlatformRequirements {
  functional: {
    deviceRegistration: "デバイスの登録とプロビジョニング";
    telemetry: "テレメトリデータの受信と保存";
    command: "デバイスへのコマンド送信";
    alerting: "リアルタイム異常検知とアラート";
    dashboard: "デバイス状態のリアルタイムダッシュボード";
    ota: "ファームウェアの無線アップデート(OTA)";
  };
  nonFunctional: {
    deviceCount: "100万台の同時接続デバイス";
    ingestionRate: "100万メッセージ/秒";
    latency: "テレメトリ受信から表示まで < 5秒";
    storage: "時系列データを1年間保持";
    reliability: "メッセージロスト率 < 0.01%";
    security: "デバイス認証、通信暗号化";
  };
}

通信プロトコル

const IOT_PROTOCOLS = {
  mqtt: {
    name: "MQTT",
    description: "軽量パブサブプロトコル。IoTの標準",
    qosLevels: {
      0: "At most once(最大1回、ロスト可能性あり)",
      1: "At least once(最低1回、重複可能性あり)",
      2: "Exactly once(正確に1回、オーバーヘッド大)",
    },
    useCase: "低帯域デバイス、センサーデータ",
    port: 8883, // TLS
  },
  http: {
    name: "HTTPS",
    description: "標準的なHTTPリクエスト",
    useCase: "バッチ送信、リソースに余裕のあるデバイス",
    overhead: "MQTTより高い",
  },
  coap: {
    name: "CoAP",
    description: "UDP上の軽量RESTプロトコル",
    useCase: "超低消費電力デバイス",
    overhead: "最小",
  },
};

ハイレベル設計

┌──────────────────────────────────────────────────────────┐
│                                                          │
│  [IoTデバイス群]                                           │
│  100万台                                                  │
│     │ MQTT/HTTPS                                         │
│     ▼                                                    │
│  [プロトコルゲートウェイ]                                    │
│  (MQTT Broker群 / API Gateway)                           │
│     │                                                    │
│     ▼                                                    │
│  [メッセージブローカー: Kafka]                               │
│     │                                                    │
│  ┌──┴──────────────┬─────────────────┐                   │
│  ▼                 ▼                 ▼                   │
│ [ストリーム処理]  [バッチ処理]      [コマンド]              │
│ (Flink)          (Spark)           サービス              │
│  │                │                  │                   │
│  ▼                ▼                  ▼                   │
│ [異常検知]     [時系列DB]        [デバイスシャドウ]          │
│ [アラート]     (TimescaleDB)     (DynamoDB)              │
│               [コールドストレージ]                          │
│               (S3/Glacier)                               │
│                                                          │
│  [デバイスレジストリ]  [認証サービス]  [ダッシュボード]       │
│                                                          │
└──────────────────────────────────────────────────────────┘

詳細設計

デバイスシャドウ(Digital Twin)

// デバイスシャドウ: デバイスの「あるべき状態」と「現在の状態」を管理
interface DeviceShadow {
  deviceId: string;

  // 報告済み状態(デバイスが最後に報告した状態)
  reported: {
    temperature: number;
    firmwareVersion: string;
    status: 'online' | 'offline';
    lastUpdated: Date;
  };

  // 期待状態(クラウドがデバイスに求める状態)
  desired: {
    firmwareVersion: string;
    configVersion: string;
    samplingInterval: number; // 秒
  };

  // 差分: reported と desired の差がある場合、デバイスに同期させる
  delta: Record<string, unknown>;
}

class DeviceShadowService {
  // デバイスからの状態報告
  async updateReported(deviceId: string, state: Partial<ReportedState>): Promise<void> {
    const shadow = await this.db.get(deviceId);
    shadow.reported = { ...shadow.reported, ...state, lastUpdated: new Date() };

    // delta の再計算
    shadow.delta = this.calculateDelta(shadow.desired, shadow.reported);

    await this.db.save(shadow);

    // delta があればデバイスに同期コマンドを送信
    if (Object.keys(shadow.delta).length > 0) {
      await this.commandService.send(deviceId, { type: 'SYNC', payload: shadow.delta });
    }
  }
}

テレメトリデータの処理

// 時系列データの保存戦略
class TelemetryStorage {
  // 階層化ストレージ
  tiers = {
    hot: {
      store: "TimescaleDB / InfluxDB",
      retention: "7日間",
      resolution: "生データ(1秒ごと)",
      purpose: "リアルタイムダッシュボード、異常検知",
    },
    warm: {
      store: "TimescaleDB(ダウンサンプリング済み)",
      retention: "90日間",
      resolution: "1分間隔の集約データ",
      purpose: "トレンド分析、週次レポート",
    },
    cold: {
      store: "S3 (Parquet形式)",
      retention: "1年以上",
      resolution: "1時間間隔の集約データ",
      purpose: "コンプライアンス、長期分析",
    },
  };

  // ダウンサンプリング: 高解像度データを低解像度に変換
  async downsample(data: TelemetryPoint[], interval: string): Promise<AggregatedPoint[]> {
    // 1分間のデータを avg, min, max, count に集約
    return this.aggregate(data, interval, ['avg', 'min', 'max', 'count']);
  }
}

リアルタイム異常検知

// ストリーム処理による異常検知
class AnomalyDetector {
  rules = [
    {
      name: "温度閾値超過",
      condition: (data: Telemetry) => data.temperature > 80,
      severity: "CRITICAL",
      action: "即座にアラート + デバイスシャットダウンコマンド",
    },
    {
      name: "デバイスオフライン",
      condition: (deviceId: string) => this.getLastHeartbeat(deviceId) > 300_000, // 5分
      severity: "WARNING",
      action: "運用チームに通知",
    },
    {
      name: "異常パターン検知",
      condition: (data: Telemetry[]) => {
        // 直近10分間の標準偏差が通常の3倍を超える
        const stddev = this.calculateStdDev(data.map(d => d.vibration));
        return stddev > this.normalStdDev * 3;
      },
      severity: "WARNING",
      action: "予知保全アラート",
    },
  ];
}

まとめ

ポイント内容
MQTTIoT標準の軽量パブサブプロトコル(QoSレベル選択可能)
デバイスシャドウ期待状態と報告状態を管理し差分を自動同期
階層化ストレージHot/Warm/Coldでコストと速度を最適化
異常検知ルールベース + 統計的異常検知のストリーム処理

チェックリスト

  • MQTT、HTTP、CoAPの使い分けを理解した
  • デバイスシャドウパターンの設計を把握した
  • テレメトリデータの階層化ストレージを設計できた
  • リアルタイム異常検知の仕組みを理解した

次のステップへ

次は「マルチリージョン戦略」を学びます。グローバルに展開するシステムのデータレプリケーションとレイテンシ最適化を設計しましょう。


推定読了時間: 40分