2026年4月30日
技術情報
データ連携時のPythonでの定義方法と渡し方
pydanticを中心とした型定義、dictとモデルの使い分け、async環境での渡し方など、データ連携 Pythonコードの実装指針を整理。境界で型確定、内部はdictで柔軟に。

要約
複数のデータソース(DB、API、ファイル、SaaS)をAIに繋ぐ際、Pythonでデータをどう定義し、どう渡すかは設計の根幹です。型がブレると、変換・検証・エラーハンドリングが各所に散らばり、保守不能になります。
本記事では、pydantic を中心とした型定義、dict とモデルの使い分け、async環境での渡し方、ジェネリック型でのオペレーター設計など、実装現場で必要になる判断を整理します。入出力境界(API、DB、外部SaaS)にはpydanticモデルを置き、内部処理はdictで流すのが最も柔軟です。
なぜ型定義が必要か
データ連携系のコードでは、ほぼ確実に以下が起こります。
- 上流のSaaSがフィールドをサイレントに変更する(
status→state) - DBのカラムが追加されるが、Python側は古い前提のまま
- 文字列で来るはずの数値が、たまにNullになる
- 日付フォーマットが拠点ごとに違う
型定義の役割は、境界で必ず形を確定させ、内部では信頼してよい状態を作ることです。
型定義の3つの選択肢
| 選択肢 | バリデーション | パフォーマンス | 主な用途 |
|---|---|---|---|
dict | なし | 最速 | 内部の中間データ、動的キー |
dataclass | なし(静的型のみ) | 速い | 内部のドメインモデル |
pydantic.BaseModel | あり(実行時) | やや重い | API/DB/外部I/Oの境界 |
pydanticモデルの基本形
from datetime import datetime from pydantic import BaseModel, Field, field_validator class CustomerRecord(BaseModel): id: str name
- 必須/任意を型で表現:
str | None = Noneで任意フィールドを明示 - Field でバリデーション:
ge=0、max_length=100などを宣言的に書く - field_validator で正規化:trim・lowercase などの整形は型側で済ませる
dictでの取り回し
パイプライン内部でレコードを流す際は、dictが最も柔軟です。
- オペレーターを汎用化できる:
filter・sort・aggregateは中身を知らずに動く - シリアライズコストが少ない:JSON/DBとの行き来がそのまま
- 動的フィールド追加が容易:上流で増えたカラムをそのまま下流に流せる
# 入口でpydantic検証 → 内部はdictで流す records: list[dict] = [ CustomerRecord.model_validate(raw).model_dump() for raw in raw_records
async環境での渡し方
Generator vs List
大量データを扱うときは AsyncGenerator で流します。リストに溜めるとメモリが破綻します。
from typing import AsyncGenerator async def fetch_records(api_url: str) -> AsyncGenerator[dict, None]:
バッチ処理
LLM呼び出しなどN件まとめた方が効率的なケースは、itertools.batched(Python 3.12+)でまとめます。
ジェネリック型でのオペレーター設計
各オペレーターを「入力型 → 出力型」のジェネリックなインターフェースで揃えると再利用性が高まります。
from typing import Generic, TypeVar from pydantic import BaseModel I = TypeVar("I") O = TypeVar("O") class
利点:カタログ可能・接続検証可能・テスト容易。
認証情報の扱い
外部APIの認証情報は、絶対にコードや設定JSONにベタ書きしません。専用のVaultテーブルを用意し、オペレーター側はキーだけを参照する設計にします。
class HttpSourceConfig(BaseModel): url: str credential_id: str # vault のID参照のみ async def run_http_source(cfg: HttpSourceConfig)
日付・タイムゾーンの正規化
原則:
- 境界では必ずタイムゾーン付きDatetimeに変換してUTCへ
- 内部はUTCで保持、表示直前にローカルタイムゾーンへ
- DBもタイムゾーン付き型:PostgreSQLなら
TIMESTAMPTZ
from datetime import datetime, UTC def normalize_datetime(s: str) -> datetime: s = s.replace("/", "-"
まとめ
- 境界で型確定、内部はdict:pydanticは入口/出口、内部はdictで柔軟に
- async + ジェネレータが基本:大量データはストリーミングで
- 認証・日付は型側で吸収:オペレーター本体に汚れを持ち込まない
各オペレーターを薄く保ち、上流の変化を境界で吸収する構造にしておくと、SaaSのAPI仕様変更や新フィールド追加に強い基盤が作れます。