前回は asyncio の基本的な仕組み――イベントループ・コルーチン・async/await の書き方を紹介しました(第1回はこちら)。「なんとなく動かせる」くらいにはなったものの、「じゃあ複数の処理を同時に走らせるにはどうするの?」というのが次の疑問でした。今回はそこを掘り下げます。
実際に自分が AWS Lambda で複数の API を叩く処理を書いたとき、順番に await するだけだと全然速くならなくて「あれ?」ってなったんですよね。そのあたりの気づきも交えながら書いていきます。
この記事でわかること
asyncio.create_task()でタスクを作る方法asyncio.gather()で複数タスクをまとめて実行する方法- Python 3.11 から使える
asyncio.TaskGroupの書き方と gather との違い asyncio.Semaphoreで同時実行数を制限する方法asyncio.wait_for()でタイムアウトを設定する方法
そもそも「並行処理」になってない落とし穴
まずここ、自分もはまったやつです。asyncio を使っていても、書き方によってはぜんぜん並行になりません。
import asyncio
async def fetch(name, delay):
print(f"{name} 開始")
await asyncio.sleep(delay)
print(f"{name} 完了")
async def main():
# ❌ これは順番に実行されるだけ(並行ではない)
await fetch("API-A", 2)
await fetch("API-B", 2)
asyncio.run(main())
# → 合計4秒かかる
await を直列に並べると、前のコルーチンが終わるまで次が始まりません。これは普通の同期処理と実質同じです。Python asyncio で並行処理を実現するには「タスク」として登録する必要があります。
asyncio.create_task() ── タスクを登録する基本
asyncio.create_task() はコルーチンをイベントループに「タスク」として登録し、バックグラウンドで実行を開始させます。返ってくるのは Task オブジェクトで、後から結果を取り出したりキャンセルしたりできます。
import asyncio
async def fetch(name, delay):
print(f"{name} 開始")
await asyncio.sleep(delay)
print(f"{name} 完了")
return f"{name} の結果"
async def main():
# ✅ タスクとして登録 → 即座にバックグラウンドで開始される
task_a = asyncio.create_task(fetch("API-A", 2))
task_b = asyncio.create_task(fetch("API-B", 2))
result_a = await task_a # タスクの完了を待つ
result_b = await task_b
print(result_a)
print(result_b)
asyncio.run(main())
# → 合計約2秒で完了(並行実行!)
create_task() を呼んだ時点でタスクが「スケジュール済み」になるのがポイントです。await task_a は「完了を待つ」だけで、そこで初めて動き出すわけじゃない。このあたりの感覚が最初はつかみにくかったです。
Task オブジェクトの便利メソッド
Task オブジェクトにはいくつかメソッドがあります。よく使うのはこのあたり。
task = asyncio.create_task(fetch("API-A", 2))
task.done() # 完了済みかどうか(bool)
task.result() # 結果を取得(完了していない場合は例外)
task.cancel() # タスクをキャンセルする
余談ですが、task.cancel() を呼ぶとタスク内で CancelledError が発生します。キャンセルされた後に task.result() を呼ぶと CancelledError が上がってくるので注意が必要です。
asyncio.gather() ── 複数タスクをまとめて待つ
asyncio.gather() は複数のコルーチン(またはタスク)を並行実行し、すべての完了を待ってから結果をリストで返してくれます。create_task() を何度も書かなくていいので、まとめて並行実行したいときによく使います。
import asyncio
async def fetch(name, delay):
await asyncio.sleep(delay)
return f"{name} の結果"
async def main():
# コルーチンをそのまま渡せる(内部で自動的にタスク化される)
results = await asyncio.gather(
fetch("API-A", 2),
fetch("API-B", 1),
fetch("API-C", 3),
)
print(results)
# → ['API-A の結果', 'API-B の結果', 'API-C の結果']
# ※ 結果の順序は引数の順序と一致する(完了順ではない)
asyncio.run(main())
結果リストの順序は引数に渡した順番と一致します。API-B が一番早く終わっても、結果は [A, B, C] の順番で入ってくる。これ最初わかってなくて変なバグを出した記憶があります。
エラーハンドリング:return_exceptions の挙動
gather() はデフォルト(return_exceptions=False)だと、どれか1つのタスクが例外を投げると即座にその例外が gather の呼び出し元に伝わります。ここ、勘違いしやすいんですが、このとき残りのタスクは基本的にキャンセルされず、そのまま動き続けます(「例外が出たら全停止」ではない)。
async def risky_fetch(name, fail=False):
await asyncio.sleep(1)
if fail:
raise ValueError(f"{name} が失敗しました")
return f"{name} OK"
async def main():
try:
results = await asyncio.gather(
risky_fetch("A"),
risky_fetch("B", fail=True), # これが例外を投げる
risky_fetch("C"),
)
except ValueError as e:
print(f"エラーキャッチ: {e}")
# return_exceptions=True にすると例外も結果リストに含まれる
results = await asyncio.gather(
risky_fetch("A"),
risky_fetch("B", fail=True),
risky_fetch("C"),
return_exceptions=True,
)
for r in results:
if isinstance(r, Exception):
print(f"例外: {r}")
else:
print(f"成功: {r}")
return_exceptions=True にすると、例外も「結果の一つ」として扱われます。後でループしてまとめて処理したいときに便利です。
asyncio.TaskGroup ── Python 3.11 から使える新しい書き方
Python 3.11 で asyncio.TaskGroup というクラスが追加されました。gather() の代替として公式ドキュメントでも案内されている書き方で、いわゆる「構造化並行(structured concurrency)」っぽく、安全にまとめやすくなっています。
import asyncio
async def fetch(name, delay):
await asyncio.sleep(delay)
print(f"{name} 完了")
return f"{name} の結果"
async def main():
async with asyncio.TaskGroup() as tg:
task_a = tg.create_task(fetch("API-A", 2))
task_b = tg.create_task(fetch("API-B", 1))
task_c = tg.create_task(fetch("API-C", 3))
# ← async with を抜けた時点で全タスクが完了している
print(task_a.result())
print(task_b.result())
print(task_c.result())
asyncio.run(main())
async with ブロックを抜けるとき、内部の全タスクが完了するまで自動的に待機します。結果は各 Task オブジェクトから .result() で取り出します。
gather との違い:エラー時の挙動
TaskGroup と gather() の最大の違いは、例外が発生したときの挙動です。
- gather():1つが例外でも、他の awaitable は基本キャンセルされずに動き続ける
- TaskGroup:1つのタスクが例外を投げると、グループ内の残りのタスクが自動的にキャンセルされる
TaskGroup のほうが「タスクの一部が失敗したら全体を止める」方向に寄っていて、事故りにくい設計になってます。また、複数のタスクが同時に例外を投げた場合は ExceptionGroup にまとめられます(これも Python 3.11 で追加された機能)。
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(fetch("A", 1))
tg.create_task(bad_task()) # 例外を投げるタスク
tg.create_task(fetch("C", 3))
except* ValueError as eg:
# except* は ExceptionGroup を処理する構文(Python 3.11+)
for e in eg.exceptions:
print(f"キャッチ: {e}")
except* という構文も 3.11 からで、ExceptionGroup 内の個別の例外を型でフィルタして処理できます。個人的にはこの組み合わせがかなりスッキリ書けて好きです。
asyncio.Semaphore ── 同時実行数を制限する
100件の API リクエストを全部同時に飛ばすと、サーバーに怒られたりレート制限に引っかかったりします。そういうときに使うのが asyncio.Semaphore です。
セマフォは「同時にここを通れるタスクの数」を制限するための仕組みです。asyncio.Semaphore(3) なら、最大3つのタスクだけが同時に実行できます。
import asyncio
async def fetch_with_limit(sem, name):
async with sem: # セマフォを取得(空きがなければここで待機)
print(f"{name} 実行中")
await asyncio.sleep(1)
print(f"{name} 完了")
return f"{name} の結果"
async def main():
sem = asyncio.Semaphore(3) # 同時実行数を3に制限
tasks = [fetch_with_limit(sem, f"タスク{i}") for i in range(10)]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
# → 3つずつ実行される。10タスクが約4秒で完了(3+3+3+1)
async with sem: と書くだけで、セマフォの取得と解放を自動でやってくれます。コンテキストマネージャとして使うのが推奨されています。
そういえば最近、自分が作った LLM のバッチ処理でも Semaphore を使いました。Claude API を呼び出す処理を一気に100件並行で走らせたらレート制限エラーが連発して、Semaphore で5並行に抑えたら安定しました。実用性高いです。
asyncio.wait_for() ── タイムアウトを設定する
外部 API を叩くとき、応答が返ってこないままずっと待ち続けるのは困ります。そういうときは asyncio.wait_for() でタイムアウトを設定できます。
import asyncio
async def slow_api():
print("API 呼び出し開始")
await asyncio.sleep(10) # 遅い処理
return "レスポンス"
async def main():
try:
result = await asyncio.wait_for(slow_api(), timeout=3.0)
print(result)
except TimeoutError:
print("タイムアウト!3秒以内に応答がありませんでした")
asyncio.run(main())
タイムアウトが発生すると、実行中のタスクは自動的にキャンセルされ、TimeoutError が発生します。try/except でキャッチして適切に処理しましょう。
Python 3.11 以降は asyncio.timeout() も使える
Python 3.11 からは asyncio.timeout() というコンテキストマネージャも追加されました。
async def main():
try:
async with asyncio.timeout(3.0):
result = await slow_api()
print(result)
except TimeoutError:
print("タイムアウト!")
wait_for() との違いは書き方だけでなく、asyncio.timeout() は作った後に reschedule() で締切を変更できたり、タイムアウトしたかどうかを確認できたりします。既存の処理を大きく崩さずに「この範囲だけ制限時間つけたい」みたいなときに便利そうです。シンプルに1発だけタイムアウトしたいなら wait_for() が直感的かも。
まとめ:Python asyncio タスクと並行処理の実践ポイント
今回扱ったタスクと並行処理の要点を整理します。
- await を直列に並べるだけでは並行にならない。タスクとして登録することが必要
asyncio.create_task():コルーチンをタスクとして登録し、バックグラウンドで実行を開始するasyncio.gather():複数タスクをまとめて並行実行し、全完了を待つ。結果は引数の順序で返るasyncio.TaskGroup(Python 3.11+):より強い安全性。タスクが失敗すると残りを自動キャンセルし、例外はExceptionGroupになり得るasyncio.Semaphore:同時実行数を制限する。API レート制限対策などに有効asyncio.wait_for():タイムアウトを設定し、超えたらタスクをキャンセルしてTimeoutError
Python 3.11+ を使っているなら TaskGroup を積極的に使っていくのがよさそうです。gather() も引き続き使えますが、「1個失敗したら他も止めたい」みたいな本番寄りのケースでは TaskGroup のほうがハマりやすいと思います。
(おまけ)この記事を書きながら、gather() の「例外が出ても他が走り続ける」仕様をまた忘れかけてました。油断すると普通に事故るので、ここはメモとして太字にしておきたい。
📚 シリーズ「Python asyncio 非同期処理入門」(第2回 / 全4回)
← 前回の記事: 前回の記事はこちら
→ 次回の記事: 【第3回】Python asyncio 入門 — 非同期HTTPリクエスト(aiohttp実践編)

