【第2回】AWS SQS × Lambda Python 自動化入門 — PythonでLambdaを実装してSQSメッセージを受信・処理する

AWS

前回は SQS キューの作成と基本的な仕組みを紹介しました。今回はいよいよ Lambda 側の実装に入ります。「SQS にメッセージが来たら Lambda が動く」という構成を実際に手を動かしながら作っていきます。

余談ですが、最初にこの構成を知ったときは「Lambda が自分でポーリングしてくれるのか…便利すぎる」と感動した記憶があります。昔は自前でポーリング処理を書く必要があったらしいので、今の時代に学んでいてよかったなと。

この記事でわかること

  • Lambda がメッセージを受け取るのに必要な IAM 権限の設定方法
  • SQS トリガーで動作する Python ハンドラーのコード実装
  • イベントソースマッピングの設定項目と意味
  • 可視性タイムアウトとバッチ処理の重要なポイント
  • エラーが起きた時に失敗メッセージだけを再処理する方法
  • 実装後の動作確認とトラブルシューティング

全体の流れをおさらい

SQS → Lambda のトリガー構成、やることはシンプルで以下の 3 ステップです。

  • Lambda の実行ロールに SQS へのアクセス権限を付与する
  • Lambda 関数(Python)を作成する
  • SQS をトリガーとして設定する(イベントソースマッピング)

コンソールから設定する方法をメインに書きつつ、コードも一緒に説明していきます。

IAM ロールに権限を追加する

Lambda が SQS からメッセージを受け取るには、実行ロールに適切な権限が必要です。AWS マネージドポリシーの AWSLambdaSQSQueueExecutionRole をアタッチするのがいちばん手っ取り早いです。

このポリシーには以下のような権限(SQS の受信・削除・属性参照や、CloudWatch Logs への書き込みなど)が含まれています。

  • sqs:ReceiveMessage
  • sqs:DeleteMessage
  • sqs:GetQueueAttributes
  • logs:CreateLogGroup / CreateLogStream / PutLogEvents(CloudWatch ログ用)

IAM コンソール → ロール → 対象ロールを選択 → 「ポリシーをアタッチ」から AWSLambdaSQSQueueExecutionRole を検索してアタッチするだけです。Lambda 作成時に「基本的な Lambda アクセス権限で新しいロールを作成」を選んでいれば、そのロールに追加すれば OK です。

最小権限の原則でいくなら自前でポリシーを書くのがベターですが、まず動かしてみる段階はマネージドポリシーで十分かなと思っています。

Lambda 関数を作成する(Python)

コンソールで関数を作る

Lambda コンソール → 「関数の作成」→ 「一から作成」を選択。

  • 関数名:任意(例: sqs-processor
  • ランタイム:Python(サポート対象のバージョンを選択。例: Python 3.12)
  • 実行ロール:先ほど権限を追加したロール

ハンドラーのコードを書く

SQS トリガーのとき、Lambda には event["Records"] というリストでメッセージが渡されます。1 回の呼び出しで複数のメッセージがまとめて来ることもあるので(バッチ処理)、ループで処理するのが基本パターンです。

import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    for record in event["Records"]:
        process_message(record)

def process_message(record):
    body = record["body"]

    try:
        # JSON 形式のメッセージを想定
        data = json.loads(body)
        logger.info(f"受信メッセージ: {data}")

        # ここに実際の処理を書く
        do_something(data)

    except json.JSONDecodeError:
        # JSON じゃないメッセージも一応考慮
        logger.info(f"テキストメッセージ: {body}")

def do_something(data):
    # 実処理のプレースホルダー
    logger.info(f"処理完了: {data}")

このコード例では、event["Records"] から各メッセージを取り出し、JSON をパースして処理しています。処理が正常に完了した場合は(設定や状況にもよりますが)Lambda 側で SQS メッセージが削除されます。一方で、処理が失敗した場合は削除されず、可視性タイムアウト後にキューに戻ってきて再処理されることがあります。この「戻ってくる」挙動はハマりポイントなので後述します。

イベントの構造を知っておく

実際に Lambda に渡される event の形はこんな感じです。

{
  "Records": [
    {
      "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
      "receiptHandle": "MessageReceiptHandle",
      "body": "{\"order_id\": \"123\", \"item\": \"coffee\"}",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1523232000000",
        "SenderId": "123456789012",
        "ApproximateFirstReceiveTimestamp": "1523232000001"
      },
      "messageAttributes": {},
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:ap-northeast-1:123456789012:my-queue",
      "awsRegion": "ap-northeast-1"
    }
  ]
}

body が実際のメッセージ内容です。JSON 文字列として入ってくるので、json.loads() でパースする必要があります。receiptHandle は手動削除のときに使うフィールドですが、トリガー経由の場合は Lambda 側が処理することが多いので基本は気にしなくて大丈夫です。

SQS トリガーを設定する(イベントソースマッピング)

Lambda 関数の画面から「トリガーを追加」→ ソースとして「SQS」を選択します。主な設定項目は以下です。

  • SQS キュー:前回作成したキューの ARN を指定
  • バッチサイズ:1 回の呼び出しで渡すメッセージ数(デフォルト 10、標準キューでは最大 10,000)
  • バッチウィンドウ:バッチを満たすまで待つ最大時間(デフォルト 0 秒)
  • 最大同時実行数:このトリガーに対する Lambda の並列数の上限(任意)

バッチサイズはとりあえず 1 から始めると挙動が掴みやすいです。本番でスループットが必要になったら増やせばいい。

「有効化」チェックボックスがありますが、最初から ON で問題ないです。設定を保存したらしばらくで Lambda がポーリングを始めます。

可視性タイムアウトの設定は超重要

ここ、最初に知らないとかなりハマります。

SQS の可視性タイムアウトは「Lambda が処理中にメッセージを他のコンシューマーから隠す時間」です。この時間内に Lambda が処理を終えないと、メッセージがキューに戻ってきて再処理されます。

可視性タイムアウトは、少なくとも Lambda がメッセージを処理し終えるまで十分長くなるように設定するのが基本です(バッチサイズやバッチウィンドウ、処理時間のブレも考慮)。

設定場所は SQS コンソールのキュー設定から。Lambda のタイムアウトを後で変更したときに可視性タイムアウトの更新を忘れがちなので注意です(自分も一回やらかしました)。

エラーハンドリングと部分的バッチ失敗(ReportBatchItemFailures)

デフォルトの挙動だと、バッチ内のメッセージが 1 件でもエラーになると バッチ全体 がキューに戻ることがあります。10 件まとめて処理してて 1 件だけ失敗した場合、残り 9 件も再処理されてしまうわけです。

これを防ぐのが 部分的バッチ応答(Partial Batch Response) という機能です。イベントソースマッピングで ReportBatchItemFailures を有効にすると、失敗したメッセージだけをキューに戻すことができます。これによって不必要なリトライの回数を減らせます。

コードの実装はこんな感じになります。

import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    batch_item_failures = []

    for record in event["Records"]:
        try:
            body = json.loads(record["body"])
            do_something(body)
        except Exception as e:
            logger.error(f"処理失敗 messageId={record['messageId']}: {e}")
            # ここ注意: messageId を返すことで失敗メッセージだけ再試行対象になる
            batch_item_failures.append({"itemIdentifier": record["messageId"]})

    return {"batchItemFailures": batch_item_failures}

def do_something(data):
    logger.info(f"処理: {data}")

このコードでは、例外が発生したメッセージの messageIdbatch_item_failures に追加して返すことで、失敗したメッセージだけが再処理対象になります。有効にする方法は、Lambda のトリガー設定画面 → 「追加設定」→「バッチ失敗のレポート」をオンにするだけです。地味に便利な機能なので、バッチサイズを 1 以上にする場合は最初から設定しておくのをおすすめします。

正直、この機能は最初知らなくて「なんで同じメッセージが何度も来るんだ…」って悩んでいた時期がありました。DLQ(デッドレターキュー)の設定と合わせて理解しておくと、トラブルシューティングがかなり楽になります。

動作確認

設定が終わったら、SQS コンソールからテストメッセージを送ってみます。

SQS コンソール → 対象キュー → 「メッセージを送受信」→「メッセージを送信」

メッセージ本文に JSON を入れてみましょう。

{"order_id": "001", "item": "coffee", "qty": 2}

送信後、Lambda の「モニタリング」タブ → CloudWatch Logs で実行ログが確認できます。logger.info() で出力した内容がそこに出ていれば成功です。

ログが出ない場合のチェックリスト:

  • イベントソースマッピングが「有効」になっているか
  • 実行ロールに AWSLambdaSQSQueueExecutionRole がアタッチされているか
  • SQS と Lambda がどちらも同じリージョンにあるか
  • Lambda 関数のタイムアウトが短すぎないか(デフォルト 3 秒)

Lambda の同時実行数に引っかかってメッセージが詰まることもあります。その場合はトリガー設定の「最大同時実行数」を確認してみてください。

※この記事にはプロモーションが含まれます

ちなみに、お名前.com レンタルサーバー(WordPressに特化した高速レンタルサーバー。月額990円〜、独自ドメイン実質0円)も気になっています。お名前.com レンタルサーバー

まとめ

SQS トリガーの設定自体はそれほど難しくなくて、IAM ロール・イベントソースマッピング・可視性タイムアウトの 3 点を押さえれば大体動きます。ただ、バッチ処理と ReportBatchItemFailures あたりは最初から理解しておくとあとが楽なので、面倒でも設定しておくのがいいかなと個人的には思っています。

▶ 前回記事:AWS SQS × Lambda Python 自動化入門 — SQS の仕組みとキュー作成

📚 シリーズ「AWS SQS × Lambda Python 自動化入門」(第2回 / 全4回)

← 前回の記事: 前回の記事はこちら

→ 次回の記事: 【第3回】AWS SQS × Lambda Python 自動化入門 — デッドレターキュー(DLQ)の設定と失敗メッセージのハンドリング

参考になったらクリックしてもらえると嬉しいです!

Blogmura CloudAWS Ranking
タイトルとURLをコピーしました