Claude APIを使っていて、「ChatGPTみたいに文字がリアルタイムで流れてくるやつ、どうやって実装するんだろう」と思ったことがあって、調べ始めたのがこのシリーズのきっかけです。Claude API × Pythonのストリーミング実装って聞くと難しそうなんですが、公式SDKを使えば思ったよりずっとシンプルでした。
この「Claude API Python 実践入門」シリーズでは、全4回にわたってClaude APIをPythonで使いこなすための基礎から実践まで順番に解説していきます。第1回の今回はストリーミングAPIに絞って、仕組みの理解から基本実装、そして少し踏み込んだ使い方まで一通り見ていきます。
- ストリーミングAPIとは何か(非ストリーミングとの違い)
- 公式Python SDKのセットアップ方法
client.messages.stream()を使ったシンプルな実装- SSEイベントの種類と内部の仕組み
- 非同期(async)ストリーミングの書き方
- よくあるエラーと対処のポイント
ストリーミングAPIって何がうれしいの?
まず前提として、Claude APIには大きく2つのレスポンス取得方法があります。
- 非ストリーミング(デフォルト):モデルが全部生成し終わってから、一括でレスポンスが返ってくる
- ストリーミング:生成されたテキストを少しずつリアルタイムで受け取れる
非ストリーミングだと、長い回答をリクエストした場合、ユーザーはレスポンスが完全に生成されるまで何も表示されない状態で待つことになります。体感的にめちゃくちゃ遅く感じるんですよね。ストリーミングなら最初のトークンが届いた瞬間から表示が始まるので、UX的には圧倒的に違います。
余談ですが、ChatGPTを使い始めてあの「文字が流れてくる感じ」に慣れると、一括返却のAPIをそのままUIに使ったとき「なんか古い感じするな」ってなってしまうんですよね。個人開発でも地味に気になる部分です。
技術的には、ストリーミングはSSE(Server-Sent Events)という仕組みで実現されています。HTTPコネクションを張りっぱなしにして、サーバー側から随時イベントを送り続ける方式です。WebSocketと違って単方向(サーバー→クライアント)なのが特徴で、テキスト生成ユースケースには十分すぎるくらい向いています。
環境セットアップ
必要なもの
- Python 3.9 以上
- Anthropic APIキー(console.anthropic.com で取得)
- anthropic Python SDK
SDKのインストール
インストール自体はシンプルです。通常用途ならこれだけでOK。
# 基本インストール
pip install anthropic
# 非同期パフォーマンスを上げたい場合(aiohttp使用)
pip install anthropic[aiohttp]
AWS BedrockやGoogle Vertex AI経由でClaude APIを使う場合は別途オプションが必要ですが、今回はAnthropicの直接API前提で進めます。
APIキーの設定
APIキーはコードに直書きせず、環境変数で管理するのが定石です。
# .envファイル(リポジトリに含めないこと!)
ANTHROPIC_API_KEY=sk-ant-api03-xxxxxxxxxxxxxxxxxxxx
# Pythonコード側
import os
from anthropic import Anthropic
# 環境変数から自動で読み込まれる(デフォルト動作)
client = Anthropic()
# 明示的に指定したい場合
# client = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
Anthropic() はデフォルトで環境変数 ANTHROPIC_API_KEY を参照してくれるので、設定しておけばコード内でごちゃごちゃ書く必要はないです。
基本実装:Claude API PythonでストリーミングをSDKで書く
いちばんシンプルな書き方
公式SDKには client.messages.stream() というストリーミング用の仕組みが用意されています。まずはこれを使うのが一番手軽です。
import anthropic
client = anthropic.Anthropic()
# with文でストリームを開いてテキストを逐次受け取る
with client.messages.stream(
model="claude-3-5-sonnet-latest",
max_tokens=1024,
messages=[
{"role": "user", "content": "Pythonでフィボナッチ数列を求める関数を書いてください。"}
],
) as stream:
for text in stream.text_stream:
# end="" で改行なし、flush=True で即時出力
print(text, end="", flush=True)
print() # 最後に改行
stream.text_stream はイテレータになっていて、テキストの差分(デルタ)が届くたびに値を返してくれます。print(text, end="", flush=True) としているのは、バッファリングされずに即座にターミナルに表示させるためです。これを忘れると、なぜか最後にドバっとまとめて出てくることがあるので要注意。
モデル名について補足しておくと、Anthropic公式には「固定のスナップショットID」と「latest系エイリアス」があります。個人開発でまず動かすだけなら claude-3-5-sonnet-latest みたいなエイリアスが楽で、挙動固定したい場合は claude-3-5-sonnet-20241022 のように日付入りを使う、という住み分けがよさそうです。
最終的なレスポンス全体を取得したい場合
ストリーミングしつつも、処理が終わったあとで「トータルのトークン数は?」「stop_reasonは?」みたいな情報が欲しいことがあります。そういうときは get_final_message() が使えます。
with client.messages.stream(
model="claude-3-5-sonnet-latest",
max_tokens=1024,
messages=[
{"role": "user", "content": "Pythonで素数判定の関数を書いてください。"}
],
) as stream:
# リアルタイムで表示
for text in stream.text_stream:
print(text, end="", flush=True)
# ストリーム終了後に完全なMessageオブジェクトを取得
final_message = stream.get_final_message()
print()
print(f"\n--- 使用トークン ---")
print(f"input: {final_message.usage.input_tokens}")
print(f"output: {final_message.usage.output_tokens}")
print(f"stop_reason: {final_message.stop_reason}")
内部的にはストリーミング受信中にすべてのイベントを積算しておいて、終了後にまとめたMessageオブジェクトを返してくれる仕組みです。「表示はリアルタイムにしたいけど、ログとしてトークン数も記録したい」みたいなケースで便利です。
SSEイベントの仕組みを理解する
SDKが内部でやってくれているとはいえ、仕組みを知っておくと問題が起きたときのデバッグが格段に楽になります。
イベントの流れ
Claude APIのストリーミングレスポンスは、以下のようなSSEイベントの流れになります(代表例)。
event: message_start
data: {"type": "message_start", "message": {"id": "msg_xxx", "type": "message", "role": "assistant", "content": [], "model": "claude-3-5-sonnet-20241022", "stop_reason": null, "usage": {"input_tokens": 25, "output_tokens": 0}}}
event: content_block_start
data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}
event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "def "}}
...(以降 content_block_delta が繰り返される)
event: content_block_stop
data: {"type": "content_block_stop", "index": 0}
event: message_delta
data: {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 124}}
event: message_stop
data: {"type": "message_stop"}
各イベントの役割をまとめるとこうなります。
message_start:ストリーム開始。空のcontentを持つMessageオブジェクトcontent_block_start:コンテンツブロック(テキストブロックなど)の開始content_block_delta:テキストの差分。これが繰り返し届くcontent_block_stop:コンテンツブロックの終了message_delta:stop_reasonやusageの更新情報message_stop:ストリーム全体の終了ping:接続維持のためのpingイベント(随時発生することがある)error:ストリーミング中にエラーが起きた場合のイベント
生のイベントをハンドリングしたい場合
特定のイベントだけ拾いたいときは、stream.text_stream の代わりに stream を直接イテレートすることもできます。
with client.messages.stream(
model="claude-3-5-sonnet-latest",
max_tokens=512,
messages=[{"role": "user", "content": "短い詩を書いてください。"}],
) as stream:
for event in stream:
# イベントタイプで分岐
if event.type == "content_block_delta":
# deltaのtypeがtext_deltaのときだけテキストを処理
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
elif event.type == "message_stop":
print("\n[ストリーム終了]")
通常は text_stream で十分なんですが、ツール使用(Function Calling)のストリーミングや、エラーイベントを細かく拾いたいときはこちらが必要になってきます。
非同期(async)ストリーミングの実装
Python SDKは同期・非同期の両方のストリーミングに対応しています。FastAPIやWebSocketサーバーなど、非同期フレームワークと組み合わせるときはasync版を使います。書き方はほぼ同じで、AsyncAnthropic クライアントに差し替えるだけです。
import asyncio
import anthropic
async def stream_response(prompt: str) -> None:
# AsyncAnthropic クライアントを使う
client = anthropic.AsyncAnthropic()
async with client.messages.stream(
model="claude-3-5-sonnet-latest",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}],
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
print()
# 実行
asyncio.run(stream_response("非同期プログラミングのメリットを教えてください。"))
with → async with、for → async for に変わるだけです。Pythonの非同期に慣れていれば違和感なく書けると思います。
非同期パフォーマンスをより高めたい場合は pip install anthropic[aiohttp] でaiohttp対応版を入れておくと良いです。ただし、入れただけで自動的にaiohttpへ切り替わるわけではなく、クライアント作成時にHTTPバックエンドを指定する必要があります。
余談ですが、FastAPIでSSEをレスポンスとして返す場合は StreamingResponse と組み合わせることになります。自分もここでたまにハマるので、別途まとめたいとは思っています。
よくあるエラーと対処ポイント
APIキーエラー(AuthenticationError)
# エラー例
anthropic.AuthenticationError: {"type":"error","error":{"type":"authentication_error","message":"invalid x-api-key"}}
APIキーが間違っている、または環境変数が読み込まれていないケースです。echo $ANTHROPIC_API_KEY で確認、あるいは python-dotenv を使っている場合は load_dotenv() が呼ばれているか確認してください。
レート制限エラー(RateLimitError)
import time
import anthropic
client = anthropic.Anthropic()
def stream_with_retry(prompt: str, max_retries: int = 3) -> None:
for attempt in range(max_retries):
try:
with client.messages.stream(
model="claude-3-5-sonnet-latest",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}],
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
print()
return # 成功したら抜ける
except anthropic.RateLimitError:
wait_time = 2 ** attempt # 指数バックオフ
print(f"\nレート制限。{wait_time}秒後にリトライ...")
time.sleep(wait_time)
print("リトライ上限に達しました")
個人開発の範囲では頻繁に当たるエラーではないですが、処理を大量に回すバッチ系のコードを書くときに遭遇しやすいです。シンプルなリトライロジックを早めに入れておくと後で助かります。
ストリームをwith文の外で使おうとするエラー
これはやりがちなミスで、with ブロックを抜けるとストリームが閉じられるので、外で text_stream を使おうとするとエラーになります。ブロック内でデータを収集しておくか、get_final_message() を活用するのがベターです。
※この記事にはプロモーションが含まれます
ちなみに、PLAUD NOTE(AIボイスレコーダー。録音から文字起こし・要約まで自動で行える)も気になっています。PLAUD NOTE![]()
まとめ
第1回のポイントを整理するとこんな感じです。
- Claude API × Pythonのストリーミングはほぼコピペレベルのコードから始められる
- ストリーミングはSSEベースで、生成されたテキストをリアルタイムで受け取れる仕組み
- SDKの
client.messages.stream()+stream.text_streamで最小限のコードで実装できる - 処理後にメタ情報が欲しい場合は
get_final_message()を使う - SSEイベントは
message_start → content_block_delta(繰り返し)→ message_stopを含む流れで届く - 非同期は
AsyncAnthropicに差し替えるだけでほぼ同じように書ける - エラーハンドリングとリトライは早めに入れておくと後悔しない
自分用メモのつもりで書いてますが、参考になれば嬉しいです!
📚 シリーズ「Claude API Python 実践入門」(第1回 / 全4回)
→ 次回の記事: 【第2回】Claude API Python 実践入門 — プロンプト設計とメッセージ履歴で会話を制御する

