LESSON 30分

ストーリー

田中VPoE
Document AIの個別技術を学んできた。最後にこれらを統合した「インテリジェント文書処理(IDP)」パイプラインを設計しよう
あなた
IDP…文書を賢く処理する仕組みですね
田中VPoE
IDPは文書の受付から分類、データ抽出、検証、業務システムへの登録までを自動化する包括的なパイプラインだ。NetShop社では経理・法務・人事など複数部門の文書処理を一元化できる
あなた
部門横断で使える汎用的なパイプラインを作るんですね
田中VPoE
そうだ。新しい文書タイプが増えても、設定の追加だけで対応できる拡張性のある設計を目指そう

IDPパイプラインの全体像

アーキテクチャ

┌─────────────────────────────────────────────────────────────┐
│              インテリジェント文書処理パイプライン                  │
│                                                             │
│  [取り込み]    [分類]      [抽出]      [検証]      [登録]     │
│                                                             │
│  スキャン  → 文書分類  → フィールド → バリデ   → 業務      │
│  メール     →           → 抽出      → ーション → システム  │
│  アップロード             → 表抽出             → 通知      │
│                                                             │
│  ──────────── Human-in-the-Loop ─────────────               │
│  信頼度が低い場合のみ人間が介入                                │
└─────────────────────────────────────────────────────────────┘

パイプラインの実装

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

class ProcessingStatus(Enum):
    PENDING = "待機中"
    PROCESSING = "処理中"
    NEEDS_REVIEW = "確認待ち"
    APPROVED = "承認済み"
    REJECTED = "却下"
    COMPLETED = "完了"

@dataclass
class DocumentProcessingResult:
    document_id: str
    document_type: str
    status: ProcessingStatus
    extracted_data: dict
    validation_result: dict
    confidence_score: float
    processing_time_ms: int
    needs_human_review: bool
    review_reasons: list[str] = field(default_factory=list)


class DocumentProcessor(ABC):
    """文書処理の抽象基底クラス"""

    @abstractmethod
    def can_process(self, document_type: str) -> bool:
        pass

    @abstractmethod
    def extract(self, image_path: str) -> dict:
        pass

    @abstractmethod
    def validate(self, data: dict) -> dict:
        pass


class InvoiceProcessor(DocumentProcessor):
    """請求書プロセッサ"""

    def can_process(self, document_type: str) -> bool:
        return document_type == "invoice"

    def extract(self, image_path: str) -> dict:
        return process_invoice_with_vlm(image_path)

    def validate(self, data: dict) -> dict:
        return validate_invoice(data)


class ContractProcessor(DocumentProcessor):
    """契約書プロセッサ"""

    def can_process(self, document_type: str) -> bool:
        return document_type == "contract"

    def extract(self, image_path: str) -> dict:
        return analyze_contract(image_path)

    def validate(self, data: dict) -> dict:
        return validate_contract(data)

パイプラインオーケストレーター

import time
import uuid

class IDPPipeline:
    """インテリジェント文書処理パイプライン"""

    def __init__(self):
        self.processors: list[DocumentProcessor] = [
            InvoiceProcessor(),
            ContractProcessor(),
        ]
        self.confidence_threshold = 0.9
        self.high_value_threshold = 100000  # 10万円以上は必ずレビュー

    def process(self, image_path: str) -> DocumentProcessingResult:
        """文書を処理"""
        start_time = time.time()
        doc_id = str(uuid.uuid4())

        # Step 1: 文書分類
        classification = classify_document(image_path)
        doc_type = classification["type"]

        # Step 2: 適切なプロセッサを選択
        processor = self._get_processor(doc_type)
        if not processor:
            return self._create_result(
                doc_id, doc_type, ProcessingStatus.NEEDS_REVIEW,
                {}, {}, 0.0, start_time,
                needs_review=True,
                reasons=["対応するプロセッサが見つかりません"]
            )

        # Step 3: データ抽出
        extracted_data = processor.extract(image_path)

        # Step 4: バリデーション
        validation = processor.validate(extracted_data)

        # Step 5: レビュー必要性の判定
        confidence = classification.get("confidence", 0.0)
        needs_review, reasons = self._check_review_needed(
            extracted_data, validation, confidence
        )

        status = ProcessingStatus.NEEDS_REVIEW if needs_review else ProcessingStatus.APPROVED

        return self._create_result(
            doc_id, doc_type, status,
            extracted_data, validation, confidence,
            start_time, needs_review, reasons
        )

    def _get_processor(self, doc_type: str) -> DocumentProcessor | None:
        for processor in self.processors:
            if processor.can_process(doc_type):
                return processor
        return None

    def _check_review_needed(
        self, data: dict, validation: dict, confidence: float
    ) -> tuple[bool, list[str]]:
        """人的レビューが必要か判定"""
        reasons = []

        if confidence < self.confidence_threshold:
            reasons.append(f"分類信頼度が低い: {confidence:.2f}")

        if not validation.get("valid", True):
            reasons.append(f"バリデーションエラー: {validation.get('errors', [])}")

        # 高額取引チェック
        total = data.get("total", {})
        if isinstance(total, dict):
            amount = total.get("value", 0) or 0
        else:
            amount = total or 0
        if amount > self.high_value_threshold:
            reasons.append(f"高額取引: {amount:,}円")

        return len(reasons) > 0, reasons

    def _create_result(
        self, doc_id, doc_type, status, data, validation,
        confidence, start_time, needs_review=False, reasons=None
    ) -> DocumentProcessingResult:
        return DocumentProcessingResult(
            document_id=doc_id,
            document_type=doc_type,
            status=status,
            extracted_data=data,
            validation_result=validation,
            confidence_score=confidence,
            processing_time_ms=int((time.time() - start_time) * 1000),
            needs_human_review=needs_review,
            review_reasons=reasons or []
        )

Human-in-the-Loopの設計

レビューキューの管理

@dataclass
class ReviewTask:
    document_id: str
    document_type: str
    original_image_path: str
    ai_extracted_data: dict
    review_reasons: list[str]
    priority: int  # 1(高) 〜 3(低)
    assigned_to: str | None = None
    status: str = "pending"

class ReviewQueue:
    """人的レビューのキュー管理"""

    def __init__(self):
        self.tasks: list[ReviewTask] = []

    def add_task(self, result: DocumentProcessingResult, image_path: str):
        """レビュータスクを追加"""
        priority = self._calculate_priority(result)

        task = ReviewTask(
            document_id=result.document_id,
            document_type=result.document_type,
            original_image_path=image_path,
            ai_extracted_data=result.extracted_data,
            review_reasons=result.review_reasons,
            priority=priority
        )
        self.tasks.append(task)
        self.tasks.sort(key=lambda t: t.priority)

    def _calculate_priority(self, result: DocumentProcessingResult) -> int:
        """優先度を計算"""
        if any("バリデーションエラー" in r for r in result.review_reasons):
            return 1  # 高: データ不整合
        if any("高額" in r for r in result.review_reasons):
            return 1  # 高: 高額取引
        if result.confidence_score < 0.7:
            return 2  # 中: 低信頼度
        return 3  # 低: その他

    def get_next_task(self, reviewer: str) -> ReviewTask | None:
        """次のレビュータスクを取得"""
        for task in self.tasks:
            if task.status == "pending":
                task.status = "in_review"
                task.assigned_to = reviewer
                return task
        return None

運用モニタリング

KPIダッシュボード

KPI目標測定方法
自動処理率70%以上自動完了件数 / 全件数
処理精度95%以上人的レビュー時の修正率
平均処理時間30秒以内APIレスポンス時間
レビューキュー待ち時間2時間以内タスク作成〜完了時間
月間コスト予算内API費用 + 人件費

メトリクス収集

from datetime import datetime

class IDPMetrics:
    """IDPパイプラインのメトリクス"""

    def __init__(self):
        self.records = []

    def record(self, result: DocumentProcessingResult):
        self.records.append({
            "timestamp": datetime.now(),
            "document_type": result.document_type,
            "status": result.status.value,
            "confidence": result.confidence_score,
            "processing_time_ms": result.processing_time_ms,
            "needs_review": result.needs_human_review
        })

    def get_daily_stats(self) -> dict:
        today = [r for r in self.records
                 if r["timestamp"].date() == datetime.now().date()]

        total = len(today)
        if total == 0:
            return {"total": 0}

        auto_completed = sum(1 for r in today if not r["needs_review"])

        return {
            "total": total,
            "auto_completed": auto_completed,
            "auto_rate": auto_completed / total,
            "avg_processing_time_ms": sum(r["processing_time_ms"] for r in today) / total,
            "avg_confidence": sum(r["confidence"] for r in today) / total,
            "by_type": self._group_by_type(today)
        }

    def _group_by_type(self, records: list) -> dict:
        types = {}
        for r in records:
            t = r["document_type"]
            if t not in types:
                types[t] = {"count": 0, "auto": 0}
            types[t]["count"] += 1
            if not r["needs_review"]:
                types[t]["auto"] += 1
        return types

まとめ

項目内容
IDPパイプライン取り込み → 分類 → 抽出 → 検証 → 登録
拡張性プロセッサを追加するだけで新文書タイプに対応
Human-in-the-Loop信頼度・バリデーション・金額に基づく自動判定
モニタリング自動処理率、精度、処理時間をKPIとして追跡

チェックリスト

  • IDPパイプラインの5つのステージ(取り込み→分類→抽出→検証→登録)を理解した
  • プロセッサパターンによる拡張可能な設計を把握した
  • Human-in-the-Loopのレビュー判定ロジックを設計できる
  • 運用モニタリングのKPIとメトリクス収集を説明できる

推定所要時間: 30分