ストーリー
田
田中VPoE
Document AIの個別技術を学んできた。最後にこれらを統合した「インテリジェント文書処理(IDP)」パイプラインを設計しよう
田
田中VPoE
IDPは文書の受付から分類、データ抽出、検証、業務システムへの登録までを自動化する包括的なパイプラインだ。NetShop社では経理・法務・人事など複数部門の文書処理を一元化できる
あなた
部門横断で使える汎用的なパイプラインを作るんですね
あ
田
田中VPoE
そうだ。新しい文書タイプが増えても、設定の追加だけで対応できる拡張性のある設計を目指そう
IDPパイプラインの全体像
アーキテクチャ
┌─────────────────────────────────────────────────────────────┐
│ インテリジェント文書処理パイプライン │
│ │
│ [取り込み] [分類] [抽出] [検証] [登録] │
│ │
│ スキャン → 文書分類 → フィールド → バリデ → 業務 │
│ メール → → 抽出 → ーション → システム │
│ アップロード → 表抽出 → 通知 │
│ │
│ ──────────── Human-in-the-Loop ───────────── │
│ 信頼度が低い場合のみ人間が介入 │
└─────────────────────────────────────────────────────────────┘
パイプラインの実装
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
class ProcessingStatus(Enum):
PENDING = "待機中"
PROCESSING = "処理中"
NEEDS_REVIEW = "確認待ち"
APPROVED = "承認済み"
REJECTED = "却下"
COMPLETED = "完了"
@dataclass
class DocumentProcessingResult:
document_id: str
document_type: str
status: ProcessingStatus
extracted_data: dict
validation_result: dict
confidence_score: float
processing_time_ms: int
needs_human_review: bool
review_reasons: list[str] = field(default_factory=list)
class DocumentProcessor(ABC):
"""文書処理の抽象基底クラス"""
@abstractmethod
def can_process(self, document_type: str) -> bool:
pass
@abstractmethod
def extract(self, image_path: str) -> dict:
pass
@abstractmethod
def validate(self, data: dict) -> dict:
pass
class InvoiceProcessor(DocumentProcessor):
"""請求書プロセッサ"""
def can_process(self, document_type: str) -> bool:
return document_type == "invoice"
def extract(self, image_path: str) -> dict:
return process_invoice_with_vlm(image_path)
def validate(self, data: dict) -> dict:
return validate_invoice(data)
class ContractProcessor(DocumentProcessor):
"""契約書プロセッサ"""
def can_process(self, document_type: str) -> bool:
return document_type == "contract"
def extract(self, image_path: str) -> dict:
return analyze_contract(image_path)
def validate(self, data: dict) -> dict:
return validate_contract(data)
パイプラインオーケストレーター
import time
import uuid
class IDPPipeline:
"""インテリジェント文書処理パイプライン"""
def __init__(self):
self.processors: list[DocumentProcessor] = [
InvoiceProcessor(),
ContractProcessor(),
]
self.confidence_threshold = 0.9
self.high_value_threshold = 100000 # 10万円以上は必ずレビュー
def process(self, image_path: str) -> DocumentProcessingResult:
"""文書を処理"""
start_time = time.time()
doc_id = str(uuid.uuid4())
# Step 1: 文書分類
classification = classify_document(image_path)
doc_type = classification["type"]
# Step 2: 適切なプロセッサを選択
processor = self._get_processor(doc_type)
if not processor:
return self._create_result(
doc_id, doc_type, ProcessingStatus.NEEDS_REVIEW,
{}, {}, 0.0, start_time,
needs_review=True,
reasons=["対応するプロセッサが見つかりません"]
)
# Step 3: データ抽出
extracted_data = processor.extract(image_path)
# Step 4: バリデーション
validation = processor.validate(extracted_data)
# Step 5: レビュー必要性の判定
confidence = classification.get("confidence", 0.0)
needs_review, reasons = self._check_review_needed(
extracted_data, validation, confidence
)
status = ProcessingStatus.NEEDS_REVIEW if needs_review else ProcessingStatus.APPROVED
return self._create_result(
doc_id, doc_type, status,
extracted_data, validation, confidence,
start_time, needs_review, reasons
)
def _get_processor(self, doc_type: str) -> DocumentProcessor | None:
for processor in self.processors:
if processor.can_process(doc_type):
return processor
return None
def _check_review_needed(
self, data: dict, validation: dict, confidence: float
) -> tuple[bool, list[str]]:
"""人的レビューが必要か判定"""
reasons = []
if confidence < self.confidence_threshold:
reasons.append(f"分類信頼度が低い: {confidence:.2f}")
if not validation.get("valid", True):
reasons.append(f"バリデーションエラー: {validation.get('errors', [])}")
# 高額取引チェック
total = data.get("total", {})
if isinstance(total, dict):
amount = total.get("value", 0) or 0
else:
amount = total or 0
if amount > self.high_value_threshold:
reasons.append(f"高額取引: {amount:,}円")
return len(reasons) > 0, reasons
def _create_result(
self, doc_id, doc_type, status, data, validation,
confidence, start_time, needs_review=False, reasons=None
) -> DocumentProcessingResult:
return DocumentProcessingResult(
document_id=doc_id,
document_type=doc_type,
status=status,
extracted_data=data,
validation_result=validation,
confidence_score=confidence,
processing_time_ms=int((time.time() - start_time) * 1000),
needs_human_review=needs_review,
review_reasons=reasons or []
)
Human-in-the-Loopの設計
レビューキューの管理
@dataclass
class ReviewTask:
document_id: str
document_type: str
original_image_path: str
ai_extracted_data: dict
review_reasons: list[str]
priority: int # 1(高) 〜 3(低)
assigned_to: str | None = None
status: str = "pending"
class ReviewQueue:
"""人的レビューのキュー管理"""
def __init__(self):
self.tasks: list[ReviewTask] = []
def add_task(self, result: DocumentProcessingResult, image_path: str):
"""レビュータスクを追加"""
priority = self._calculate_priority(result)
task = ReviewTask(
document_id=result.document_id,
document_type=result.document_type,
original_image_path=image_path,
ai_extracted_data=result.extracted_data,
review_reasons=result.review_reasons,
priority=priority
)
self.tasks.append(task)
self.tasks.sort(key=lambda t: t.priority)
def _calculate_priority(self, result: DocumentProcessingResult) -> int:
"""優先度を計算"""
if any("バリデーションエラー" in r for r in result.review_reasons):
return 1 # 高: データ不整合
if any("高額" in r for r in result.review_reasons):
return 1 # 高: 高額取引
if result.confidence_score < 0.7:
return 2 # 中: 低信頼度
return 3 # 低: その他
def get_next_task(self, reviewer: str) -> ReviewTask | None:
"""次のレビュータスクを取得"""
for task in self.tasks:
if task.status == "pending":
task.status = "in_review"
task.assigned_to = reviewer
return task
return None
運用モニタリング
KPIダッシュボード
| KPI | 目標 | 測定方法 |
|---|
| 自動処理率 | 70%以上 | 自動完了件数 / 全件数 |
| 処理精度 | 95%以上 | 人的レビュー時の修正率 |
| 平均処理時間 | 30秒以内 | APIレスポンス時間 |
| レビューキュー待ち時間 | 2時間以内 | タスク作成〜完了時間 |
| 月間コスト | 予算内 | API費用 + 人件費 |
メトリクス収集
from datetime import datetime
class IDPMetrics:
"""IDPパイプラインのメトリクス"""
def __init__(self):
self.records = []
def record(self, result: DocumentProcessingResult):
self.records.append({
"timestamp": datetime.now(),
"document_type": result.document_type,
"status": result.status.value,
"confidence": result.confidence_score,
"processing_time_ms": result.processing_time_ms,
"needs_review": result.needs_human_review
})
def get_daily_stats(self) -> dict:
today = [r for r in self.records
if r["timestamp"].date() == datetime.now().date()]
total = len(today)
if total == 0:
return {"total": 0}
auto_completed = sum(1 for r in today if not r["needs_review"])
return {
"total": total,
"auto_completed": auto_completed,
"auto_rate": auto_completed / total,
"avg_processing_time_ms": sum(r["processing_time_ms"] for r in today) / total,
"avg_confidence": sum(r["confidence"] for r in today) / total,
"by_type": self._group_by_type(today)
}
def _group_by_type(self, records: list) -> dict:
types = {}
for r in records:
t = r["document_type"]
if t not in types:
types[t] = {"count": 0, "auto": 0}
types[t]["count"] += 1
if not r["needs_review"]:
types[t]["auto"] += 1
return types
まとめ
| 項目 | 内容 |
|---|
| IDPパイプライン | 取り込み → 分類 → 抽出 → 検証 → 登録 |
| 拡張性 | プロセッサを追加するだけで新文書タイプに対応 |
| Human-in-the-Loop | 信頼度・バリデーション・金額に基づく自動判定 |
| モニタリング | 自動処理率、精度、処理時間をKPIとして追跡 |
チェックリスト
推定所要時間: 30分