はじめに ― SQSとLambdaを組み合わせると「失敗」はどこへいくのか
SQS × Lambda の構成で非同期処理を作っていると、必ずぶつかる疑問があります。「Lambda が例外を投げたら、そのメッセージってどうなるの?」というやつ。
これがデッドレターキュー(DLQ)を理解する出発点です。DLQ を知らないまま本番に出すとメッセージが消えたり、逆に無限にリトライし続けたりして、じわじわと事故に近づきます。自分も最初はその怖さをあまりわかっていなかったので、今回はちゃんと整理してみました。
このシリーズ「AWS SQS × Lambda Python 自動化入門」は全4回構成です。今回の第1回は、SQSとLambdaの基本的な連携とDLQの仕組みを理解することがゴール。
この記事でわかること
- デッドレターキュー(DLQ)の役割と基本的な仕組み
- メッセージが DLQ に移動するまでの流れ
- Python で Lambda ハンドラを実装するときの注意点
- batchItemFailures を使った部分的バッチレスポンスの実装方法
- DLQ リドライブの基本パターン
- 本番運用で押さえておきたいベストプラクティス
そもそも DLQ とは何か
Amazon SQS はデッドレターキュー(DLQ)をサポートしています。これは、正常に処理されなかったメッセージをソースキューから送信する先のターゲットキューです。一言でいうと「捨てるかわりに別の場所に隔離しておくキュー」です。
DLQ があることで何ができるかというと、未消費のメッセージを分離して処理が成功しなかった理由を判断できるため、アプリケーションのデバッグに役立ちます。後から「あのとき失敗したメッセージ、何が入ってたんだろう」と調べたいときに DLQ があれば助かります。なければそのまま消えます。
ちなみに DLQ は自動で作成されません。デッドレターキューとして使用する前に、最初にキューを作成する必要があります。また、DLQ を設定する場合、キュータイプはソースキュータイプと一致する必要があります。FIFO キューは FIFO DLQ のみを使用し、標準キューは標準 DLQ のみを使用できます。この「型が一致しないといけない」は地味にハマるので覚えておくといいです。
メッセージが DLQ に移動するまでの流れ
SQS × Lambda の構成での動きを整理すると、こんな流れになります。
メッセージ送信
↓
SQS キュー(メインキュー)
↓
Lambda がポーリング(自動)
↓
処理成功 → メッセージ削除(自動)
処理失敗 → 可視性タイムアウト後に再試行
↓(N回失敗)
SQS デッドレターキュー(DLQ)に移動
ここで重要なのが maxReceiveCount という設定値です。maxReceiveCount は、メッセージがデッドレターキューに移動される前に、コンシューマーがソースキューからメッセージを受信できる回数です。たとえば maxReceiveCount を 1 のような小さな値に設定すると、1回の受信失敗でメッセージが DLQ に移動します。
同じメッセージが3回処理を試みて全て失敗した場合、DLQ に移動します。「1回目の失敗 → 可視性タイムアウト後に再試行 → 2回目の失敗 → 再試行 → 3回目の失敗 → DLQ へ」という流れです。
正常な場合は Lambda が処理を完了した時点で SQS がメッセージを削除します。Lambda はキューからメッセージを正常に呼び出さない限り、キューからメッセージを削除しません。つまり「Lambda が例外を投げた = メッセージはキューに残ったまま」ということです。
実際に構築してみる(コンソール手順)
コンソールで作る場合、依存する順番通りに作成する必要があります。DLQ を先に作成しないとメインキューに DLQ の ARN を設定できないからです。具体的にはこの順番でやります。
- DLQ を先に作成(ARN をメモっておく)
- メインキューを作成 → DLQ の ARN を設定
- Lambda 関数を作成
- Lambda の実行ロールに SQS 権限を追加
- SQS トリガーを Lambda に追加
余談ですが、SAM や CDK でコード化すると !GetAtt DLQ.Arn みたいな参照で順番を意識せずに書けるので、実運用ではやっぱりコード化したほうが楽です。でも最初はコンソールで手を動かして全体像を把握するのがおすすめかなと思っています。
Python で Lambda ハンドラを書く
まず基本的な Lambda ハンドラから。SQS からトリガーされると event["Records"] にメッセージが配列で入ってきます。
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
for record in event["Records"]:
message_id = record["messageId"]
body = json.loads(record["body"])
logger.info(f"Processing: {message_id}")
# ここでビジネスロジック
process_message(body)
return {"statusCode": 200}
def process_message(body):
if body.get("type") == "unknown":
raise ValueError(f"Unknown message type: {body}")
logger.info(f"Done: {body}")
このコードで ValueError が発生すると、Lambda は例外を返します。するとメッセージは SQS に戻り、maxReceiveCount に達するまでリトライされ、最終的に DLQ に入ります。シンプルですが、これだと バッチ内の1件の失敗で全件が再処理される という問題があります。
バッチ処理の「部分的な失敗」問題と batchItemFailures
SQS のイベントソースマッピングでは、複数のメッセージをまとめてバッチで Lambda に渡します。デフォルトだと「1件でも失敗するとバッチ全体が再試行対象になる」という挙動で、これが地味に厄介です。
DLQ のポリューション問題が発生します。十分なリトライの後、バッチ全体(正常に処理されたメッセージを含む)がデッドレターキューに入ってしまう可能性があるのです。つまり「1件の悪いメッセージのせいで、他の正常なメッセージまで DLQ に飛ばされる」という事態が起きます。
これを解決するのが Partial Batch Response(部分的バッチレスポンス) という機能です。イベントソースマッピングを設定して失敗したメッセージだけを可視状態にするには、イベントソースマッピング設定時に FunctionResponseTypes に ReportBatchItemFailures を追加します。
Lambda ハンドラ側でも対応が必要で、失敗したメッセージの ID を batchItemFailures として返すように実装します。
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
batch_item_failures = []
for record in event["Records"]:
try:
body = json.loads(record["body"])
process_message(body)
except Exception as e:
logger.error(f"Failed: {record['messageId']} - {e}")
batch_item_failures.append(
{"itemIdentifier": record["messageId"]}
)
return {"batchItemFailures": batch_item_failures}
def process_message(body):
if body.get("type") == "unknown":
raise ValueError(f"Unknown type: {body}")
logger.info(f"OK: {body}")
これで失敗した1件だけがキューに戻り、正常に処理できた他のメッセージは削除されます。maxReceiveCount に達した失敗メッセージだけが DLQ へ。
FIFO キューを使っている場合は、最初の失敗が発生したらそれ以降のメッセージの処理を止め、失敗した及び未処理のメッセージをすべて batchItemFailures に返す必要があります。これでキュー内のメッセージ順序を保持できます。標準キューと FIFO キューで挙動が変わる点は要注意です。
DLQ に入ったメッセージのリドライブ
DLQ に入ったメッセージはそのまま放置されると、SQS の保持期間の範囲内で消えます。調査して原因がわかったら、メインキューに戻して再処理させたいケースがあります。これが DLQ リドライブ です。
SQS リドライブ機能を使うと、DLQ で「DLQ redrive」を選択してメッセージをポーリングし、消費されていないメッセージを選んで調べることができます。コンソールから GUI で操作できるようになっており、最近はかなり使いやすくなった印象です。
Python(boto3)でリドライブを自動化する場合はこんな感じです。EventBridge などと組み合わせて定期実行するパターンもあります。
import boto3
sqs = boto3.client("sqs")
DLQ_URL = "https://sqs.ap-northeast-1.amazonaws.com/123456789/my-dlq"
MAIN_QUEUE_URL = "https://sqs.ap-northeast-1.amazonaws.com/123456789/my-queue"
def redrive_dlq():
while True:
response = sqs.receive_message(
QueueUrl=DLQ_URL,
MaxNumberOfMessages=10,
WaitTimeSeconds=5,
AttributeNames=["All"],
)
messages = response.get("Messages", [])
if not messages:
print("DLQ is empty.")
break
for msg in messages:
sqs.send_message(
QueueUrl=MAIN_QUEUE_URL,
MessageBody=msg["Body"],
)
sqs.delete_message(
QueueUrl=DLQ_URL,
ReceiptHandle=msg["ReceiptHandle"],
)
print(f"Redriven: {msg['MessageId']}")
正直、リドライブのロジックは「原因を修正してからやる」が大前提なので、焦って実行しないほうがいいです。同じ失敗メッセージをまた DLQ に送り込むだけになりかねないので。
ベストプラクティスとして押さえておきたいこと
パフォーマンスを最適化するには、ソースキューと DLQ を同じ AWS アカウントおよびリージョン内に維持することがベストプラクティスです。クロスアカウントや別リージョンに DLQ を置くのは避けたほうが無難そうです。
maxReceiveCount の値については、実際の環境では、要件に応じて、また実際のアプリケーションでの失敗が何を意味するのかに応じて、適切な数値を設定する必要があります。小さすぎると一時的な障害でも DLQ に入ってしまい、大きすぎると本当に壊れたメッセージがずっとキューを占領します。自分の経験上では 3〜5 あたりが無難な出発点かなと感じています。
また、SQS キューで DLQ を有効にすることはベストプラクティスです。Lambda はキューからメッセージを正常に呼び出さない限り、キューからメッセージを削除しません。DLQ は「あったほうがいい」ではなく「絶対つけるもの」と思っておいたほうが本番運用では安全です。
CloudWatch で DLQ のメッセージ数に対してアラームを設定しておくと、異常をすぐ検知できます。DLQ にメッセージが積まれはじめたら何か壊れているサインなので、0件を維持するのが理想的な状態です。
第1回のまとめ
今回は SQS × Lambda × DLQ の基本的な仕組みと、Python での実装パターンを整理しました。特に batchItemFailures を使った部分的バッチレスポンスは、実装コストが低い割に効果が大きいので早めに取り入れることをおすすめします。
第2回では、実際のエラーハンドリングパターンと、より複雑な非同期処理フローをカバーします。
※この記事にはプロモーションが含まれます
ちなみに、お名前.com レンタルサーバー(WordPressに特化した高速レンタルサーバー。月額990円〜、独自ドメイン実質0円)も気になっています。お名前.com レンタルサーバー![]()
