【第2回】AWS SQS × Lambda Python 自動化入門 — Pythonでメッセージを受信・処理する実装パターン

AWS

この記事でわかること

  • Lambda に SQS メッセージが どのような構造で渡ってくるか
  • シンプルなメッセージ処理ハンドラーの実装方法
  • 部分失敗レスポンス(ReportBatchItemFailures)を使ったバッチ処理の実装
  • 可視性タイムアウトと Lambda タイムアウトの関係
  • DLQ を使ったエラーハンドリングの設計

前回のおさらいと今回やること

前回は AWS SQS の基本的な仕組みと、キューの作成・メッセージ送信までを Python(boto3)で実装しました。標準キューと FIFO キューの違いや、可視性タイムアウトの考え方なども触れたかと思います。

今回はその続きで、「Lambda 側でどうメッセージを受け取り、処理するか」を中心に書いていきます。具体的には以下の流れで進めます。

  • Lambda の基本的なイベント構造を確認する
  • シンプルなメッセージ処理ハンドラーを実装する
  • バッチ処理と部分失敗レスポンス(ReportBatchItemFailures)を扱う
  • DLQ(デッドレターキュー)との組み合わせについて整理する

SQS × Lambda の構成、最初は「難しそう」と思って後回しにしてたんですが、実際に触ってみたら思ったよりシンプルでした。ただ、エラーハンドリング周りだけは少し考えることがあって、そこをちゃんとまとめておきたかったというのが正直な動機です。

Lambda に届くイベントの構造を理解する

まず前提として、SQS → Lambda の連携は「イベントソースマッピング」という仕組みで動いています。Lambda が SQS をポーリングして、標準キューと FIFO キューの両方をサポートしています。自分でポーリングするコードを書く必要はありません。

Lambda に渡ってくるイベントはこんな構造になっています。

{
  "Records": [
    {
      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
      "receiptHandle": "AQEBwJnKyrHigUMZj6reyasLm...",
      "body": "{\"order_id\": \"ORD-001\", \"amount\": 1500}",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1716000000000",
        "SenderId": "123456789012",
        "ApproximateFirstReceiveTimestamp": "1716000001000"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b5",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:ap-northeast-1:123456789012:my-queue",
      "awsRegion": "ap-northeast-1"
    }
  ]
}

ポイントは body がただの文字列として入ってくることです。JSON を送っていても文字列なので、受け取り側で json.loads() が必要になります。最初これに気づかずに「なんで辞書にならないんだ」ってなった記憶があります。

また、Records は配列になっていて、バッチサイズの設定次第で複数メッセージがまとめて来ます。デフォルトのバッチサイズは10です。

基本的なハンドラーを書く

まずは素直な実装から。注文データが来たと想定して、それを処理するシンプルなハンドラーです。

import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    for record in event["Records"]:
        message_id = record["messageId"]

        try:
            body = json.loads(record["body"])
            process_order(body, message_id)
        except json.JSONDecodeError as e:
            logger.error(f"JSON parse error: {message_id} / {e}")
            raise
        except Exception as e:
            logger.error(f"Processing failed: {message_id} / {e}")
            raise

def process_order(order: dict, message_id: str):
    order_id = order.get("order_id")
    amount = order.get("amount")

    logger.info(f"Processing order: {order_id}, amount: {amount}")
    # ここに実際のビジネスロジックを書く(DB保存、API呼び出しなど)

この実装には1つ大きな落とし穴があります。バッチ内のどれか1件が失敗した場合、バッチ全体が失敗扱いになって、全メッセージがキューに戻ってきます。10件まとめて処理していて9件成功・1件失敗だった場合も、9件が再処理されてしまいます。

小さなシステムや冪等性が完璧に担保されている場合は気にならないかもしれませんが、現実的には「同じ注文を2回処理してしまう」みたいな事故につながりかねないので、次のセクションで対処法を紹介します。

部分失敗レスポンス(ReportBatchItemFailures)で賢くリトライする

ここが今回一番書きたかった部分です。

この問題を避けるには、イベントソースマッピングで失敗したメッセージだけをキューに再出現させる「部分的なバッチレスポンス」を設定します。有効にするには FunctionResponseTypesReportBatchItemFailures を追加します。

① イベントソースマッピングで ReportBatchItemFailures を有効化

AWS コンソールから設定する場合は、Lambda のトリガー設定で「レポートバッチアイテム失敗」をオンにします。CLI だとこんな感じです。

aws lambda create-event-source-mapping \
  --function-name my-order-processor \
  --event-source-arn arn:aws:sqs:ap-northeast-1:123456789012:my-queue \
  --batch-size 10 \
  --function-response-types ReportBatchItemFailures

既存のマッピングに追加する場合は update-event-source-mapping で UUID を指定して同じように設定します。

② Lambda 関数側で失敗 ID を返す

import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    batch_item_failures = []

    for record in event["Records"]:
        message_id = record["messageId"]
        try:
            body = json.loads(record["body"])
            process_order(body, message_id)
            logger.info(f"Success: {message_id}")
        except Exception as e:
            logger.error(f"Failed: {message_id} / {e}")
            # ここ注意:失敗したIDをリストに追加するだけ。raiseしない
            batch_item_failures.append({"itemIdentifier": message_id})

    return {"batchItemFailures": batch_item_failures}

def process_order(order: dict, message_id: str):
    order_id = order.get("order_id")
    if not order_id:
        raise ValueError("order_id is missing")
    logger.info(f"Order processed: {order_id}")

大事なのは、失敗しても raise せずに batch_item_failures にメッセージ ID を積んでいくこと。最後に {"batchItemFailures": [...] } を返すと、Lambda がその ID だけをリトライ対象にしてくれます。全件成功なら batchItemFailures は空リストになります。

最初この仕組みを理解するまでちょっとかかりました。「例外を握りつぶしてるの?」と思ってしまうんですが、設計としては「Lambda に明示的に失敗を教える」という感じで理解するとしっくりきます。

規模が大きくなってきたら、Powertools for AWS Lambda を使うと、この部分的なバッチレスポンスのロジックをかなり楽にできるようです。Python・Java・TypeScript・.NET など複数言語対応とされています。ただ、まずはスクラッチで理解してから使うのがおすすめです。

可視性タイムアウトと Lambda タイムアウトの関係

設定値の話で少し脱線します。SQS の可視性タイムアウトは、Lambda のタイムアウトの6倍以上に設定することが推奨されています。

たとえば Lambda のタイムアウトを 30 秒に設定しているなら、SQS の可視性タイムアウトは最低でも 180 秒(3分)にしておく、という感じです。

これは、関数がスロットリングされてリトライが走ったときに、メッセージが途中でキューに再出現しないようにするためです。この設定をサボるとデバッグが面倒になるので、最初から合わせておくのがおすすめです。

# キューの可視性タイムアウトを更新する例
import boto3

sqs = boto3.client("sqs", region_name="ap-northeast-1")

sqs.set_queue_attributes(
    QueueUrl="https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-queue",
    Attributes={
        "VisibilityTimeout": "180"  # Lambda タイムアウト(30s) × 6
    }
)

DLQ(デッドレターキュー)との組み合わせ

最後に DLQ の話を少し。部分失敗レスポンスと合わせることで、こんな流れが作れます。

  • 通常:メッセージが来る → Lambda が処理 → 成功したら自動削除
  • 失敗時:失敗した ID だけキューに戻る → 再試行(maxReceiveCount 回まで)
  • 上限を超えたら:DLQ に移動して保持 → 後で手動確認・再処理

DLQ 自体は SQS キューを別途作っておいて、元のキューのリドライブポリシーに指定するだけなので設定は難しくないです。maxReceiveCount(最大受信回数)は 3〜5 程度にしておくのが多い印象です。

ReportBatchItemFailures と DLQ を組み合わせる場合の注意点として、リドライブカウントはバッチ単位ではなくメッセージ単位でカウントされるようです。ここはちょっとハマりポイントなので一応メモしておきます。

※この記事にはプロモーションが含まれます

ちなみに、お名前.com レンタルサーバー(WordPressに特化した高速レンタルサーバー。月額990円〜、独自ドメイン実質0円)も気になっています。お名前.com レンタルサーバー

まとめ

今回は Lambda で SQS メッセージを受け取る基本実装から、部分失敗レスポンスを使ったバッチ処理まで書きました。ReportBatchItemFailures は最初とっつきにくいですが、一度理解すると「これなしで SQS × Lambda は使いたくないな」と感じるくらい便利です。

シリーズ第1回はこちら:AWS SQS × Lambda Python 自動化入門

📚 シリーズ「AWS SQS × Lambda Python 自動化入門」(第2回 / 全4回)

← 前回の記事: 前回の記事はこちら

→ 次回の記事: 【第3回】AWS SQS × Lambda Python 自動化入門 — エラーハンドリングとデッドレターキュー(DLQ)の活用法

参考になったらクリックしてもらえると嬉しいです!

Blogmura CloudAWS Ranking
タイトルとURLをコピーしました