ストーリー
佐
佐藤CTO
データパイプラインの設計は、システムの血管を設計するようなものだ
佐
佐藤CTO
バッチ処理は古典的だが、今でも大量データの処理には不可欠だ。MapReduceの原理を理解し、Sparkやスケジューリングの実践的な設計力を身につけよう
バッチ処理の基本アーキテクチャ
Lambda Architecture
graph TD
DS["Data Source<br/>(Master Data)"] --> BL & SL
BL["Batch Layer<br/>(全データ再計算)<br/>正確だが遅い"] --> SV
SL["Speed Layer<br/>(差分リアルタイム)<br/>高速だが近似"] --> SV
SV["Serving Layer<br/>(Batch View + Real-time View)"]
classDef source fill:#f5f3ff,stroke:#7c3aed
classDef batch fill:#dbeafe,stroke:#3b82f6
classDef speed fill:#fee2e2,stroke:#ef4444
classDef serving fill:#f0fdf4,stroke:#22c55e,font-weight:bold
class DS source
class BL batch
class SL speed
class SV serving
| レイヤー | 処理方式 | 精度 | レイテンシ |
|---|
| Batch Layer | 全データ再計算 | 高い | 数時間 |
| Speed Layer | 差分処理 | 近似的 | 秒〜分 |
| Serving Layer | 両方の結果をマージ | 最終的に正確 | 即座 |
MapReduceの原理
# MapReduce の概念実装
from collections import defaultdict
from typing import Callable, Iterable, TypeVar
K = TypeVar('K')
V = TypeVar('V')
def map_reduce(
data: list[dict],
mapper: Callable[[dict], Iterable[tuple[str, int]]],
reducer: Callable[[str, list[int]], tuple[str, int]],
) -> dict[str, int]:
"""
MapReduceの3フェーズ:
1. Map: 入力データをkey-valueペアに変換
2. Shuffle: 同じキーのデータを集約
3. Reduce: 集約されたデータを処理
"""
# Phase 1: Map
mapped: list[tuple[str, int]] = []
for record in data:
mapped.extend(mapper(record))
# Phase 2: Shuffle
shuffled: dict[str, list[int]] = defaultdict(list)
for key, value in mapped:
shuffled[key].append(value)
# Phase 3: Reduce
results = {}
for key, values in shuffled.items():
result_key, result_value = reducer(key, values)
results[result_key] = result_value
return results
# 例: 月別売上集計
def sales_mapper(record: dict) -> list[tuple[str, int]]:
month = record['order_date'][:7] # "2024-06" 形式
return [(month, record['amount'])]
def sales_reducer(key: str, values: list[int]) -> tuple[str, int]:
return (key, sum(values))
orders = [
{'order_id': 1, 'order_date': '2024-06-01', 'amount': 5000},
{'order_id': 2, 'order_date': '2024-06-15', 'amount': 3000},
{'order_id': 3, 'order_date': '2024-07-01', 'amount': 8000},
]
result = map_reduce(orders, sales_mapper, sales_reducer)
# {'2024-06': 8000, '2024-07': 8000}
Apache Spark によるバッチ処理
PySpark の基本
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# SparkSession の作成
spark = SparkSession.builder \
.appName("SalesAnalytics") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
# データ読み込み(Parquet形式、パーティション分割済み)
orders_df = spark.read.parquet("s3://data-lake/orders/")
products_df = spark.read.parquet("s3://data-lake/products/")
# 変換処理
monthly_sales = (
orders_df
.filter(F.col("status") == "completed")
.join(products_df, "product_id")
.withColumn("order_month", F.date_trunc("month", "order_date"))
.groupBy("order_month", "category")
.agg(
F.count("order_id").alias("order_count"),
F.sum("amount").alias("total_revenue"),
F.avg("amount").alias("avg_order_value"),
F.countDistinct("customer_id").alias("unique_customers"),
)
.orderBy("order_month", "category")
)
# ウィンドウ関数: 前月比成長率
window_spec = Window.partitionBy("category").orderBy("order_month")
monthly_sales_with_growth = monthly_sales.withColumn(
"revenue_growth_rate",
(F.col("total_revenue") - F.lag("total_revenue").over(window_spec))
/ F.lag("total_revenue").over(window_spec)
* 100
)
# 結果を書き出し(Parquet + パーティション分割)
monthly_sales_with_growth.write \
.mode("overwrite") \
.partitionBy("order_month") \
.parquet("s3://data-warehouse/monthly_sales/")
Spark パフォーマンスチューニング
| 問題 | 対策 | 設定例 |
|---|
| データスキュー | Salting / Adaptive Query | spark.sql.adaptive.skewJoin.enabled=true |
| シャッフル過多 | Broadcast Join | spark.sql.autoBroadcastJoinThreshold=100MB |
| メモリ不足 | パーティション調整 | spark.sql.shuffle.partitions=auto |
| 小ファイル問題 | Coalesce / Compaction | .coalesce(100).write.parquet(...) |
Sparkのデータスキュー対策の詳細
# データスキュー: 特定のキーにデータが偏ると一部のタスクが遅くなる
# 対策1: Salting(キーにランダム値を付与して分散)
from pyspark.sql import functions as F
skewed_key = "popular_product_id"
salt_range = 10
# Salt追加
salted_df = orders_df.withColumn(
"salted_key",
F.concat(F.col("product_id"), F.lit("_"), (F.rand() * salt_range).cast("int"))
)
# Saltedキーで集約
partial_result = salted_df.groupBy("salted_key").agg(
F.sum("amount").alias("partial_sum")
)
# Salt除去して最終集約
final_result = partial_result.withColumn(
"product_id",
F.split("salted_key", "_")[0]
).groupBy("product_id").agg(
F.sum("partial_sum").alias("total_amount")
)
# 対策2: Spark 3.x の Adaptive Query Execution (AQE)
# 設定するだけで自動的にスキューを検知・最適化
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
スケジューリングと依存管理
Apache Airflow による DAG 設計
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
'on_failure_callback': notify_slack,
}
with DAG(
dag_id='daily_sales_pipeline',
default_args=default_args,
schedule='0 3 * * *', # 毎日AM3時
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['sales', 'daily', 'batch'],
max_active_runs=1, # 同時実行防止
) as dag:
# Step 1: データ到着を待機
wait_for_data = S3KeySensor(
task_id='wait_for_data',
bucket_name='raw-data',
bucket_key='orders/dt={{ ds }}/_SUCCESS',
timeout=3600, # 最大1時間待機
poke_interval=300,
)
# Step 2: データ品質チェック
quality_check = PythonOperator(
task_id='quality_check',
python_callable=run_data_quality_checks,
op_kwargs={'date': '{{ ds }}'},
)
# Step 3: Spark変換ジョブ
transform = SparkSubmitOperator(
task_id='transform_sales',
application='s3://scripts/transform_sales.py',
conf={
'spark.executor.memory': '4g',
'spark.executor.cores': '2',
'spark.dynamicAllocation.enabled': 'true',
},
application_args=['--date', '{{ ds }}'],
)
# Step 4: DWHにロード
load_dwh = PythonOperator(
task_id='load_to_dwh',
python_callable=load_to_warehouse,
op_kwargs={'date': '{{ ds }}'},
)
# Step 5: 完了通知
notify = PythonOperator(
task_id='notify_completion',
python_callable=send_completion_notification,
)
# DAG依存関係
wait_for_data >> quality_check >> transform >> load_dwh >> notify
バッチ処理の設計パターン
| パターン | 説明 | 使い所 |
|---|
| フルリフレッシュ | 全データを毎回再計算 | データ量が少ない、正確性が重要 |
| インクリメンタル | 差分のみ処理 | データ量が大きい、日次更新 |
| パーティション上書き | 特定パーティションのみ再計算 | 日付パーティションのデータ修正 |
| Idempotent(冪等性) | 同じ入力で何度実行しても同じ結果 | リトライ安全性を保証 |
まとめ
| ポイント | 内容 |
|---|
| MapReduce | Map→Shuffle→Reduce の3フェーズで並列処理 |
| Spark | DataFrameAPIで宣言的にバッチ変換を記述 |
| スケジューリング | Airflow DAGで依存関係とリトライを管理 |
| パフォーマンス | データスキュー、シャッフル、パーティション設計が鍵 |
チェックリスト
次のステップへ
次はストリーム処理アーキテクチャを学び、リアルタイムデータ処理の設計手法を理解します。
推定読了時間: 40分