ストーリー
要件の整理
interface IoTPlatformRequirements {
functional: {
deviceRegistration: "デバイスの登録とプロビジョニング";
telemetry: "テレメトリデータの受信と保存";
command: "デバイスへのコマンド送信";
alerting: "リアルタイム異常検知とアラート";
dashboard: "デバイス状態のリアルタイムダッシュボード";
ota: "ファームウェアの無線アップデート(OTA)";
};
nonFunctional: {
deviceCount: "100万台の同時接続デバイス";
ingestionRate: "100万メッセージ/秒";
latency: "テレメトリ受信から表示まで < 5秒";
storage: "時系列データを1年間保持";
reliability: "メッセージロスト率 < 0.01%";
security: "デバイス認証、通信暗号化";
};
}
通信プロトコル
const IOT_PROTOCOLS = {
mqtt: {
name: "MQTT",
description: "軽量パブサブプロトコル。IoTの標準",
qosLevels: {
0: "At most once(最大1回、ロスト可能性あり)",
1: "At least once(最低1回、重複可能性あり)",
2: "Exactly once(正確に1回、オーバーヘッド大)",
},
useCase: "低帯域デバイス、センサーデータ",
port: 8883, // TLS
},
http: {
name: "HTTPS",
description: "標準的なHTTPリクエスト",
useCase: "バッチ送信、リソースに余裕のあるデバイス",
overhead: "MQTTより高い",
},
coap: {
name: "CoAP",
description: "UDP上の軽量RESTプロトコル",
useCase: "超低消費電力デバイス",
overhead: "最小",
},
};
ハイレベル設計
┌──────────────────────────────────────────────────────────┐
│ │
│ [IoTデバイス群] │
│ 100万台 │
│ │ MQTT/HTTPS │
│ ▼ │
│ [プロトコルゲートウェイ] │
│ (MQTT Broker群 / API Gateway) │
│ │ │
│ ▼ │
│ [メッセージブローカー: Kafka] │
│ │ │
│ ┌──┴──────────────┬─────────────────┐ │
│ ▼ ▼ ▼ │
│ [ストリーム処理] [バッチ処理] [コマンド] │
│ (Flink) (Spark) サービス │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ [異常検知] [時系列DB] [デバイスシャドウ] │
│ [アラート] (TimescaleDB) (DynamoDB) │
│ [コールドストレージ] │
│ (S3/Glacier) │
│ │
│ [デバイスレジストリ] [認証サービス] [ダッシュボード] │
│ │
└──────────────────────────────────────────────────────────┘
詳細設計
デバイスシャドウ(Digital Twin)
// デバイスシャドウ: デバイスの「あるべき状態」と「現在の状態」を管理
interface DeviceShadow {
deviceId: string;
// 報告済み状態(デバイスが最後に報告した状態)
reported: {
temperature: number;
firmwareVersion: string;
status: 'online' | 'offline';
lastUpdated: Date;
};
// 期待状態(クラウドがデバイスに求める状態)
desired: {
firmwareVersion: string;
configVersion: string;
samplingInterval: number; // 秒
};
// 差分: reported と desired の差がある場合、デバイスに同期させる
delta: Record<string, unknown>;
}
class DeviceShadowService {
// デバイスからの状態報告
async updateReported(deviceId: string, state: Partial<ReportedState>): Promise<void> {
const shadow = await this.db.get(deviceId);
shadow.reported = { ...shadow.reported, ...state, lastUpdated: new Date() };
// delta の再計算
shadow.delta = this.calculateDelta(shadow.desired, shadow.reported);
await this.db.save(shadow);
// delta があればデバイスに同期コマンドを送信
if (Object.keys(shadow.delta).length > 0) {
await this.commandService.send(deviceId, { type: 'SYNC', payload: shadow.delta });
}
}
}
テレメトリデータの処理
// 時系列データの保存戦略
class TelemetryStorage {
// 階層化ストレージ
tiers = {
hot: {
store: "TimescaleDB / InfluxDB",
retention: "7日間",
resolution: "生データ(1秒ごと)",
purpose: "リアルタイムダッシュボード、異常検知",
},
warm: {
store: "TimescaleDB(ダウンサンプリング済み)",
retention: "90日間",
resolution: "1分間隔の集約データ",
purpose: "トレンド分析、週次レポート",
},
cold: {
store: "S3 (Parquet形式)",
retention: "1年以上",
resolution: "1時間間隔の集約データ",
purpose: "コンプライアンス、長期分析",
},
};
// ダウンサンプリング: 高解像度データを低解像度に変換
async downsample(data: TelemetryPoint[], interval: string): Promise<AggregatedPoint[]> {
// 1分間のデータを avg, min, max, count に集約
return this.aggregate(data, interval, ['avg', 'min', 'max', 'count']);
}
}
リアルタイム異常検知
// ストリーム処理による異常検知
class AnomalyDetector {
rules = [
{
name: "温度閾値超過",
condition: (data: Telemetry) => data.temperature > 80,
severity: "CRITICAL",
action: "即座にアラート + デバイスシャットダウンコマンド",
},
{
name: "デバイスオフライン",
condition: (deviceId: string) => this.getLastHeartbeat(deviceId) > 300_000, // 5分
severity: "WARNING",
action: "運用チームに通知",
},
{
name: "異常パターン検知",
condition: (data: Telemetry[]) => {
// 直近10分間の標準偏差が通常の3倍を超える
const stddev = this.calculateStdDev(data.map(d => d.vibration));
return stddev > this.normalStdDev * 3;
},
severity: "WARNING",
action: "予知保全アラート",
},
];
}
まとめ
| ポイント | 内容 |
|---|---|
| MQTT | IoT標準の軽量パブサブプロトコル(QoSレベル選択可能) |
| デバイスシャドウ | 期待状態と報告状態を管理し差分を自動同期 |
| 階層化ストレージ | Hot/Warm/Coldでコストと速度を最適化 |
| 異常検知 | ルールベース + 統計的異常検知のストリーム処理 |
チェックリスト
- MQTT、HTTP、CoAPの使い分けを理解した
- デバイスシャドウパターンの設計を把握した
- テレメトリデータの階層化ストレージを設計できた
- リアルタイム異常検知の仕組みを理解した
次のステップへ
次は「マルチリージョン戦略」を学びます。グローバルに展開するシステムのデータレプリケーションとレイテンシ最適化を設計しましょう。
推定読了時間: 40分