このセクションでわかること
- SQS と Lambda における自動再試行の仕組み
- VisibilityTimeout を正しく設定する理由
- デッドレターキュー(DLQ)の構成と設定方法
- 部分バッチレスポンスで失敗メッセージだけを再試行する実装
- DLQ の監視とアラート設定
前回のおさらいと今回のテーマ
前回は SQS と Lambda を実際に繋いで、Python でメッセージを受け取って処理する基本的な流れを紹介しました。 「受け取れた、動いた、やったー!」で終わったわけですが、実運用を考えると「もし処理が失敗したらどうなるの?」という問いが当然出てきます。
今回はそこを掘り下げます。再試行の仕組みとデッドレターキュー(DLQ)の設定、そして Lambda 側でのエラーハンドリングコードまで、順番に整理していきます。
SQS × Lambda の再試行はどう動いているのか
まずここを理解していないと、DLQ の話が頭に入ってこないので先に整理しておきます。
Lambda が SQS のメッセージを処理するとき、内部では「イベントソースマッピング」という仕組みが動いています。Lambda がキューをポーリングしてメッセージをまとめて取得し、関数を呼び出す、という流れです。
このとき、Lambda の処理が何らかの理由で失敗すると、バッチ内のすべてのメッセージがキューに戻ります。正確には「可視性タイムアウト(VisibilityTimeout)が切れると再び見えるようになる」という挙動です。デフォルトだと失敗したメッセージは再試行されます。
ここで重要なのが maxReceiveCount という設定値です。これを超えた回数失敗したメッセージは、DLQ に移送されます。「何回失敗したら諦めて別のキューに隔離するか」を決める数字ですね。
VisibilityTimeout の設定に注意
地味にハマりやすいのがここです。SQS の VisibilityTimeout は、Lambda のタイムアウト時間の 6 倍以上に設定するのが推奨されています。
たとえば Lambda のタイムアウトが 60 秒なら、VisibilityTimeout は 360 秒以上。Lambda が処理中にタイムアウトしてしまったとき、VisibilityTimeout が短いとメッセージが再び可視化されて二重処理になる可能性があります。最初にこの設定を間違えると、「なんかメッセージが複数回処理されてる?」という謎の不具合に悩まされます(経験談)。
DLQ(デッドレターキュー)を設定する
DLQ は、処理に何度失敗してもキューに居続けるメッセージを隔離するための仕組みです。SQS の通常キューとは別に DLQ 用のキューを作成して、元のキューの「リドライブポリシー」で紐付けます。
マネジメントコンソールでポチポチ設定してもいいですが、せっかくなので boto3 で作ってみた例を載せておきます。
import boto3
import json
sqs = boto3.client("sqs", region_name="ap-northeast-1")
# まず DLQ を作る
dlq = sqs.create_queue(
QueueName="my-task-queue-dlq",
Attributes={
"MessageRetentionPeriod": "1209600", # 14日間保持
}
)
dlq_url = dlq["QueueUrl"]
dlq_attrs = sqs.get_queue_attributes(
QueueUrl=dlq_url,
AttributeNames=["QueueArn"]
)
dlq_arn = dlq_attrs["Attributes"]["QueueArn"]
# メインキューを DLQ と紐付けて作成
main_queue = sqs.create_queue(
QueueName="my-task-queue",
Attributes={
"VisibilityTimeout": "360",
"RedrivePolicy": json.dumps({
"deadLetterTargetArn": dlq_arn,
"maxReceiveCount": "3", # 3回失敗したら DLQ へ
}),
}
)
print("Main queue URL:", main_queue["QueueUrl"])
maxReceiveCount を 3 にしておくと、3 回連続で処理失敗したメッセージが DLQ に移ります。ここの数字は用途によって変えてください。冪等性が確保されているなら多めにしてもいいですし、どうしても一発勝負で処理したいなら 1 にする、といった使い方もできます。
Lambda 側のエラーハンドリング:batchItemFailures を使う
ここが今回一番書きたかった部分です。
デフォルトだと、Lambda がバッチ内の 1 件でも例外をスローすると バッチ全件 がキューに戻ります。つまり 10 件のうち 1 件だけ失敗したのに、成功した 9 件まで再処理されてしまうということです。冪等でない処理だと地獄になります。
これを解消するのが 部分バッチレスポンス(Partial Batch Response) という機能で、失敗したメッセージだけをキューに戻すことができます。実装方法はシンプルで、Lambda の戻り値に batchItemFailures を含めるだけです。
def lambda_handler(event, context):
batch_item_failures = []
for record in event["Records"]:
message_id = record["messageId"]
body = record["body"]
try:
process_message(body)
except Exception as e:
print(f"Failed to process {message_id}: {e}")
# ここ注意: messageId を返すことで失敗した件だけ再試行される
batch_item_failures.append({"itemIdentifier": message_id})
return {"batchItemFailures": batch_item_failures}
def process_message(body):
import json
data = json.loads(body)
# 実際の処理をここに書く
if not data.get("user_id"):
raise ValueError("user_id が見つかりません")
print(f"処理完了: user_id={data['user_id']}")
注意点として、この機能を使うにはイベントソースマッピング側で ReportBatchItemFailures を有効にする必要があります。Lambda コンソールの「トリガー」設定か、CLI で以下のように設定します。
aws lambda update-event-source-mapping \
--uuid "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" \
--function-response-types "ReportBatchItemFailures"
UUID は list-event-source-mappings コマンドで確認できます。コードだけ直して「あれ、全件再試行されてる…?」となるのはこの設定忘れが原因だったりします。
FIFO キューの場合は挙動が違う
余談ですが、FIFO キューで部分バッチレスポンスを使う場合は少し注意が必要です。FIFO はメッセージの順序を保証するキューなので、途中で失敗が出たら失敗以降の未処理メッセージも含めてすべて batchItemFailures に返す必要があります。順序を壊さないための仕様なので、スタンダードキューとは動きが違う点を頭に入れておくといいと思います。今回は基本的にスタンダードキューの話をしています。
DLQ に溜まったメッセージをどうするか
DLQ にメッセージが流れてきた=何かが壊れているサインです。放置しておいても意味がないので、運用上は次のどちらかを考えることになります。
- 手動で確認して再処理(リドライブ)する:マネジメントコンソールから、DLQ のメッセージを元のキューに戻せる機能が用意されています。バグを直した後に再送したいときに使います
- Lambda で自動処理する:DLQ にトリガーを貼った別の Lambda を作り、アラートを飛ばすとかログに書き出すとかをやらせる方法です
個人的には、まず CloudWatch アラームで「DLQ にメッセージが来たら通知」する仕組みを入れておくのが最低限かなと思っています。メトリクスは ApproximateNumberOfMessagesVisible を使えばOKです。コンソールでポチポチ設定するだけなので、作っておくと精神的に楽です。
import boto3
cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")
cloudwatch.put_metric_alarm(
AlarmName="dlq-has-messages",
Namespace="AWS/SQS",
MetricName="ApproximateNumberOfMessagesVisible",
Dimensions=[{"Name": "QueueName", "Value": "my-task-queue-dlq"}],
Statistic="Sum",
Period=60,
EvaluationPeriods=1,
Threshold=1,
ComparisonOperator="GreaterThanOrEqualToThreshold",
AlarmActions=["arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:my-alert-topic"],
TreatMissingData="notBreaching",
)
SNS トピックと連携すれば、DLQ に 1 件でも溜まったタイミングでメールや Slack に通知を飛ばせます。正直このアラームを設定していないと、DLQ がいつの間にか大量のメッセージで埋まっていた、という事態になりかねないので要注意です。
※この記事にはプロモーションが含まれます
ちなみに、お名前.com レンタルサーバー(WordPressに特化した高速レンタルサーバー。月額990円〜、独自ドメイン実質0円)も気になっています。お名前.com レンタルサーバー![]()
まとめ
SQS × Lambda のエラーハンドリングで押さえておくべきポイントをまとめると、①VisibilityTimeout は Lambda タイムアウトの 6 倍以上にする、②maxReceiveCount で何回失敗したら DLQ に移すかを決める、③部分バッチレスポンス(batchItemFailures)を使えば失敗分だけ再試行できる、④DLQ は CloudWatch アラームと組み合わせて死活監視する——という感じです。
正直、最初は「失敗したらリトライされてればいいか」くらいに思っていたんですが、実運用で DLQ 通知がない状態で放置してしまったことがあって、気づいたら数百件のメッセージが詰まっていた、みたいな経験をしてからちゃんと設定するようになりました。
