2026年4月30日
技術情報
データ連携時のPythonでの定義方法と渡し方
pydanticを中心とした型定義、dictとモデルの使い分け、async環境での渡し方など、データ連携 Pythonコードの実装指針を整理。境界で型確定、内部はdictで柔軟に。

要約
複数のデータソース(DB、API、ファイル、SaaS)をAIに繋ぐ際、Pythonでデータをどう定義し、どう渡すかは設計の根幹です。型がブレると、変換・検証・エラーハンドリングが各所に散らばり、保守不能になります。
本記事では、pydantic を中心とした型定義、dict とモデルの使い分け、async環境での渡し方、ジェネリック型でのオペレーター設計など、実装現場で必要になる判断を整理します。入出力境界(API、DB、外部SaaS)にはpydanticモデルを置き、内部処理はdictで流すのが最も柔軟です。
なぜ型定義が必要か
データ連携系のコードでは、ほぼ確実に以下が起こります。
- 上流のSaaSがフィールドをサイレントに変更する(
status→state) - DBのカラムが追加されるが、Python側は古い前提のまま
- 文字列で来るはずの数値が、たまにNullになる
- 日付フォーマットが拠点ごとに違う
型定義の役割は、境界で必ず形を確定させ、内部では信頼してよい状態を作ることです。
型定義の3つの選択肢
| 選択肢 | バリデーション | パフォーマンス | 主な用途 |
|---|---|---|---|
dict | なし | 最速 | 内部の中間データ、動的キー |
dataclass | なし(静的型のみ) | 速い | 内部のドメインモデル |
pydantic.BaseModel | あり(実行時) | やや重い | API/DB/外部I/Oの境界 |
pydanticモデルの基本形
from datetime import datetime from pydantic import BaseModel, Field, field_validator class CustomerRecord(BaseModel): id: str name: str email: str company: str | None = None created_at: datetime revenue: float = Field(default=0.0, ge=0) @field_validator("email") @classmethod def normalize_email(cls, v: str) -> str: return v.strip().lower()
- 必須/任意を型で表現:
str | None = Noneで任意フィールドを明示 - Field でバリデーション:
ge=0、max_length=100などを宣言的に書く - field_validator で正規化:trim・lowercase などの整形は型側で済ませる
dictでの取り回し
パイプライン内部でレコードを流す際は、dictが最も柔軟です。
- オペレーターを汎用化できる:
filter・sort・aggregateは中身を知らずに動く - シリアライズコストが少ない:JSON/DBとの行き来がそのまま
- 動的フィールド追加が容易:上流で増えたカラムをそのまま下流に流せる
# 入口でpydantic検証 → 内部はdictで流す records: list[dict] = [ CustomerRecord.model_validate(raw).model_dump() for raw in raw_records ] # transform_filter filtered = [r for r in records if r["revenue"] >= 1_000_000] # transform_set for r in filtered: r["tier"] = "enterprise" if r["revenue"] >= 10_000_000 else "standard"
async環境での渡し方
Generator vs List
大量データを扱うときは AsyncGenerator で流します。リストに溜めるとメモリが破綻します。
from typing import AsyncGenerator async def fetch_records(api_url: str) -> AsyncGenerator[dict, None]: """ページング対応で1件ずつyield。""" async with httpx.AsyncClient() as client: cursor = None while True: params = {"cursor": cursor} if cursor else {} resp = await client.get(api_url, params=params) data = resp.json() for record in data["items"]: yield record cursor = data.get("next_cursor") if not cursor: break
バッチ処理
LLM呼び出しなどN件まとめた方が効率的なケースは、itertools.batched(Python 3.12+)でまとめます。
ジェネリック型でのオペレーター設計
各オペレーターを「入力型 → 出力型」のジェネリックなインターフェースで揃えると再利用性が高まります。
from typing import Generic, TypeVar from pydantic import BaseModel I = TypeVar("I") O = TypeVar("O") class Operator(BaseModel, Generic[I, O]): name: str async def run(self, items: list[I]) -> list[O]: raise NotImplementedError class FilterOp(Operator[dict, dict]): field: str op: str # "eq" | "ne" | "gt" | ... value: Any async def run(self, items: list[dict]) -> list[dict]: return [r for r in items if self._match(r)]
利点:カタログ可能・接続検証可能・テスト容易。
認証情報の扱い
外部APIの認証情報は、絶対にコードや設定JSONにベタ書きしません。専用のVaultテーブルを用意し、オペレーター側はキーだけを参照する設計にします。
class HttpSourceConfig(BaseModel): url: str credential_id: str # vault のID参照のみ async def run_http_source(cfg: HttpSourceConfig) -> AsyncGenerator[dict, None]: cred = await vault.get(cfg.credential_id) # 復号して取得 headers = {"Authorization": f"Bearer {cred.token}"}
日付・タイムゾーンの正規化
原則:
- 境界では必ずタイムゾーン付きDatetimeに変換してUTCへ
- 内部はUTCで保持、表示直前にローカルタイムゾーンへ
- DBもタイムゾーン付き型:PostgreSQLなら
TIMESTAMPTZ
from datetime import datetime, UTC def normalize_datetime(s: str) -> datetime: s = s.replace("/", "-") dt = datetime.fromisoformat(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=UTC) return dt.astimezone(UTC)
まとめ
- 境界で型確定、内部はdict:pydanticは入口/出口、内部はdictで柔軟に
- async + ジェネレータが基本:大量データはストリーミングで
- 認証・日付は型側で吸収:オペレーター本体に汚れを持ち込まない
各オペレーターを薄く保ち、上流の変化を境界で吸収する構造にしておくと、SaaSのAPI仕様変更や新フィールド追加に強い基盤が作れます。
