LESSON 40分

ストーリー

佐藤CTO
データパイプラインの設計は、システムの血管を設計するようなものだ
佐藤CTO
バッチ処理は古典的だが、今でも大量データの処理には不可欠だ。MapReduceの原理を理解し、Sparkやスケジューリングの実践的な設計力を身につけよう

バッチ処理の基本アーキテクチャ

Lambda Architecture

graph TD
    DS["Data Source<br/>(Master Data)"] --> BL & SL
    BL["Batch Layer<br/>(全データ再計算)<br/>正確だが遅い"] --> SV
    SL["Speed Layer<br/>(差分リアルタイム)<br/>高速だが近似"] --> SV
    SV["Serving Layer<br/>(Batch View + Real-time View)"]

    classDef source fill:#f5f3ff,stroke:#7c3aed
    classDef batch fill:#dbeafe,stroke:#3b82f6
    classDef speed fill:#fee2e2,stroke:#ef4444
    classDef serving fill:#f0fdf4,stroke:#22c55e,font-weight:bold
    class DS source
    class BL batch
    class SL speed
    class SV serving
レイヤー処理方式精度レイテンシ
Batch Layer全データ再計算高い数時間
Speed Layer差分処理近似的秒〜分
Serving Layer両方の結果をマージ最終的に正確即座

MapReduceの原理

# MapReduce の概念実装

from collections import defaultdict
from typing import Callable, Iterable, TypeVar

K = TypeVar('K')
V = TypeVar('V')

def map_reduce(
    data: list[dict],
    mapper: Callable[[dict], Iterable[tuple[str, int]]],
    reducer: Callable[[str, list[int]], tuple[str, int]],
) -> dict[str, int]:
    """
    MapReduceの3フェーズ:
    1. Map: 入力データをkey-valueペアに変換
    2. Shuffle: 同じキーのデータを集約
    3. Reduce: 集約されたデータを処理
    """
    # Phase 1: Map
    mapped: list[tuple[str, int]] = []
    for record in data:
        mapped.extend(mapper(record))

    # Phase 2: Shuffle
    shuffled: dict[str, list[int]] = defaultdict(list)
    for key, value in mapped:
        shuffled[key].append(value)

    # Phase 3: Reduce
    results = {}
    for key, values in shuffled.items():
        result_key, result_value = reducer(key, values)
        results[result_key] = result_value

    return results

# 例: 月別売上集計
def sales_mapper(record: dict) -> list[tuple[str, int]]:
    month = record['order_date'][:7]  # "2024-06" 形式
    return [(month, record['amount'])]

def sales_reducer(key: str, values: list[int]) -> tuple[str, int]:
    return (key, sum(values))

orders = [
    {'order_id': 1, 'order_date': '2024-06-01', 'amount': 5000},
    {'order_id': 2, 'order_date': '2024-06-15', 'amount': 3000},
    {'order_id': 3, 'order_date': '2024-07-01', 'amount': 8000},
]

result = map_reduce(orders, sales_mapper, sales_reducer)
# {'2024-06': 8000, '2024-07': 8000}

Apache Spark によるバッチ処理

PySpark の基本

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# SparkSession の作成
spark = SparkSession.builder \
    .appName("SalesAnalytics") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# データ読み込み(Parquet形式、パーティション分割済み)
orders_df = spark.read.parquet("s3://data-lake/orders/")
products_df = spark.read.parquet("s3://data-lake/products/")

# 変換処理
monthly_sales = (
    orders_df
    .filter(F.col("status") == "completed")
    .join(products_df, "product_id")
    .withColumn("order_month", F.date_trunc("month", "order_date"))
    .groupBy("order_month", "category")
    .agg(
        F.count("order_id").alias("order_count"),
        F.sum("amount").alias("total_revenue"),
        F.avg("amount").alias("avg_order_value"),
        F.countDistinct("customer_id").alias("unique_customers"),
    )
    .orderBy("order_month", "category")
)

# ウィンドウ関数: 前月比成長率
window_spec = Window.partitionBy("category").orderBy("order_month")

monthly_sales_with_growth = monthly_sales.withColumn(
    "revenue_growth_rate",
    (F.col("total_revenue") - F.lag("total_revenue").over(window_spec))
    / F.lag("total_revenue").over(window_spec)
    * 100
)

# 結果を書き出し(Parquet + パーティション分割)
monthly_sales_with_growth.write \
    .mode("overwrite") \
    .partitionBy("order_month") \
    .parquet("s3://data-warehouse/monthly_sales/")

Spark パフォーマンスチューニング

問題対策設定例
データスキューSalting / Adaptive Queryspark.sql.adaptive.skewJoin.enabled=true
シャッフル過多Broadcast Joinspark.sql.autoBroadcastJoinThreshold=100MB
メモリ不足パーティション調整spark.sql.shuffle.partitions=auto
小ファイル問題Coalesce / Compaction.coalesce(100).write.parquet(...)
Sparkのデータスキュー対策の詳細
# データスキュー: 特定のキーにデータが偏ると一部のタスクが遅くなる

# 対策1: Salting(キーにランダム値を付与して分散)
from pyspark.sql import functions as F

skewed_key = "popular_product_id"
salt_range = 10

# Salt追加
salted_df = orders_df.withColumn(
    "salted_key",
    F.concat(F.col("product_id"), F.lit("_"), (F.rand() * salt_range).cast("int"))
)

# Saltedキーで集約
partial_result = salted_df.groupBy("salted_key").agg(
    F.sum("amount").alias("partial_sum")
)

# Salt除去して最終集約
final_result = partial_result.withColumn(
    "product_id",
    F.split("salted_key", "_")[0]
).groupBy("product_id").agg(
    F.sum("partial_sum").alias("total_amount")
)

# 対策2: Spark 3.x の Adaptive Query Execution (AQE)
# 設定するだけで自動的にスキューを検知・最適化
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")

スケジューリングと依存管理

Apache Airflow による DAG 設計

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
    'on_failure_callback': notify_slack,
}

with DAG(
    dag_id='daily_sales_pipeline',
    default_args=default_args,
    schedule='0 3 * * *',  # 毎日AM3時
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sales', 'daily', 'batch'],
    max_active_runs=1,  # 同時実行防止
) as dag:

    # Step 1: データ到着を待機
    wait_for_data = S3KeySensor(
        task_id='wait_for_data',
        bucket_name='raw-data',
        bucket_key='orders/dt={{ ds }}/_SUCCESS',
        timeout=3600,  # 最大1時間待機
        poke_interval=300,
    )

    # Step 2: データ品質チェック
    quality_check = PythonOperator(
        task_id='quality_check',
        python_callable=run_data_quality_checks,
        op_kwargs={'date': '{{ ds }}'},
    )

    # Step 3: Spark変換ジョブ
    transform = SparkSubmitOperator(
        task_id='transform_sales',
        application='s3://scripts/transform_sales.py',
        conf={
            'spark.executor.memory': '4g',
            'spark.executor.cores': '2',
            'spark.dynamicAllocation.enabled': 'true',
        },
        application_args=['--date', '{{ ds }}'],
    )

    # Step 4: DWHにロード
    load_dwh = PythonOperator(
        task_id='load_to_dwh',
        python_callable=load_to_warehouse,
        op_kwargs={'date': '{{ ds }}'},
    )

    # Step 5: 完了通知
    notify = PythonOperator(
        task_id='notify_completion',
        python_callable=send_completion_notification,
    )

    # DAG依存関係
    wait_for_data >> quality_check >> transform >> load_dwh >> notify

バッチ処理の設計パターン

パターン説明使い所
フルリフレッシュ全データを毎回再計算データ量が少ない、正確性が重要
インクリメンタル差分のみ処理データ量が大きい、日次更新
パーティション上書き特定パーティションのみ再計算日付パーティションのデータ修正
Idempotent(冪等性)同じ入力で何度実行しても同じ結果リトライ安全性を保証

まとめ

ポイント内容
MapReduceMap→Shuffle→Reduce の3フェーズで並列処理
SparkDataFrameAPIで宣言的にバッチ変換を記述
スケジューリングAirflow DAGで依存関係とリトライを管理
パフォーマンスデータスキュー、シャッフル、パーティション設計が鍵

チェックリスト

  • MapReduceの原理を説明できる
  • Sparkの基本的なDataFrame操作を理解した
  • Airflow DAGで依存管理する方法を理解した
  • バッチ処理のパフォーマンスチューニングポイントを把握した

次のステップへ

次はストリーム処理アーキテクチャを学び、リアルタイムデータ処理の設計手法を理解します。


推定読了時間: 40分