前回は SQS のキュー作成から Lambda との Event Source Mapping の基本的な繋ぎ方を紹介しました(第2回はこちら)。今回はその続きで、「メッセージの処理に失敗したときどうするか」という話です。
正直、最初に SQS を使い始めたとき、エラー時の挙動をちゃんと設計しないままとりあえず動かして、メッセージが無限ループしてキューが詰まった…という苦い経験があります。そのあたりを整理しながら書いていきます。
この記事でわかること
- デッドレターキュー(DLQ)の役割と設定方法
maxReceiveCountと Visibility Timeout の関係- Lambda での Partial Batch Response(ReportBatchItemFailures)の実装
- DLQ に溜まったメッセージの監視とリドライブの方法
デッドレターキュー(DLQ)とは何か
デッドレターキュー(DLQ)は、ソースキューから正常に処理されなかったメッセージを受け取るためのキューです。つまり「何度試してもうまくいかないメッセージの隔離場所」みたいなものです。
DLQ は処理に失敗したメッセージを分離できるため、アプリケーションのデバッグに役立ちます。メインのキューに問題のあるメッセージが居座り続けると、正常なメッセージの処理も遅延するので、さっさと追い出す場所を用意しておくわけです。
ちなみに Amazon SQS はデッドレターキューを自動的に作成しません。DLQ として使用する前に、最初にキューを自分で作成する必要があります。「勝手に作ってくれるのかな」と思ったら違ったので注意してください。
もう一点、DLQ を設定する場合、キュータイプはソースキュータイプと一致する必要があります。FIFO キューは FIFO DLQ のみを使用し、標準キューは標準 DLQ のみを使用できます。FIFOとスタンダードを混ぜようとしてエラーになるやつ、地味にハマりポイントです。
maxReceiveCount と Visibility Timeout の関係
DLQ の設定で一番重要なパラメータが maxReceiveCount です。
リドライブポリシーでは、ソースキューとデッドレターキューを指定します。また、ソースキューのコンシューマーが指定回数メッセージの処理を失敗した場合に、Amazon SQS によってソースキューからデッドレターキューにメッセージが移動される条件も指定します。maxReceiveCount はメッセージを受信する試行回数の上限です。
ここで Visibility Timeout との関係を理解しておかないと思わぬ動きをします。Lambda が失敗した場合、メッセージは Visibility Timeout が切れた後にキューに戻り、受信カウントがインクリメントされます。つまり、Lambda がエラーを返すたびにカウントが加算されていき、maxReceiveCount に達したら DLQ 行きになるという仕組みです。
Visibility Timeout は、Lambda 関数のタイムアウト設定との兼ね合いで「タイムアウトの数倍にしておくのが良い」とされることが多いです。これはリトライや処理時間を考慮したものです。Lambda のタイムアウトが 30 秒なら、Visibility Timeout は最低でも 180 秒程度という計算になります。
boto3 で DLQ を作成・設定する
コンソールでポチポチするのも最初の確認には良いのですが、実際の運用ではコードで管理したいところ。boto3 でサクッと作れます。
import boto3
import json
sqs = boto3.client('sqs', region_name='ap-northeast-1')
# まず DLQ を作成
dlq = sqs.create_queue(
QueueName='my-function-dlq',
Attributes={
'MessageRetentionPeriod': '1209600', # 14日間(最大)
}
)
dlq_url = dlq['QueueUrl']
# DLQ の ARN を取得
dlq_attrs = sqs.get_queue_attributes(
QueueUrl=dlq_url,
AttributeNames=['QueueArn']
)
dlq_arn = dlq_attrs['Attributes']['QueueArn']
# ソースキューに Redrive ポリシーを設定
redrive_policy = {
'deadLetterTargetArn': dlq_arn,
'maxReceiveCount': '5' # 5回失敗したら DLQ へ
}
sqs.set_queue_attributes(
QueueUrl='https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-source-queue',
Attributes={
'RedrivePolicy': json.dumps(redrive_policy)
}
)
MessageRetentionPeriod は DLQ では長めに設定しておくのがおすすめです。デフォルトの 4 日だと、週末に気づいたら消えていた…というケースがあります(経験談)。
Lambda 側のエラーハンドリング:Partial Batch Response
バッチサイズを大きく設定している場合、1件だけ壊れたメッセージがあったせいでバッチ全体が失敗扱いになるのは避けたいですよね。そこで使えるのが Partial Batch Response(ReportBatchItemFailures) という機能です。
Partial Batch Response を有効にするには、Event Source Mapping を設定するときに FunctionResponseTypes に ReportBatchItemFailures を指定します。これにより、Lambda 関数が部分的な成功を返せるようになり、不要なリトライの回数を減らせます。
Lambda 関数の実装はこんな感じです。
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
failed_items = []
for record in event['Records']:
message_id = record['messageId']
try:
body = json.loads(record['body'])
process_message(body)
logger.info(f"処理成功: {message_id}")
except Exception as e:
logger.error(f"処理失敗: {message_id} / {e}")
# ここ注意: messageId を返すと該当メッセージだけリトライ対象になる
failed_items.append({'itemIdentifier': message_id})
return {'batchItemFailures': failed_items}
def process_message(body):
# 実際のビジネスロジックをここに書く
if body.get('type') == 'bad':
raise ValueError("処理できないメッセージです")
# ...
重要な点として、もし Lambda 関数が例外をスローした場合(つまり return ではなく未捕捉の例外が発生した場合)、バッチ全体が失敗扱いになります。なので try-except でしっかり個別に捕まえて、失敗した messageId だけ返すのがポイントです。
余談ですが、FIFO キューで Partial Batch Response を使う場合は少し気をつける必要があって、FIFO キューの場合は条件によっては(順序維持の都合などで)未処理分も含めた扱いを検討する必要があるようです。スタンダードキューとは少し扱いが違います。
Event Source Mapping に ReportBatchItemFailures を設定する
Partial Batch Response は Lambda 側のコードだけ直しても有効にはなりません。Event Source Mapping の設定も変更が必要です。boto3 でやる場合はこうなります。
import boto3
lambda_client = boto3.client('lambda', region_name='ap-northeast-1')
# 既存の Event Source Mapping を更新する場合
lambda_client.update_event_source_mapping(
UUID='xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx', # ESM の UUID
FunctionResponseTypes=['ReportBatchItemFailures']
)
新規作成の場合は create_event_source_mapping に同じ引数を渡せばOKです。コンソールから設定するなら、Event Source Mapping の編集画面で「レポートバッチアイテムの失敗」というトグルを有効にするだけです。
DLQ にたまったメッセージをどう扱うか
DLQ にメッセージが流れ込んできたとき、ただ放置するのはよくないです。最低限 CloudWatch アラームを張っておきましょう。
import boto3
cw = boto3.client('cloudwatch', region_name='ap-northeast-1')
cw.put_metric_alarm(
AlarmName='my-function-dlq-not-empty',
MetricName='ApproximateNumberOfMessagesVisible',
Namespace='AWS/SQS',
Dimensions=[{'Name': 'QueueName', 'Value': 'my-function-dlq'}],
Statistic='Sum',
Period=300,
Threshold=1,
ComparisonOperator='GreaterThanOrEqualToThreshold',
EvaluationPeriods=1,
AlarmActions=['arn:aws:sns:ap-northeast-1:123456789012:ops-alert'],
)
DLQ にメッセージが1件でも入ったら SNS 経由で通知が飛ぶようにしておくのが基本構成です。
問題を修正してメッセージを再処理したい場合は、DLQ Redrive(リドライブ)という機能を使います。コンシューマーアプリケーションをデバッグして修正したら、デッドレターキューリドライブ機能を使ってメッセージをソースキューに戻すことができます。コンソールから「リドライブの開始」をポチッとするだけで戻せます。boto3 では start_message_move_task API が使えます。
sqs.start_message_move_task(
SourceArn='arn:aws:sqs:ap-northeast-1:123456789012:my-function-dlq',
DestinationArn='arn:aws:sqs:ap-northeast-1:123456789012:my-source-queue',
MaxNumberOfMessagesPerSecond=10
)
ただし、SendMessage API を使って DLQ に直接送信されたメッセージや、SNS トピックや Lambda 関数から DLQ に流れてきたメッセージについては、リドライブ機能が使えずエラーになることがあるようです。あくまで SQS のリドライブポリシー経由で移動したメッセージが主な対象です。これ、ドキュメントを読み込むまで知らなかったので書いておきます。
全体像をざっくりまとめると
今回触ったのはざっくりこんな構成です。
- SQS ソースキュー → Lambda(バッチ処理)
- Lambda 内で個別エラーをキャッチ →
batchItemFailuresで返す maxReceiveCountを超えたメッセージ → DLQ へ自動移動- DLQ に入ったら CloudWatch アラーム → SNS 通知
- 修正後は Redrive でソースキューに戻す
正直、Partial Batch Response の挙動(例外スロー vs batchItemFailures 返却の違い)はちょっとわかりにくいので、ローカルや開発環境で一度意図的にエラーを起こして動作確認してみるのが確実だと思います。自分もコード書きながら「あれ、バッチ全体が失敗になってるな?」と気づいてデバッグした経験があります。
※この記事にはプロモーションが含まれます
ちなみに、お名前.com レンタルサーバー(WordPressに特化した高速レンタルサーバー。月額990円〜、独自ドメイン実質0円)も気になっています。お名前.com レンタルサーバー![]()
📚 シリーズ「AWS SQS × Lambda Python 自動化入門」(第3回 / 全4回)
← 前回の記事: 前回の記事はこちら
→ 次回の記事: 【第4回】AWS SQS × Lambda Python 自動化入門 — 本番運用の落とし穴・スケーリング設計・監視のベストプラクティス

