【第3回】Python asyncio 入門 — 非同期HTTPリクエスト(aiohttp実践編)

プログラミング

前回は asyncio.gather()asyncio.wait() を使ったタスクの並列実行パターンを紹介しました(前回記事はこちら)。「並列で動かせる雰囲気はわかった、でも実際に何に使うの?」という感じだったと思うので、今回はいよいよ実務でいちばん頻出な非同期HTTPリクエストをテーマにします。

きっかけは仕事でもないただの個人プロジェクトで、複数のWeb APIを叩いてデータを集約する処理を書いたとき。同期で書いたら遅くて「これは違う…」となり、aiohttp を本腰入れて調べることになりました。せっかくなのでまとめておきます。

余談ですが、aiohttpって名前、最初は「あいおーえいちてぃーてぃーぴー」って読んでました。普通に「エーアイオーエイチティーティーピー」ですね。

📌 この記事でわかること

  • aiohttp の基本的な使い方(インストール〜GETリクエスト)
  • ClientSession を正しく使うポイント
  • 複数URLへの並列リクエストと asyncio.gather() の組み合わせ
  • タイムアウト・エラーハンドリングの実装方法
  • セマフォで同時接続数を制限するテクニック
  • 同期の requests との違いと使い分け

aiohttp とは? — requests と何が違うの

Pythonで HTTP リクエストといえば requests ライブラリが鉄板ですが、requests は同期処理です。1件リクエストして、レスポンスが返ってくるまで次の処理に進めない。直感的でわかりやすい反面、大量のリクエストが必要な場面では遅くなりがちです。

一方、aiohttpasyncio ベースの非同期 HTTP クライアント(+サーバー)ライブラリです。リクエストを投げている間にイベントループが別のタスクを処理できるので、I/O 待ちが多い処理では大幅に速くなります。

2026年2月時点の最新バージョンは 3.13.3(2026年1月3日リリース)です。新しめの Python / aiohttp を使うほどパフォーマンス改善の恩恵を受けやすいことが多いので、バージョンはなるべく新しいものを使うのがおすすめです。

インストール

pip install aiohttp

特別な依存関係はないのでサクッと入ります。仮想環境の中で実行してください。

まず1件だけ GET リクエストしてみる

最小構成のコードから確認します。

import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/get") as response:
            print("Status:", response.status)
            data = await response.json()
            print("Origin IP:", data["origin"])

asyncio.run(main())

ポイントを整理すると:

  • aiohttp.ClientSession()async with で開いて、処理が終わったら自動でクローズ
  • session.get(URL)async with で受け取る。レスポンスオブジェクトはここで手に入る
  • ボディの取得(response.text()response.json())は await が必要。ここを忘れると中身が取れないので注意です

なお、response.status はプロパティなので await 不要。地味に最初混乱するところ。

POST リクエストも同じノリで

async def post_example():
    payload = {"name": "nyanchu", "message": "hello"}

    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://httpbin.org/post",
            json=payload  # json=で自動的にContent-Typeもapplication/jsonになる
        ) as response:
            result = await response.json()
            print(result["json"])  # 送ったデータが返ってくる

asyncio.run(post_example())

json= キーワード引数に辞書を渡せばシリアライズと Content-Type 設定を自動でやってくれます。requests の使い勝手に近いので移行しやすいと思います。

複数 URL を並列リクエストする — asyncio.gather() との組み合わせ

ここが非同期 HTTP リクエストのいちばんの旨みです。前回学んだ asyncio.gather() をフル活用します。

基本パターン

import aiohttp
import asyncio
import time

async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    """1つのURLにGETリクエストを投げてJSONを返す"""
    async with session.get(url) as response:
        return await response.json()

async def main():
    urls = [
        "https://httpbin.org/get?id=1",
        "https://httpbin.org/get?id=2",
        "https://httpbin.org/get?id=3",
        "https://httpbin.org/get?id=4",
        "https://httpbin.org/get?id=5",
    ]

    start = time.time()

    # ClientSession は1つ作り回す。リクエストごとに作るのはNG
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    elapsed = time.time() - start
    print(f"{len(results)}件取得完了 / {elapsed:.2f}秒")

asyncio.run(main())

重要なのが「ClientSession はループの外で1つだけ作る」という点です。リクエストのたびに ClientSession() を生成するコードをたまに見かけますが、接続プールが使い回されず、パフォーマンスが落ちるしリソースの無駄にもなります。1つのセッションを使い回して、最後にきちんと閉じる、これだけ守ればOKです。

return_exceptions=True で1件失敗しても止まらないようにする

results = await asyncio.gather(*tasks, return_exceptions=True)

for url, result in zip(urls, results):
    if isinstance(result, Exception):
        print(f"[ERROR] {url}: {result}")
    else:
        print(f"[OK] {url}: 取得成功")

return_exceptions=True を指定しない場合、gather() は最初に発生した例外を呼び出し元にすぐ投げ返します(他のタスク自体が即キャンセルされるわけではなく、裏では動き続ける挙動になります)。「最後まで回収して、失敗したやつだけ例外として受け取りたい」なら return_exceptions=True が便利です。

複数件処理するときは基本的にこのオプションを付けておくのが無難です(前回も触れましたが、実際に使うと大事さが身に染みます)。

タイムアウトとエラーハンドリング

実際のAPIを叩くなら、タイムアウトとエラー処理は必須です。ここを雑にするとフリーズしたり、謎のクラッシュが起きたりします。

ClientTimeout でタイムアウトを設定する

import aiohttp
import asyncio

async def fetch_with_timeout(url: str) -> str | None:
    # 全体30秒、接続確立10秒、読み込み10秒
    timeout = aiohttp.ClientTimeout(
        total=30,
        connect=10,
        sock_read=10,
    )

    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as response:
                response.raise_for_status()  # 4xx/5xxで例外を発生させる
                return await response.text()

    except aiohttp.ClientResponseError as e:
        print(f"HTTPエラー: {e.status} {e.message}")
    except aiohttp.ClientConnectorError:
        print("接続エラー: サーバーに繋がりませんでした")
    except asyncio.TimeoutError:
        print("タイムアウト: レスポンスが遅すぎます")
    except aiohttp.ClientError as e:
        print(f"その他のクライアントエラー: {e}")

    return None

ClientTimeout の主なパラメータは total(リクエスト全体の上限時間)、connect(接続タイムアウト)、sock_read(読み込みタイムアウト)の3つです。

response.raise_for_status() はステータスコードが 4xx/5xx のときに ClientResponseError を投げてくれます。これを入れておくとエラー時に黙って None を返すようなバグを防げます。個人的にはほぼ毎回入れるようにしています。

セマフォで同時接続数を制限する

100件・1000件のリクエストを全部一気に投げると、相手サーバーへの負荷や自分のリソース枯渇が問題になります。そこで asyncio.Semaphore を使って同時実行数を絞るのがよくあるパターンです。

import aiohttp
import asyncio

async def fetch_with_semaphore(
    semaphore: asyncio.Semaphore,
    session: aiohttp.ClientSession,
    url: str,
) -> dict | None:
    async with semaphore:  # ここで同時実行数を制御
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
        except Exception as e:
            print(f"[ERROR] {url}: {e}")
            return None

async def main():
    urls = [f"https://httpbin.org/get?id={i}" for i in range(20)]

    # 同時に最大5件まで
    semaphore = asyncio.Semaphore(5)

    timeout = aiohttp.ClientTimeout(total=30, connect=10, sock_read=10)
    connector = aiohttp.TCPConnector(limit=50)  # コネクションプール上限

    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        tasks = [fetch_with_semaphore(semaphore, session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    success = [r for r in results if r is not None and not isinstance(r, Exception)]
    print(f"成功: {len(success)} / {len(urls)}")

asyncio.run(main())

asyncio.Semaphore(5) は「同時に最大5つのコルーチンだけ async with semaphore: ブロックの中に入れる」という制御で、6つ目以降は前の処理が終わるまで待機します。

TCPConnector では limit(同時接続数の上限)、limit_per_host(ホスト毎の同時接続数上限)、ttl_dns_cache(DNSキャッシュの有効期限)などを設定できます。大量リクエスト時には意識しておくといいです。

セマフォの値の決め方

正直「これが正解」という値はなく、相手のAPI制限や自分のネットワーク環境次第です。Rate Limit がある外部 API なら仕様を確認して設定するのが安全。自前のサーバーなら負荷テストしながら調整するのが現実的かと思います。10〜20あたりから試してみると感触が掴みやすいです。

実用的なまとめコード — コピペで使えるテンプレート

ここまでのエッセンスをひとつにまとめたテンプレートです。そのまま使い回せるように書きました。

"""
aiohttp 並列HTTPリクエスト テンプレート
- タイムアウト設定
- エラーハンドリング
- セマフォによる同時接続数制限
"""
import aiohttp
import asyncio
from dataclasses import dataclass

@dataclass
class FetchResult:
    url: str
    status: int | None
    data: dict | None
    error: str | None

async def fetch_one(
    semaphore: asyncio.Semaphore,
    session: aiohttp.ClientSession,
    url: str,
) -> FetchResult:
    async with semaphore:
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                data = await response.json()
                return FetchResult(url=url, status=response.status, data=data, error=None)
        except aiohttp.ClientResponseError as e:
            return FetchResult(url=url, status=e.status, data=None, error=str(e.message))
        except asyncio.TimeoutError:
            return FetchResult(url=url, status=None, data=None, error="timeout")
        except aiohttp.ClientError as e:
            return FetchResult(url=url, status=None, data=None, error=str(e))

async def fetch_all(urls: list[str], concurrency: int = 10) -> list[FetchResult]:
    semaphore = asyncio.Semaphore(concurrency)
    timeout = aiohttp.ClientTimeout(total=30, connect=10, sock_read=10)
    connector = aiohttp.TCPConnector(limit=100, ttl_dns_cache=300)

    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        tasks = [fetch_one(semaphore, session, url) for url in urls]
        return await asyncio.gather(*tasks)

if __name__ == "__main__":
    import time

    urls = [f"https://httpbin.org/get?id={i}" for i in range(15)]

    start = time.time()
    results = asyncio.run(fetch_all(urls, concurrency=5))
    elapsed = time.time() - start

    ok = [r for r in results if r.error is None]
    ng = [r for r in results if r.error is not None]
    print(f"完了: {len(ok)}件成功 / {len(ng)}件失敗 / {elapsed:.2f}秒")

FetchResult データクラスで結果の形を統一しているのがポイントです。成功・失敗どちらでも同じ型で返ってくるので、呼び出し側での処理がシンプルになります。このパターン、一度覚えると応用が効くのでおすすめです。

まとめ

  • aiohttp は asyncio 対応の非同期 HTTP クライアント。requests の代わりに I/O バウンドな処理で使う
  • ClientSession は1つ作り回す。リクエストごとに生成するのは NG
  • asyncio.gather() + tasks リストで複数 URL への並列リクエストが実現できる
  • ClientTimeout でタイムアウトを設定し、raise_for_status() で HTTP エラーも捕捉する
  • asyncio.Semaphore で同時接続数を制御。外部 API を叩くときは必須級
  • まとめコードをテンプレートにしておくと毎回ゼロから書かずに済む

Python asyncio と aiohttp の組み合わせ、最初はとっつきにくいですが慣れると手放せなくなります。このシリーズが少しでも参考になれば嬉しいです!

タイトルとURLをコピーしました