【第4回】Python asyncio 入門 — 非同期処理のよくある落とし穴とデバッグTips

プログラミング

前回は aiohttp を使った非同期HTTPリクエストの実践を紹介しました。複数URLへの同時リクエストがいかに速くなるか、体感できた方も多いんじゃないかと思います。

今回は、少し毛色を変えて「asyncio を使っていてハマりやすいポイント」と「デバッグをラクにするTips」をまとめていきます。実際に自分が詰まった経験も交えつつ書いていくので、同じところで躓いている方の参考になれば。

  • asyncio でよくある落とし穴(await忘れ・ブロッキング処理・例外の取りこぼし)
  • 例外ハンドリング・エラーハンドリングの正しい書き方(gather / TaskGroup / except*)
  • asyncio デバッグモードの使い方
  • タスクのキャンセルとクリーンアップ
  • 全体のまとめ・チェックリスト

落とし穴① await を書き忘れる

asyncio 入門者が最初にやらかすのが、これです。コルーチン関数を呼び出しているのに await を付け忘れるパターン。

import asyncio

async def fetch():
    await asyncio.sleep(1)
    return "データ取得完了"

async def main():
    # NG: awaitを忘れるとコルーチンオブジェクトが返るだけ
    result = fetch()
    print(result)  # <coroutine object fetch at 0x...> と出る

    # OK: ちゃんとawaitする
    result = await fetch()
    print(result)  # "データ取得完了"

asyncio.run(main())

await なしで呼ぶと、関数が実行されず「コルーチンオブジェクト」が変数に入っているだけになります。しかも黙ってそのまま進んでしまうので、エラーにならずに気づきにくい。

後述するデバッグモードを有効にすると、awaitされていないコルーチンを検出して警告を出してくれます。「なんかおかしいな」と思ったらまずこれを有効にするのがおすすめです。

落とし穴② イベントループをブロッキング処理で止めてしまう

asyncio の非同期処理は「ある処理が待機している間に別の処理を進める」という仕組みです。なのでイベントループ上で重いブロッキング処理を直接実行してしまうと、その間ループが完全に止まります。

import asyncio
import httpx  # 同期HTTPクライアント(例として)

async def bad_fetch(url):
    # NG: 同期的なHTTPリクエストをasync関数内で直接呼ぶ
    response = httpx.get(url)  # イベントループがここで止まる!
    return response.text

async def good_fetch(url):
    # OK: run_in_executor でスレッドプールに逃がす
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None, httpx.get, url
    )
    return response.text

特にファイルI/OやDBアクセスが絡むときに起きがちです。対策としては、非同期対応ライブラリ(aiohttpaiofilesasyncpg など)を使うか、それが無理な場合は loop.run_in_executor() でスレッドプールに処理を逃がすのが基本になります。

余談ですが、自分は最初 requests を asyncio コードの中で普通に使っていて、「なんで並列にならないんだろう…」と小一時間悩んだことがあります。

落とし穴③ タスクの例外を取りこぼす

これが個人的に一番ハマったやつです。asyncio.create_task() でタスクを作って、そのタスクに例外が発生しても、誰も await していなければ例外が見えにくくなります。

import asyncio

async def buggy():
    await asyncio.sleep(0.1)
    raise ValueError("エラー発生!")

async def main():
    # NG: タスクを作ったまま放置
    asyncio.create_task(buggy())
    await asyncio.sleep(1)

asyncio.run(main())

「完全に握りつぶされて何も起きない」というより、タスクの例外が await(や task.result()/task.exception())で回収されない場合、基本的には Task exception was never retrieved 的なログが出ます(出力先やタイミングは環境やログ設定次第で見落としがち)。

本番環境だと「アプリは動いてそうに見えるけど、裏でエラー出てた」になりやすいので怖い挙動です。

対策としては、タスクを必ず await するか、add_done_callback でエラーを検知する方法があります。

import asyncio
import logging

async def buggy():
    await asyncio.sleep(0.1)
    raise ValueError("エラー発生!")

def handle_task_result(task: asyncio.Task):
    """タスク完了時にエラーがあればログに出す"""
    try:
        task.result()  # 例外があればここで再送出される
    except asyncio.CancelledError:
        # キャンセルは想定内なら無視/情報ログでもOK
        return
    except Exception:
        logging.exception("タスクで例外が発生しました")

async def main():
    task = asyncio.create_task(buggy())
    task.add_done_callback(handle_task_result)
    await asyncio.sleep(1)

asyncio.run(main())

タスクを「投げっぱなし」にすると、エラーの所在が一気に追いにくくなるんですよね。自分もこれで何度かやらかしました…。

Python asyncio のエラーハンドリング実践:gather vs TaskGroup

asyncio.gather() + return_exceptions

複数タスクをまとめて実行するとき、asyncio.gather() がよく使われます。return_exceptions=True を使うと、全タスクを最後まで走らせた上で結果一覧に例外オブジェクトを混ぜて返してくれます。

import asyncio

async def fetch(name, should_fail=False):
    await asyncio.sleep(0.5)
    if should_fail:
        raise RuntimeError(f"{name} でエラー")
    return f"{name} 成功"

async def main():
    results = await asyncio.gather(
        fetch("タスクA"),
        fetch("タスクB", should_fail=True),
        fetch("タスクC"),
        return_exceptions=True  # 例外もリストに含めて返す
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"エラー: {r}")
        else:
            print(f"結果: {r}")

asyncio.run(main())
# 結果: タスクA 成功
# エラー: タスクB でエラー
# 結果: タスクC 成功

「全タスクを実行し終えてから個別にエラー処理したい」ときに便利です。ただし例外を受け取った後の処理はすべて自分で書く必要があるので、少し冗長になりがちです。

なお gather() の挙動については、「1つ落ちたら残りが即キャンセルされる」と断定するより、「ケースや実装によってはキャンセルが間に合わないこともある」くらいの理解が安全かなと。構造化並行性としての”強い保証”は後述の TaskGroup 側にあります。

Python 3.11+ の TaskGroup と except*

Python 3.11 で追加された asyncio.TaskGroup は、構造化並行性を提供する仕組みです。グループ内のどれかのタスクが例外を投げると、残りのタスクが自動でキャンセルされ、(複数あれば)まとめた ExceptionGroup として送出されます。この予測しやすい挙動が gather() との大きな違いです。

import asyncio

async def fetch(name, should_fail=False):
    await asyncio.sleep(0.5)
    if should_fail:
        raise ValueError(f"{name} でエラー")
    return f"{name} 成功"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(fetch("タスクA"))
            tg.create_task(fetch("タスクB", should_fail=True))
            tg.create_task(fetch("タスクC"))
    except* ValueError as eg:
        # except* はExceptionGroupの個別例外を処理する構文(Python 3.11+)
        for exc in eg.exceptions:
            print(f"ValueErrorを検知: {exc}")

    # 成功したタスクの結果は個別に取得(他タスクが失敗するとキャンセルされ得る点は注意)
    if t1.done() and not t1.cancelled() and t1.exception() is None:
        print(t1.result())

asyncio.run(main())

except* は Python 3.11 で同時に追加された構文で、ExceptionGroup(複数例外をまとめたオブジェクト)を型別に捌けます。複数タスクが異なる例外を投げたとき、種類別にエラーハンドリングを分けられるのが便利です。

個人的には、「タスクのどれかが失敗したら全体を止めたい」という用途では TaskGroup のほうがシンプルに書けると感じています。一方で「全部実行しきってから結果をまとめたい」なら gather(return_exceptions=True) の出番かなと。

asyncio デバッグモードの使い方

asyncio にはデバッグモードがあって、有効にすると普段は見えない問題を検出してくれます。開発中はオンにしておくと助かる場面が多いです。

デバッグモードを有効にする方法

デバッグモードを有効にする方法はいくつかあります。環境変数 PYTHONASYNCIODEBUG1 に設定する、Python 開発モード(-X dev)で実行する、asyncio.run()debug=True を渡す、あるいは loop.set_debug() を呼ぶ、といった方法があります。

# 方法1: asyncio.run() に debug=True を渡す
asyncio.run(main(), debug=True)

# 方法2: 環境変数で指定
# export PYTHONASYNCIODEBUG=1

# 方法3: Pythonの開発モードで実行
# python -X dev your_script.py

デバッグモードが有効なとき、asyncio は await されていないコルーチンなどを検出してログに記録します。「await 忘れ」の落とし穴にはまる可能性を軽減してくれますし、イベントループが一定時間以上ブロックされた場合の警告なども出やすくなります。

ロギングと組み合わせる

import asyncio
import logging

# asyncio ロガーを DEBUG レベルにしておくと詳細が出る
logging.basicConfig(level=logging.DEBUG)

async def main():
    await asyncio.sleep(0.1)

asyncio.run(main(), debug=True)

最初は出力が多くて面食らいますが、慣れると「あ、ここが問題か」というのが見つけやすくなります。本番環境では外しておくのが無難ですが、開発中はかなり頼りになります。

タスクのキャンセルとクリーンアップ

CancelledError は握りつぶさない

タスクをキャンセルすると asyncio.CancelledError が発生します。これを雑に except Exception でまとめて握りつぶしてしまうと、キャンセルが正常に伝播されなくなるので注意が必要です。

import asyncio

async def my_task():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # キャンセル時のクリーンアップ処理(ファイルを閉じる等)
        print("クリーンアップ実行")
        raise  # 必ずraiseして伝播させる!

    # NG: Exception でまとめて握りつぶすとキャンセルが効かなくなる
    # except Exception:
    #     pass

CancelledError をキャッチしてクリーンアップを実行するのは問題ありませんが、その後は必ず raise して呼び出し元に伝播させる必要があります。これを忘れると、タスクがキャンセルできない状態になります。

asyncio.timeout() で時間制限をつける(Python 3.11+)

Python 3.11 から asyncio.timeout() が追加されて、タイムアウト処理がかなり書きやすくなりました。

import asyncio

async def slow_task():
    await asyncio.sleep(10)
    return "完了"

async def main():
    try:
        async with asyncio.timeout(3):  # 3秒でタイムアウト
            result = await slow_task()
    except TimeoutError:
        print("タイムアウトしました")

asyncio.run(main())

構造化ブロック内の複数の await 処理をまとめてタイムアウト対象にできる点が便利です。タイムアウトが起きた場合は、そのブロック内で発生した CancelledErrortimeout() が吸収して TimeoutError に変換してくれるので、TimeoutError はブロックの外側でキャッチします(ここ、地味に大事ポイントです)。

以前は asyncio.wait_for() を使うことが多かったですが、コンテキストマネージャ形式で書けるこちらのほうがすっきりします。

まとめ:Python asyncio エラーハンドリング 実践チェックリスト

全4回にわたって asyncio の基礎から実践まで書いてきました。今回は落とし穴とデバッグ・エラーハンドリングに絞った内容でしたが、改めてポイントを整理すると:

  • await 忘れ:デバッグモードで検出できる。開発中はオンにしておくと安心
  • ブロッキング処理:非同期ライブラリを使うか run_in_executor() で逃がす
  • 例外の取りこぼし:放置タスクは例外が見落とされやすい。await / add_done_callback / TaskGroup で回収する
  • gather vs TaskGroup:「全部実行→まとめて処理」なら gather、「1つ失敗で全停止」なら TaskGroup が書きやすい
  • CancelledError:キャッチしてもいいが必ず raise して伝播させる
  • タイムアウト:Python 3.11+ なら asyncio.timeout() が使いやすい

非同期処理は概念を掴むまでが大変ですが、落とし穴を知っておくだけでデバッグにかかる時間がかなり短くなります。自分もまだ「あれ、なんで動かないんだろう」となることはありますが、このあたりを確認するクセをつけてから随分マシになりました。

シリーズを通して読んでくれた方、ありがとうございました。少しでも参考になれば嬉しいです!

非同期HTTPリクエスト(aiohttp実践編)

タイトルとURLをコピーしました