ストーリー
佐
佐藤CTO
従来のバッチDWHでは、昨日のデータを今日分析する。だがビジネスは”今起きていること”をリアルタイムに分析したがっている
佐
佐藤CTO
リアルタイムOLAPエンジン — ClickHouse、Apache Druid、Apache Pinot。これらは数十億行のデータに対して秒以下のクエリ応答を実現する。マテリアライズドビューの活用と合わせて学ぼう
リアルタイムOLAPの必要性
| ユースケース | 要件 | 従来のDWH | リアルタイムOLAP |
|---|
| リアルタイムダッシュボード | 秒単位の更新 | 困難 | 最適 |
| 異常検知 | ミリ秒応答 | 不可 | 最適 |
| ユーザー行動分析 | 大量イベントの即時集計 | 分〜時間 | 秒以下 |
| A/Bテスト結果 | 実験中のリアルタイム確認 | 日次 | 秒 |
ClickHouse
基本構造
-- ClickHouse: カラム指向、超高速分析エンジン
-- テーブル作成: MergeTree エンジン
CREATE TABLE events (
event_id UUID DEFAULT generateUUIDv4(),
user_id String,
event_type LowCardinality(String), -- 低カーディナリティ最適化
page_url String,
session_id String,
device_type LowCardinality(String),
country LowCardinality(String),
amount Decimal64(2),
event_time DateTime64(3),
event_date Date MATERIALIZED toDate(event_time)
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date) -- 月次パーティション
ORDER BY (event_type, user_id, event_time) -- ソートキー(クエリの主要フィルタ)
TTL event_date + INTERVAL 1 YEAR -- 1年後に自動削除
SETTINGS index_granularity = 8192;
-- Kafka からのリアルタイム取り込み
CREATE TABLE events_kafka (
event_id String,
user_id String,
event_type String,
page_url String,
amount Float64,
event_time String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka:9092',
kafka_topic_list = 'user-events',
kafka_group_name = 'clickhouse-consumer',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
-- Kafka → MergeTree のマテリアライズドビュー(自動パイプライン)
CREATE MATERIALIZED VIEW events_consumer TO events AS
SELECT
toUUID(event_id) AS event_id,
user_id,
event_type,
page_url,
'' AS session_id,
'' AS device_type,
'' AS country,
toDecimal64(amount, 2) AS amount,
parseDateTimeBestEffort(event_time) AS event_time
FROM events_kafka;
ClickHouse のクエリ性能
-- 10億行のイベントに対するリアルタイムクエリ(応答: <1秒)
-- 直近1時間のイベント種別集計
SELECT
event_type,
count() AS event_count,
uniqExact(user_id) AS unique_users,
avg(amount) AS avg_amount
FROM events
WHERE event_time >= now() - INTERVAL 1 HOUR
GROUP BY event_type
ORDER BY event_count DESC;
-- 分単位のファネル分析
SELECT
toStartOfMinute(event_time) AS minute,
countIf(event_type = 'page_view') AS views,
countIf(event_type = 'cart_add') AS cart_adds,
countIf(event_type = 'purchase') AS purchases,
round(countIf(event_type = 'purchase') /
countIf(event_type = 'cart_add') * 100, 2) AS conversion_rate
FROM events
WHERE event_date = today()
GROUP BY minute
ORDER BY minute DESC
LIMIT 60;
-- ウィンドウ関数: ユーザーごとのセッション分析
SELECT
user_id,
session_id,
min(event_time) AS session_start,
max(event_time) AS session_end,
dateDiff('second', min(event_time), max(event_time)) AS session_seconds,
count() AS event_count,
groupArray(event_type) AS event_sequence
FROM events
WHERE event_date = today()
GROUP BY user_id, session_id
HAVING session_seconds > 0
ORDER BY session_seconds DESC
LIMIT 100;
Apache Druid / Apache Pinot
Druid: リアルタイムインジェスト + OLAP
// Druid: データスキーマ定義(Ingestion Spec)
{
"type": "kafka",
"dataSchema": {
"dataSource": "user_events",
"timestampSpec": {
"column": "event_time",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"user_id",
"event_type",
"page_url",
"device_type",
"country"
]
},
"metricsSpec": [
{ "type": "count", "name": "event_count" },
{ "type": "doubleSum", "name": "total_amount", "fieldName": "amount" },
{
"type": "HLLSketchBuild",
"name": "unique_users",
"fieldName": "user_id"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "MINUTE",
"rollup": true
}
},
"ioConfig": {
"type": "kafka",
"consumerProperties": {
"bootstrap.servers": "kafka:9092"
},
"topic": "user-events",
"useEarliestOffset": true,
"taskDuration": "PT1H"
}
}
リアルタイムOLAPエンジン比較
| 機能 | ClickHouse | Druid | Pinot |
|---|
| クエリ言語 | SQL(完全対応) | SQL(限定的) | SQL(限定的) |
| JOINサポート | Yes(制限あり) | 限定的 | 限定的 |
| リアルタイム取込 | Kafka Engine | ネイティブ | ネイティブ |
| 事前集約 | マテリアライズドビュー | Rollup | Star-tree Index |
| 学習コスト | 低い(SQL互換) | 中程度 | 中程度 |
| 運用難易度 | 中程度 | 高い | 高い |
| 適切な規模 | 小〜大規模 | 大規模 | 大規模 |
マテリアライズドビューの活用
-- PostgreSQL のマテリアライズドビュー
CREATE MATERIALIZED VIEW mv_hourly_revenue
AS
SELECT
date_trunc('hour', order_date) AS hour,
category,
region,
COUNT(*) AS order_count,
SUM(total_amount) AS total_revenue,
AVG(total_amount) AS avg_order_value,
COUNT(DISTINCT customer_id) AS unique_customers
FROM orders
WHERE status = 'completed'
GROUP BY 1, 2, 3
WITH DATA;
-- ユニークインデックスで高速リフレッシュ
CREATE UNIQUE INDEX ON mv_hourly_revenue (hour, category, region);
-- 定期リフレッシュ(他のクエリをブロックしない)
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_hourly_revenue;
// マテリアライズドビューの自動リフレッシュ戦略
interface MaterializedViewConfig {
name: string;
refreshStrategy: 'periodic' | 'event-driven' | 'lazy';
refreshInterval?: string; // periodic の場合
triggerEvents?: string[]; // event-driven の場合
staleTolerance?: string; // lazy の場合(読み取り時にチェック)
}
const mvConfigs: MaterializedViewConfig[] = [
{
// リアルタイムダッシュボード用: 5分ごと
name: 'mv_realtime_metrics',
refreshStrategy: 'periodic',
refreshInterval: '5 minutes',
},
{
// 注文完了時にのみ更新: イベント駆動
name: 'mv_order_summary',
refreshStrategy: 'event-driven',
triggerEvents: ['order.completed', 'order.cancelled'],
},
{
// アドホック分析用: 読み取り時に30分以上古ければ更新
name: 'mv_customer_segments',
refreshStrategy: 'lazy',
staleTolerance: '30 minutes',
},
];
リアルタイム分析のアーキテクチャパターン
パターン1: Kappa Architecture(ストリーム統一)
Kafka → Flink → ClickHouse → Dashboard
全てをストリームとして処理。バッチは不要。
パターン2: Speed + Batch Layer
Kafka → Flink → Redis (リアルタイム近似)
S3 → Spark → DWH (正確な日次集計)
Dashboard: Redis(直近)+ DWH(過去)をマージ
パターン3: Streaming ETL + OLAP Engine
Kafka → Kafka Connect → ClickHouse
ClickHouse内でマテリアライズドビューを活用して事前集計
推奨: パターン3 が運用コストと性能のバランスが最も良い。
まとめ
| ポイント | 内容 |
|---|
| リアルタイムOLAP | 秒以下の応答で大規模データを分析 |
| ClickHouse | SQL互換、Kafkaからの直接取り込み、学習コスト低 |
| Druid/Pinot | Rollupによる事前集約、大規模リアルタイム分析 |
| マテリアライズドビュー | 定期/イベント駆動/遅延評価の3戦略 |
チェックリスト
次のステップへ
次は演習で分析基盤の設計を実践します。
推定読了時間: 40分