digital base
プロダクトドキュメント最新情報コンテンツ会社概要

お問い合わせ

ご質問やご相談など、お気軽にお問い合わせください。

デジタルベース株式会社

〒106-0047
東京都港区南麻布3-20-1 5階

サイトメニュー

  • トップページ
  • プロダクト
  • ドキュメント
  • 最新ニュース
  • 記事一覧
  • 会社情報

お問い合わせ

  • info@digital-base.co.jp

NVIDIA Inception Program / Intel Partner ISV /
NTTPC Innovation LAB / IT導入補助金 対象

© デジタルベース株式会社. All rights reserved.
一覧に戻る

2026年4月30日

·

技術情報

·
411 文字

データ連携基盤におけるPythonの型設計|pydanticとdictの使い分けと渡し方

複数のデータソースをAIに連携する基盤では、型設計が保守性を左右します。本稿では境界にpydanticモデルを置き内部はdictで流す設計を軸に、async環境での渡し方、ジェネリックなオペレーター設計、認証情報・日付の正規化まで実装指針を整理します。

データ連携基盤におけるPythonの型設計|pydanticとdictの使い分けと渡し方

概要

複数のデータソース(DB、API、ファイル、SaaS)をLLMやRAG基盤に連携する際、Pythonでデータをどう定義し、どう渡すかは設計の根幹を成します。型がブレると、変換・検証・エラーハンドリングの責務が各所に散らばり、上流の仕様変更に追従できない保守困難なコードに陥ります。

本稿では、DigitalBaseが社内AI基盤・データ連携の実装で標準としている設計指針を整理します。具体的には、pydantic を中心とした型定義、dict とモデルの使い分け、async環境での渡し方、ジェネリック型によるオペレーター設計です。結論として、入出力境界(API、DB、外部SaaS)にはpydanticモデルを置き、内部処理はdictで流す構成を推奨します。境界で型を確定させ、内部は柔軟に保つことで、データソースの増減や仕様変更に強い基盤を構築できます。


なぜ型定義が必要か

データ連携系のコードでは、運用を続けるうちにほぼ確実に以下の事象が発生します。

  • 上流のSaaSがフィールドをサイレントに変更する(status → state)
  • DBのカラムが追加されるが、Python側は古い前提のまま動作する
  • 文字列で来るはずの数値が、特定条件下でNullになる
  • 日付フォーマットが拠点ごとに異なる

型定義の役割は、境界で必ず形を確定させ、内部では信頼してよい状態を作ることにあります。境界でバリデーションを一度通せば、下流の処理は値の妥当性を都度チェックする必要がなくなり、ロジックを本質に集中させられます。


型定義の3つの選択肢

データの取り回しには、用途に応じて3つの選択肢を使い分けます。

選択肢バリデーションパフォーマンス主な用途
dictなし最速内部の中間データ、動的キー
dataclassなし(静的型のみ)速い内部のドメインモデル
pydantic.BaseModelあり(実行時)やや重いAPI/DB/外部I/Oの境界

実行時バリデーションのコストは外部I/Oの境界でのみ支払い、内部の高頻度処理には持ち込まない、という切り分けが基本方針です。

pydanticモデルの基本形

from datetime import datetime from pydantic import BaseModel, Field, field_validator class CustomerRecord(BaseModel): id: str name: str email: str company: str | None = None created_at: datetime revenue: float = Field(default=0.0, ge=0) @field_validator("email") @classmethod def normalize_email(cls, v: str) -> str: return v.strip().lower()
  • 必須/任意を型で表現:str | None = None で任意フィールドを明示する
  • Field でバリデーション:ge=0、max_length=100 などを宣言的に記述する
  • field_validator で正規化:trim・lowercase などの整形は型側で済ませる

正規化やバリデーションを型に寄せることで、レコードを受け取った下流のコードは整形済みであることを前提にできます。


dictでの取り回し

パイプライン内部でレコードを流す際は、dictが最も柔軟です。理由は次の3点です。

  1. オペレーターを汎用化できる:filter・sort・aggregate は中身のスキーマを知らずに動作する
  2. シリアライズコストが少ない:JSON/DBとの行き来がそのまま行える
  3. 動的フィールド追加が容易:上流で増えたカラムをそのまま下流に流せる
# 入口でpydantic検証 → 内部はdictで流す records: list[dict] = [ CustomerRecord.model_validate(raw).model_dump() for raw in raw_records ] # transform_filter filtered = [r for r in records if r["revenue"] >= 1_000_000] # transform_set for r in filtered: r["tier"] = "enterprise" if r["revenue"] >= 10_000_000 else "standard"

境界で一度 model_validate を通したあとは model_dump() でdict化し、以降のパイプラインは型に縛られず汎用的に処理します。


async環境での渡し方

Generator vs List

大量データを扱う場合は AsyncGenerator でストリーミングします。すべてをリストに溜め込む実装はメモリを圧迫し、件数が増えると破綻します。

from typing import AsyncGenerator import httpx async def fetch_records(api_url: str) -> AsyncGenerator[dict, None]: """ページング対応で1件ずつyieldする。""" async with httpx.AsyncClient() as client: cursor = None while True: params = {"cursor": cursor} if cursor else {} resp = await client.get(api_url, params=params) data = resp.json() for record in data["items"]: yield record cursor = data.get("next_cursor") if not cursor: break

バッチ処理

LLM呼び出しのように、N件をまとめて処理したほうが効率的なケースでは、itertools.batched(Python 3.12以降)でチャンク化します。ストリーミングで受けつつ、必要な単位だけバッファリングするのが要点です。

from itertools import batched async def embed_in_batches(records: AsyncGenerator[dict, None], size: int = 32): buffer: list[dict] = [] async for record in records: buffer.append(record) if len(buffer) >= size: await embed_batch(buffer) buffer.clear() if buffer: await embed_batch(buffer)

ジェネリック型によるオペレーター設計

各オペレーターを「入力型 → 出力型」のジェネリックなインターフェースで揃えると、再利用性とテスト容易性が高まります。

from typing import Any, Generic, TypeVar from pydantic import BaseModel I = TypeVar("I") O = TypeVar("O") class Operator(BaseModel, Generic[I, O]): name: str async def run(self, items: list[I]) -> list[O]: raise NotImplementedError class FilterOp(Operator[dict, dict]): field: str op: str # "eq" | "ne" | "gt" | ... value: Any async def run(self, items: list[dict]) -> list[dict]: return [r for r in items if self._match(r)]

このインターフェースに揃える利点は、カタログ化が可能・接続検証が可能・単体テストが容易という3点に集約されます。オペレーターの入出力型が明示されるため、パイプラインを組む段階で型の不整合を検出でき、実行前に接続の妥当性を確認できます。


認証情報の扱い

外部APIの認証情報は、コードや設定JSONにベタ書きしてはなりません。専用のVaultテーブル(またはシークレットマネージャー)を用意し、オペレーター側はキーのIDだけを参照する設計とします。

class HttpSourceConfig(BaseModel): url: str credential_id: str # vault のID参照のみ async def run_http_source(cfg: HttpSourceConfig) -> AsyncGenerator[dict, None]: cred = await vault.get(cfg.credential_id) # 復号して取得 headers = {"Authorization": f"Bearer {cred.token}"} ...

設定をそのままログやエラーメッセージに出力しても秘匿情報が漏れない構造になり、設定ファイルのバージョン管理やレビューも安全に行えます。


日付・タイムゾーンの正規化

拠点や外部システムごとに異なる日付表現は、境界で吸収します。原則は次の3点です。

  • 境界では必ずタイムゾーン付きdatetimeへ変換し、UTCに揃える
  • 内部はUTCで保持し、表示の直前にローカルタイムゾーンへ変換する
  • DBもタイムゾーン付き型を用いる:PostgreSQLなら TIMESTAMPTZ
from datetime import datetime, UTC def normalize_datetime(s: str) -> datetime: s = s.replace("/", "-") dt = datetime.fromisoformat(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=UTC) return dt.astimezone(UTC)

タイムゾーンの扱いを境界に閉じ込めることで、内部ロジックは常にUTC前提で記述でき、拠点をまたぐ集計や比較のバグを防げます。


まとめ

データ連携基盤におけるPythonの型設計について、本稿の要点は次の3つに集約されます。

  1. 境界で型確定、内部はdict:pydanticは入口・出口に置き、内部はdictで柔軟に流す
  2. async + ジェネレータが基本:大量データはストリーミングで処理し、必要な単位だけバッファリングする
  3. 認証・日付は型側で吸収:オペレーター本体に環境差や秘匿情報を持ち込まない

各オペレーターを薄く保ち、上流の変化を境界で吸収する構造にしておくことで、SaaSのAPI仕様変更や新フィールド追加に強い基盤を構築できます。

DigitalBase データ連携フロー
DigitalBase

社内データを、ネットワーク不要で
“使えるAI”に。

エンタープライズに必要なAI機能を1つに集約した、ライセンス型のオンプレミスLLM基盤。 機密データを外部に出さず、完全オフライン環境で運用できます。

  • ✓ 専用AIチャット / ドキュメントAgent(RAG)
  • ✓ 文字起こし・ベンチマーク測定
  • ✓ 管理者・共有・権限管理機能
無料で試す製品の詳細を見る

資料請求・導入のご相談は お問い合わせ から。

ニュースリリース

最新のお知らせやプレスリリースをご覧いただけます

お知らせ
「AI NATIVE EXPO 2026」(6月10日〜12日 @ 幕張メッセ) に出展いたします
Interop Tokyo 併設の総合展「AI NATIVE EXPO 2026」に出展いたします。社内データを自動連携・加工し、BI・AIエージェントへ繋ぐ一連のフローを展示します。
2026年6月8日
プレスリリースPR TIMES
台湾AIインフラ企業Spingence Technologyと社内データ連携AIプラットフォームを共同開発
4月15日〜17日開催「NexTech Week 2026【春】第10回 AI・人工知能 EXPO」に出展 ~社内データをAIに接続し、業務フローに組み込む企業向けAI基盤~
2026年4月6日
お知らせ
「AI Frontier 2026」にスポンサー出展
AI技術の最前線を発信するカンファレンス「AI Frontier 2026」にスポンサーとして出展いたします。
2026年3月4日
一覧に戻る