【第2回】Python asyncio 入門 — タスクの並行実行(create_task / gather / TaskGroup)

プログラミング

前回は asyncio の基本的な仕組み――イベントループ・コルーチン・async/await の書き方を紹介しました(第1回はこちら)。「なんとなく動かせる」くらいにはなったものの、「じゃあ複数の処理を同時に走らせるにはどうするの?」というのが次の疑問でした。今回はそこを掘り下げます。

実際に自分が AWS Lambda で複数の API を叩く処理を書いたとき、順番に await するだけだと全然速くならなくて「あれ?」ってなったんですよね。そのあたりの気づきも交えながら書いていきます。

この記事でわかること

  • asyncio.create_task() でタスクを作る方法
  • asyncio.gather() で複数タスクをまとめて実行する方法
  • Python 3.11 から使える asyncio.TaskGroup の書き方と gather との違い
  • asyncio.Semaphore で同時実行数を制限する方法
  • asyncio.wait_for() でタイムアウトを設定する方法

そもそも「並行処理」になってない落とし穴

まずここ、自分もはまったやつです。asyncio を使っていても、書き方によってはぜんぜん並行になりません。

import asyncio

async def fetch(name, delay):
    print(f"{name} 開始")
    await asyncio.sleep(delay)
    print(f"{name} 完了")

async def main():
    # ❌ これは順番に実行されるだけ(並行ではない)
    await fetch("API-A", 2)
    await fetch("API-B", 2)

asyncio.run(main())
# → 合計4秒かかる

await を直列に並べると、前のコルーチンが終わるまで次が始まりません。これは普通の同期処理と実質同じです。Python asyncio で並行処理を実現するには「タスク」として登録する必要があります。

asyncio.create_task() ── タスクを登録する基本

asyncio.create_task() はコルーチンをイベントループに「タスク」として登録し、バックグラウンドで実行を開始させます。返ってくるのは Task オブジェクトで、後から結果を取り出したりキャンセルしたりできます。

import asyncio

async def fetch(name, delay):
    print(f"{name} 開始")
    await asyncio.sleep(delay)
    print(f"{name} 完了")
    return f"{name} の結果"

async def main():
    # ✅ タスクとして登録 → 即座にバックグラウンドで開始される
    task_a = asyncio.create_task(fetch("API-A", 2))
    task_b = asyncio.create_task(fetch("API-B", 2))

    result_a = await task_a  # タスクの完了を待つ
    result_b = await task_b

    print(result_a)
    print(result_b)

asyncio.run(main())
# → 合計約2秒で完了(並行実行!)

create_task() を呼んだ時点でタスクが「スケジュール済み」になるのがポイントです。await task_a は「完了を待つ」だけで、そこで初めて動き出すわけじゃない。このあたりの感覚が最初はつかみにくかったです。

Task オブジェクトの便利メソッド

Task オブジェクトにはいくつかメソッドがあります。よく使うのはこのあたり。

task = asyncio.create_task(fetch("API-A", 2))

task.done()    # 完了済みかどうか(bool)
task.result()  # 結果を取得(完了していない場合は例外)
task.cancel()  # タスクをキャンセルする

余談ですが、task.cancel() を呼ぶとタスク内で CancelledError が発生します。キャンセルされた後に task.result() を呼ぶと CancelledError が上がってくるので注意が必要です。

asyncio.gather() ── 複数タスクをまとめて待つ

asyncio.gather() は複数のコルーチン(またはタスク)を並行実行し、すべての完了を待ってから結果をリストで返してくれます。create_task() を何度も書かなくていいので、まとめて並行実行したいときによく使います。

import asyncio

async def fetch(name, delay):
    await asyncio.sleep(delay)
    return f"{name} の結果"

async def main():
    # コルーチンをそのまま渡せる(内部で自動的にタスク化される)
    results = await asyncio.gather(
        fetch("API-A", 2),
        fetch("API-B", 1),
        fetch("API-C", 3),
    )
    print(results)
    # → ['API-A の結果', 'API-B の結果', 'API-C の結果']
    # ※ 結果の順序は引数の順序と一致する(完了順ではない)

asyncio.run(main())

結果リストの順序は引数に渡した順番と一致します。API-B が一番早く終わっても、結果は [A, B, C] の順番で入ってくる。これ最初わかってなくて変なバグを出した記憶があります。

エラーハンドリング:return_exceptions の挙動

gather() はデフォルト(return_exceptions=False)だと、どれか1つのタスクが例外を投げると即座にその例外が gather の呼び出し元に伝わります。ここ、勘違いしやすいんですが、このとき残りのタスクは基本的にキャンセルされず、そのまま動き続けます(「例外が出たら全停止」ではない)。

async def risky_fetch(name, fail=False):
    await asyncio.sleep(1)
    if fail:
        raise ValueError(f"{name} が失敗しました")
    return f"{name} OK"

async def main():
    try:
        results = await asyncio.gather(
            risky_fetch("A"),
            risky_fetch("B", fail=True),  # これが例外を投げる
            risky_fetch("C"),
        )
    except ValueError as e:
        print(f"エラーキャッチ: {e}")

    # return_exceptions=True にすると例外も結果リストに含まれる
    results = await asyncio.gather(
        risky_fetch("A"),
        risky_fetch("B", fail=True),
        risky_fetch("C"),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"例外: {r}")
        else:
            print(f"成功: {r}")

return_exceptions=True にすると、例外も「結果の一つ」として扱われます。後でループしてまとめて処理したいときに便利です。

asyncio.TaskGroup ── Python 3.11 から使える新しい書き方

Python 3.11 で asyncio.TaskGroup というクラスが追加されました。gather() の代替として公式ドキュメントでも案内されている書き方で、いわゆる「構造化並行(structured concurrency)」っぽく、安全にまとめやすくなっています。

import asyncio

async def fetch(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} 完了")
    return f"{name} の結果"

async def main():
    async with asyncio.TaskGroup() as tg:
        task_a = tg.create_task(fetch("API-A", 2))
        task_b = tg.create_task(fetch("API-B", 1))
        task_c = tg.create_task(fetch("API-C", 3))
    # ← async with を抜けた時点で全タスクが完了している

    print(task_a.result())
    print(task_b.result())
    print(task_c.result())

asyncio.run(main())

async with ブロックを抜けるとき、内部の全タスクが完了するまで自動的に待機します。結果は各 Task オブジェクトから .result() で取り出します。

gather との違い:エラー時の挙動

TaskGroupgather() の最大の違いは、例外が発生したときの挙動です。

  • gather():1つが例外でも、他の awaitable は基本キャンセルされずに動き続ける
  • TaskGroup:1つのタスクが例外を投げると、グループ内の残りのタスクが自動的にキャンセルされる

TaskGroup のほうが「タスクの一部が失敗したら全体を止める」方向に寄っていて、事故りにくい設計になってます。また、複数のタスクが同時に例外を投げた場合は ExceptionGroup にまとめられます(これも Python 3.11 で追加された機能)。

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fetch("A", 1))
            tg.create_task(bad_task())   # 例外を投げるタスク
            tg.create_task(fetch("C", 3))
    except* ValueError as eg:
        # except* は ExceptionGroup を処理する構文(Python 3.11+)
        for e in eg.exceptions:
            print(f"キャッチ: {e}")

except* という構文も 3.11 からで、ExceptionGroup 内の個別の例外を型でフィルタして処理できます。個人的にはこの組み合わせがかなりスッキリ書けて好きです。

asyncio.Semaphore ── 同時実行数を制限する

100件の API リクエストを全部同時に飛ばすと、サーバーに怒られたりレート制限に引っかかったりします。そういうときに使うのが asyncio.Semaphore です。

セマフォは「同時にここを通れるタスクの数」を制限するための仕組みです。asyncio.Semaphore(3) なら、最大3つのタスクだけが同時に実行できます。

import asyncio

async def fetch_with_limit(sem, name):
    async with sem:  # セマフォを取得(空きがなければここで待機)
        print(f"{name} 実行中")
        await asyncio.sleep(1)
        print(f"{name} 完了")
        return f"{name} の結果"

async def main():
    sem = asyncio.Semaphore(3)  # 同時実行数を3に制限

    tasks = [fetch_with_limit(sem, f"タスク{i}") for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
# → 3つずつ実行される。10タスクが約4秒で完了(3+3+3+1)

async with sem: と書くだけで、セマフォの取得と解放を自動でやってくれます。コンテキストマネージャとして使うのが推奨されています。

そういえば最近、自分が作った LLM のバッチ処理でも Semaphore を使いました。Claude API を呼び出す処理を一気に100件並行で走らせたらレート制限エラーが連発して、Semaphore で5並行に抑えたら安定しました。実用性高いです。

asyncio.wait_for() ── タイムアウトを設定する

外部 API を叩くとき、応答が返ってこないままずっと待ち続けるのは困ります。そういうときは asyncio.wait_for() でタイムアウトを設定できます。

import asyncio

async def slow_api():
    print("API 呼び出し開始")
    await asyncio.sleep(10)  # 遅い処理
    return "レスポンス"

async def main():
    try:
        result = await asyncio.wait_for(slow_api(), timeout=3.0)
        print(result)
    except TimeoutError:
        print("タイムアウト!3秒以内に応答がありませんでした")

asyncio.run(main())

タイムアウトが発生すると、実行中のタスクは自動的にキャンセルされ、TimeoutError が発生します。try/except でキャッチして適切に処理しましょう。

Python 3.11 以降は asyncio.timeout() も使える

Python 3.11 からは asyncio.timeout() というコンテキストマネージャも追加されました。

async def main():
    try:
        async with asyncio.timeout(3.0):
            result = await slow_api()
            print(result)
    except TimeoutError:
        print("タイムアウト!")

wait_for() との違いは書き方だけでなく、asyncio.timeout() は作った後に reschedule() で締切を変更できたり、タイムアウトしたかどうかを確認できたりします。既存の処理を大きく崩さずに「この範囲だけ制限時間つけたい」みたいなときに便利そうです。シンプルに1発だけタイムアウトしたいなら wait_for() が直感的かも。

まとめ:Python asyncio タスクと並行処理の実践ポイント

今回扱ったタスクと並行処理の要点を整理します。

  • await を直列に並べるだけでは並行にならない。タスクとして登録することが必要
  • asyncio.create_task():コルーチンをタスクとして登録し、バックグラウンドで実行を開始する
  • asyncio.gather():複数タスクをまとめて並行実行し、全完了を待つ。結果は引数の順序で返る
  • asyncio.TaskGroup(Python 3.11+):より強い安全性。タスクが失敗すると残りを自動キャンセルし、例外は ExceptionGroup になり得る
  • asyncio.Semaphore:同時実行数を制限する。API レート制限対策などに有効
  • asyncio.wait_for():タイムアウトを設定し、超えたらタスクをキャンセルして TimeoutError

Python 3.11+ を使っているなら TaskGroup を積極的に使っていくのがよさそうです。gather() も引き続き使えますが、「1個失敗したら他も止めたい」みたいな本番寄りのケースでは TaskGroup のほうがハマりやすいと思います。

(おまけ)この記事を書きながら、gather() の「例外が出ても他が走り続ける」仕様をまた忘れかけてました。油断すると普通に事故るので、ここはメモとして太字にしておきたい。

タイトルとURLをコピーしました