【第3回】AWS SQS × Lambda Python 自動化入門 — エラーハンドリングとデッドレターキュー(DLQ)の活用法

AWS

このセクションでわかること

  • SQS と Lambda における自動再試行の仕組み
  • VisibilityTimeout を正しく設定する理由
  • デッドレターキュー(DLQ)の構成と設定方法
  • 部分バッチレスポンスで失敗メッセージだけを再試行する実装
  • DLQ の監視とアラート設定

前回のおさらいと今回のテーマ

前回は SQS と Lambda を実際に繋いで、Python でメッセージを受け取って処理する基本的な流れを紹介しました。 「受け取れた、動いた、やったー!」で終わったわけですが、実運用を考えると「もし処理が失敗したらどうなるの?」という問いが当然出てきます。

今回はそこを掘り下げます。再試行の仕組みとデッドレターキュー(DLQ)の設定、そして Lambda 側でのエラーハンドリングコードまで、順番に整理していきます。

SQS × Lambda の再試行はどう動いているのか

まずここを理解していないと、DLQ の話が頭に入ってこないので先に整理しておきます。

Lambda が SQS のメッセージを処理するとき、内部では「イベントソースマッピング」という仕組みが動いています。Lambda がキューをポーリングしてメッセージをまとめて取得し、関数を呼び出す、という流れです。

このとき、Lambda の処理が何らかの理由で失敗すると、バッチ内のすべてのメッセージがキューに戻ります。正確には「可視性タイムアウト(VisibilityTimeout)が切れると再び見えるようになる」という挙動です。デフォルトだと失敗したメッセージは再試行されます。

ここで重要なのが maxReceiveCount という設定値です。これを超えた回数失敗したメッセージは、DLQ に移送されます。「何回失敗したら諦めて別のキューに隔離するか」を決める数字ですね。

VisibilityTimeout の設定に注意

地味にハマりやすいのがここです。SQS の VisibilityTimeout は、Lambda のタイムアウト時間の 6 倍以上に設定するのが推奨されています。

たとえば Lambda のタイムアウトが 60 秒なら、VisibilityTimeout は 360 秒以上。Lambda が処理中にタイムアウトしてしまったとき、VisibilityTimeout が短いとメッセージが再び可視化されて二重処理になる可能性があります。最初にこの設定を間違えると、「なんかメッセージが複数回処理されてる?」という謎の不具合に悩まされます(経験談)。

DLQ(デッドレターキュー)を設定する

DLQ は、処理に何度失敗してもキューに居続けるメッセージを隔離するための仕組みです。SQS の通常キューとは別に DLQ 用のキューを作成して、元のキューの「リドライブポリシー」で紐付けます。

マネジメントコンソールでポチポチ設定してもいいですが、せっかくなので boto3 で作ってみた例を載せておきます。

import boto3
import json

sqs = boto3.client("sqs", region_name="ap-northeast-1")

# まず DLQ を作る
dlq = sqs.create_queue(
    QueueName="my-task-queue-dlq",
    Attributes={
        "MessageRetentionPeriod": "1209600",  # 14日間保持
    }
)
dlq_url = dlq["QueueUrl"]
dlq_attrs = sqs.get_queue_attributes(
    QueueUrl=dlq_url,
    AttributeNames=["QueueArn"]
)
dlq_arn = dlq_attrs["Attributes"]["QueueArn"]

# メインキューを DLQ と紐付けて作成
main_queue = sqs.create_queue(
    QueueName="my-task-queue",
    Attributes={
        "VisibilityTimeout": "360",
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": "3",  # 3回失敗したら DLQ へ
        }),
    }
)
print("Main queue URL:", main_queue["QueueUrl"])

maxReceiveCount を 3 にしておくと、3 回連続で処理失敗したメッセージが DLQ に移ります。ここの数字は用途によって変えてください。冪等性が確保されているなら多めにしてもいいですし、どうしても一発勝負で処理したいなら 1 にする、といった使い方もできます。

Lambda 側のエラーハンドリング:batchItemFailures を使う

ここが今回一番書きたかった部分です。

デフォルトだと、Lambda がバッチ内の 1 件でも例外をスローすると バッチ全件 がキューに戻ります。つまり 10 件のうち 1 件だけ失敗したのに、成功した 9 件まで再処理されてしまうということです。冪等でない処理だと地獄になります。

これを解消するのが 部分バッチレスポンス(Partial Batch Response) という機能で、失敗したメッセージだけをキューに戻すことができます。実装方法はシンプルで、Lambda の戻り値に batchItemFailures を含めるだけです。

def lambda_handler(event, context):
    batch_item_failures = []

    for record in event["Records"]:
        message_id = record["messageId"]
        body = record["body"]

        try:
            process_message(body)
        except Exception as e:
            print(f"Failed to process {message_id}: {e}")
            # ここ注意: messageId を返すことで失敗した件だけ再試行される
            batch_item_failures.append({"itemIdentifier": message_id})

    return {"batchItemFailures": batch_item_failures}


def process_message(body):
    import json
    data = json.loads(body)
    # 実際の処理をここに書く
    if not data.get("user_id"):
        raise ValueError("user_id が見つかりません")
    print(f"処理完了: user_id={data['user_id']}")

注意点として、この機能を使うにはイベントソースマッピング側で ReportBatchItemFailures を有効にする必要があります。Lambda コンソールの「トリガー」設定か、CLI で以下のように設定します。

aws lambda update-event-source-mapping \
  --uuid "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
  --function-response-types "ReportBatchItemFailures"

UUID は list-event-source-mappings コマンドで確認できます。コードだけ直して「あれ、全件再試行されてる…?」となるのはこの設定忘れが原因だったりします。

FIFO キューの場合は挙動が違う

余談ですが、FIFO キューで部分バッチレスポンスを使う場合は少し注意が必要です。FIFO はメッセージの順序を保証するキューなので、途中で失敗が出たら失敗以降の未処理メッセージも含めてすべて batchItemFailures に返す必要があります。順序を壊さないための仕様なので、スタンダードキューとは動きが違う点を頭に入れておくといいと思います。今回は基本的にスタンダードキューの話をしています。

DLQ に溜まったメッセージをどうするか

DLQ にメッセージが流れてきた=何かが壊れているサインです。放置しておいても意味がないので、運用上は次のどちらかを考えることになります。

  • 手動で確認して再処理(リドライブ)する:マネジメントコンソールから、DLQ のメッセージを元のキューに戻せる機能が用意されています。バグを直した後に再送したいときに使います
  • Lambda で自動処理する:DLQ にトリガーを貼った別の Lambda を作り、アラートを飛ばすとかログに書き出すとかをやらせる方法です

個人的には、まず CloudWatch アラームで「DLQ にメッセージが来たら通知」する仕組みを入れておくのが最低限かなと思っています。メトリクスは ApproximateNumberOfMessagesVisible を使えばOKです。コンソールでポチポチ設定するだけなので、作っておくと精神的に楽です。

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")

cloudwatch.put_metric_alarm(
    AlarmName="dlq-has-messages",
    Namespace="AWS/SQS",
    MetricName="ApproximateNumberOfMessagesVisible",
    Dimensions=[{"Name": "QueueName", "Value": "my-task-queue-dlq"}],
    Statistic="Sum",
    Period=60,
    EvaluationPeriods=1,
    Threshold=1,
    ComparisonOperator="GreaterThanOrEqualToThreshold",
    AlarmActions=["arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:my-alert-topic"],
    TreatMissingData="notBreaching",
)

SNS トピックと連携すれば、DLQ に 1 件でも溜まったタイミングでメールや Slack に通知を飛ばせます。正直このアラームを設定していないと、DLQ がいつの間にか大量のメッセージで埋まっていた、という事態になりかねないので要注意です。

※この記事にはプロモーションが含まれます

ちなみに、お名前.com レンタルサーバー(WordPressに特化した高速レンタルサーバー。月額990円〜、独自ドメイン実質0円)も気になっています。お名前.com レンタルサーバー

まとめ

SQS × Lambda のエラーハンドリングで押さえておくべきポイントをまとめると、①VisibilityTimeout は Lambda タイムアウトの 6 倍以上にする、②maxReceiveCount で何回失敗したら DLQ に移すかを決める、③部分バッチレスポンス(batchItemFailures)を使えば失敗分だけ再試行できる、④DLQ は CloudWatch アラームと組み合わせて死活監視する——という感じです。

正直、最初は「失敗したらリトライされてればいいか」くらいに思っていたんですが、実運用で DLQ 通知がない状態で放置してしまったことがあって、気づいたら数百件のメッセージが詰まっていた、みたいな経験をしてからちゃんと設定するようになりました。

📚 シリーズ「AWS SQS × Lambda Python 自動化入門」(第3回 / 全4回)

← 前回の記事: 前回の記事はこちら

→ 次回の記事: 公開後にリンクが追加されます

参考になったらクリックしてもらえると嬉しいです!

Blogmura CloudAWS Ranking
タイトルとURLをコピーしました