2026年4月30日
技術情報
データ連携基盤におけるPythonの型設計|pydanticとdictの使い分けと渡し方
複数のデータソースをAIに連携する基盤では、型設計が保守性を左右します。本稿では境界にpydanticモデルを置き内部はdictで流す設計を軸に、async環境での渡し方、ジェネリックなオペレーター設計、認証情報・日付の正規化まで実装指針を整理します。

概要
複数のデータソース(DB、API、ファイル、SaaS)をLLMやRAG基盤に連携する際、Pythonでデータをどう定義し、どう渡すかは設計の根幹を成します。型がブレると、変換・検証・エラーハンドリングの責務が各所に散らばり、上流の仕様変更に追従できない保守困難なコードに陥ります。
本稿では、DigitalBaseが社内AI基盤・データ連携の実装で標準としている設計指針を整理します。具体的には、pydantic を中心とした型定義、dict とモデルの使い分け、async環境での渡し方、ジェネリック型によるオペレーター設計です。結論として、入出力境界(API、DB、外部SaaS)にはpydanticモデルを置き、内部処理はdictで流す構成を推奨します。境界で型を確定させ、内部は柔軟に保つことで、データソースの増減や仕様変更に強い基盤を構築できます。
なぜ型定義が必要か
データ連携系のコードでは、運用を続けるうちにほぼ確実に以下の事象が発生します。
- 上流のSaaSがフィールドをサイレントに変更する(
status→state) - DBのカラムが追加されるが、Python側は古い前提のまま動作する
- 文字列で来るはずの数値が、特定条件下でNullになる
- 日付フォーマットが拠点ごとに異なる
型定義の役割は、境界で必ず形を確定させ、内部では信頼してよい状態を作ることにあります。境界でバリデーションを一度通せば、下流の処理は値の妥当性を都度チェックする必要がなくなり、ロジックを本質に集中させられます。
型定義の3つの選択肢
データの取り回しには、用途に応じて3つの選択肢を使い分けます。
| 選択肢 | バリデーション | パフォーマンス | 主な用途 |
|---|---|---|---|
dict | なし | 最速 | 内部の中間データ、動的キー |
dataclass | なし(静的型のみ) | 速い | 内部のドメインモデル |
pydantic.BaseModel | あり(実行時) | やや重い | API/DB/外部I/Oの境界 |
実行時バリデーションのコストは外部I/Oの境界でのみ支払い、内部の高頻度処理には持ち込まない、という切り分けが基本方針です。
pydanticモデルの基本形
from datetime import datetime from pydantic import BaseModel, Field, field_validator class CustomerRecord(BaseModel): id: str name: str email: str company: str | None = None created_at: datetime revenue: float = Field(default=0.0, ge=0) @field_validator("email") @classmethod def normalize_email(cls, v: str) -> str: return v.strip().lower()
- 必須/任意を型で表現:
str | None = Noneで任意フィールドを明示する - Field でバリデーション:
ge=0、max_length=100などを宣言的に記述する - field_validator で正規化:trim・lowercase などの整形は型側で済ませる
正規化やバリデーションを型に寄せることで、レコードを受け取った下流のコードは整形済みであることを前提にできます。
dictでの取り回し
パイプライン内部でレコードを流す際は、dictが最も柔軟です。理由は次の3点です。
- オペレーターを汎用化できる:
filter・sort・aggregateは中身のスキーマを知らずに動作する - シリアライズコストが少ない:JSON/DBとの行き来がそのまま行える
- 動的フィールド追加が容易:上流で増えたカラムをそのまま下流に流せる
# 入口でpydantic検証 → 内部はdictで流す records: list[dict] = [ CustomerRecord.model_validate(raw).model_dump() for raw in raw_records ] # transform_filter filtered = [r for r in records if r["revenue"] >= 1_000_000] # transform_set for r in filtered: r["tier"] = "enterprise" if r["revenue"] >= 10_000_000 else "standard"
境界で一度 model_validate を通したあとは model_dump() でdict化し、以降のパイプラインは型に縛られず汎用的に処理します。
async環境での渡し方
Generator vs List
大量データを扱う場合は AsyncGenerator でストリーミングします。すべてをリストに溜め込む実装はメモリを圧迫し、件数が増えると破綻します。
from typing import AsyncGenerator import httpx async def fetch_records(api_url: str) -> AsyncGenerator[dict, None]: """ページング対応で1件ずつyieldする。""" async with httpx.AsyncClient() as client: cursor = None while True: params = {"cursor": cursor} if cursor else {} resp = await client.get(api_url, params=params) data = resp.json() for record in data["items"]: yield record cursor = data.get("next_cursor") if not cursor: break
バッチ処理
LLM呼び出しのように、N件をまとめて処理したほうが効率的なケースでは、itertools.batched(Python 3.12以降)でチャンク化します。ストリーミングで受けつつ、必要な単位だけバッファリングするのが要点です。
from itertools import batched async def embed_in_batches(records: AsyncGenerator[dict, None], size: int = 32): buffer: list[dict] = [] async for record in records: buffer.append(record) if len(buffer) >= size: await embed_batch(buffer) buffer.clear() if buffer: await embed_batch(buffer)
ジェネリック型によるオペレーター設計
各オペレーターを「入力型 → 出力型」のジェネリックなインターフェースで揃えると、再利用性とテスト容易性が高まります。
from typing import Any, Generic, TypeVar from pydantic import BaseModel I = TypeVar("I") O = TypeVar("O") class Operator(BaseModel, Generic[I, O]): name: str async def run(self, items: list[I]) -> list[O]: raise NotImplementedError class FilterOp(Operator[dict, dict]): field: str op: str # "eq" | "ne" | "gt" | ... value: Any async def run(self, items: list[dict]) -> list[dict]: return [r for r in items if self._match(r)]
このインターフェースに揃える利点は、カタログ化が可能・接続検証が可能・単体テストが容易という3点に集約されます。オペレーターの入出力型が明示されるため、パイプラインを組む段階で型の不整合を検出でき、実行前に接続の妥当性を確認できます。
認証情報の扱い
外部APIの認証情報は、コードや設定JSONにベタ書きしてはなりません。専用のVaultテーブル(またはシークレットマネージャー)を用意し、オペレーター側はキーのIDだけを参照する設計とします。
class HttpSourceConfig(BaseModel): url: str credential_id: str # vault のID参照のみ async def run_http_source(cfg: HttpSourceConfig) -> AsyncGenerator[dict, None]: cred = await vault.get(cfg.credential_id) # 復号して取得 headers = {"Authorization": f"Bearer {cred.token}"} ...
設定をそのままログやエラーメッセージに出力しても秘匿情報が漏れない構造になり、設定ファイルのバージョン管理やレビューも安全に行えます。
日付・タイムゾーンの正規化
拠点や外部システムごとに異なる日付表現は、境界で吸収します。原則は次の3点です。
- 境界では必ずタイムゾーン付きdatetimeへ変換し、UTCに揃える
- 内部はUTCで保持し、表示の直前にローカルタイムゾーンへ変換する
- DBもタイムゾーン付き型を用いる:PostgreSQLなら
TIMESTAMPTZ
from datetime import datetime, UTC def normalize_datetime(s: str) -> datetime: s = s.replace("/", "-") dt = datetime.fromisoformat(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=UTC) return dt.astimezone(UTC)
タイムゾーンの扱いを境界に閉じ込めることで、内部ロジックは常にUTC前提で記述でき、拠点をまたぐ集計や比較のバグを防げます。
まとめ
データ連携基盤におけるPythonの型設計について、本稿の要点は次の3つに集約されます。
- 境界で型確定、内部はdict:pydanticは入口・出口に置き、内部はdictで柔軟に流す
- async + ジェネレータが基本:大量データはストリーミングで処理し、必要な単位だけバッファリングする
- 認証・日付は型側で吸収:オペレーター本体に環境差や秘匿情報を持ち込まない
各オペレーターを薄く保ち、上流の変化を境界で吸収する構造にしておくことで、SaaSのAPI仕様変更や新フィールド追加に強い基盤を構築できます。
