【第1回】AWS SQS × Lambda Python 自動化入門 — SQSとLambdaの基本構成とトリガー設定を理解しよう

AWS

「非同期処理をサーバーレスでやりたい」と思って調べ始めたら、どこを見ても SQS + Lambda の組み合わせが出てきた。なんとなくイメージはあるけど、実際に手を動かすとつまずく部分が多かったので、自分用に整理しながらシリーズ記事にしてみました。

このシリーズは全4回で、AWS SQS と Lambda を Python で組み合わせた自動化の仕組みを一から作っていきます。

  • 第1回(今回):SQS と Lambda の基本構成、トリガー設定の仕組みを理解する
  • 第2回:Lambda 関数の実装と、イベント構造の読み方
  • 第3回:エラーハンドリング、DLQ(デッドレターキュー)の設定
  • 第4回:実用的なパターン(S3連携、EventBridge連携など)

第1回は「そもそも SQS って何?」「Lambda トリガーってどういう仕組みで動くの?」というところから、実際にトリガーを設定するところまでを扱います。

この記事でわかること

  • Amazon SQS(Simple Queue Service)とは何か、Lambda との組み合わせがなぜ有効か
  • SQS のキュータイプ(Standard と FIFO)の違いと選び方
  • Event Source Mapping でポーリングが動く仕組み
  • マネジメントコンソールからの実際のトリガー設定手順
  • 初期設定で気をつけるべきポイント(可視性タイムアウト、IAM 権限など)

SQS と Lambda をざっくり理解する

まず前提として、Amazon SQS(Simple Queue Service)はメッセージキューのサービスです。何かのアプリがメッセージを「送信」して、別の何かがそれを「受信・処理する」という非同期の仕組みを作るために使います。

最初に SQS を知ったときは「キュー」という言葉に馴染みがなくて、なんとなく難しそうに見えました。要するに「待合室」みたいなものだと思うと少し楽になります。

Lambda はご存じの通り、コードをサーバーレスで実行するサービス。この二つを組み合わせることで、「キューにメッセージが来たら自動で Lambda が処理する」 という構成が作れます。

たとえばこういった使い方が考えられます:

  • 画像アップロードのリクエストをキューに入れて、Lambda で非同期にリサイズ処理する
  • 外部 API からのデータをキューに積んで、Lambda でバッチ的に DB 保存する
  • EC2 や他のサービスから処理依頼をキューに投げ、Lambda 側で受け取る

「直接 Lambda を呼べばいいんじゃ?」と思うかもしれないですが、SQS を挟むことで バースト時のトラフィック吸収サービス間の疎結合化リトライ管理 ができるようになります。この辺の設計上のメリットはシリーズを通して実感できると思います。

SQS のキュータイプ:Standard か FIFO か

SQS には2種類のキューがあって、最初に選択が必要です。

Standard キュー

高スループットで使いやすいのが特徴です。ただしメッセージの順序は保証されないし、まれに同じメッセージが複数回デリバリーされることがあります(At-least-once delivery)。Lambda との組み合わせでよく使われるのはこちら。バッチサイズも最大 10,000 件まで設定できます。

FIFO キュー

順序が保証されて、重複排除(Exactly-once processing と説明されることが多い)に対応します。ただしスループットに制限があり、Lambda と組み合わせるときのバッチサイズ上限は 10 件です。「順番通りに処理したい」「重複は極力避けたい」という要件があるときに使います。とはいえ、アプリ側の冪等性も考えておくのが無難です。

入門段階では Standard キューで問題ないです。自分も最初は Standard から始めました。

Event Source Mapping の仕組み

SQS と Lambda を繋ぐ仕組みが Event Source Mapping(イベントソースマッピング) です。ここが少しわかりにくかったので丁寧に説明します。

Lambda は SQS をポーリング(定期的にメッセージを確認)して、メッセージがあれば自動的に関数を呼び出します。この「自動ポーリング → 呼び出し」の紐付けがイベントソースマッピングです。

重要なのは、Lambda 側が SQS をポーリングする形になっている という点。「SQS が Lambda を呼ぶ」というよりも「Lambda が SQS を監視していて、メッセージを取りに行く」イメージです。内部ではデフォルトで 2〜200 のポーラーが並行して動いていて、負荷に応じて自動スケールします。

もう一つ大事な設定が バッチサイズ。1回の Lambda 呼び出しで何件のメッセージをまとめて処理するかを決めます。デフォルトは 10 です。Standard キューは最大 10,000 まで増やせますが、10 を超える場合は MaximumBatchingWindowInSeconds(バッチ待機時間)を 1 秒以上に設定する必要があります。

最初は「バッチサイズ 1 にすれば1件ずつ確実に処理できそう」と思っていたんですが、ケースによってはコストやスループット面で不利になりがちとされています。後で詳しく触れます。

実際にトリガーを設定してみる

ここからは実際の手順です。マネジメントコンソールでの操作を前提にします。

1. SQS キューを作成する

AWS コンソールで SQS を開いて、「キューの作成」から Standard キューを作ります。設定で気をつけるのは 可視性タイムアウト です。

可視性タイムアウトは「メッセージが Lambda に渡された後、他のコンシューマーから見えなくなる時間」。Lambda がこの時間内に処理を完了できないと、メッセージがキューに戻って再処理されます。

AWS のドキュメントでは、Lambda 関数のタイムアウトの6倍以上(+設定している場合はバッチ待機時間も考慮) に設定することが推奨されています。Lambda のデフォルトタイムアウトが 3 秒なら、可視性タイムアウトは最低 18 秒以上、余裕を見て 60〜300 秒あたりに設定しておくのが無難です。

2. Lambda 関数を作成する

Lambda コンソールから新しい関数を作ります。ランタイムは Python 3.12 を選択。実行ロールは「基本的な Lambda アクセス権限で新しいロールを作成」でひとまず OK です。

3. トリガーを追加する

Lambda 関数の設定画面で「トリガーを追加」→「SQS」を選択し、先ほど作ったキューを指定します。バッチサイズはデフォルト(10)のままで。

このとき、Lambda 実行ロールに SQS へのアクセス権が必要です。マネジメントコンソールからトリガーを追加すると、必要な権限(sqs:ReceiveMessagesqs:DeleteMessagesqs:GetQueueAttributes)が自動でロールに付与されることが多いですが、念のため IAM ロールを確認しておくといいです。

また、暗号化キュー(KMS)を使っている場合は、Lambda の実行ロールに kms:Decrypt の権限も追加しないとポーリングが動かないので注意。ここでハマった記憶があります。

4. 最初の Lambda 関数コードを書く

トリガー設定が終わったら、まず受け取ったイベントの構造を確認するコードを書きます。

import json

def lambda_handler(event, context):
    for record in event['Records']:
        body = record['body']
        print(f"受信メッセージ: {body}")
    
    return {'statusCode': 200}

SQS から Lambda に渡されるイベントは Records というリストを持っていて、各レコードに body(メッセージ本文)や messageIdattributes などが入っています。実際のイベント構造はこんな感じです:

{
  "Records": [
    {
      "messageId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
      "receiptHandle": "...",
      "body": "Hello from SQS",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1234567890000",
        ...
      },
      "messageAttributes": {},
      "md5OfBody": "...",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:ap-northeast-1:123456789012:my-queue",
      "awsRegion": "ap-northeast-1"
    }
  ]
}

JSON 形式のメッセージを送る場合は bodyjson.loads() でパースします。

import json

def lambda_handler(event, context):
    for record in event['Records']:
        body = json.loads(record['body'])  # ここ注意:文字列なのでパースが必要
        print(body)

5. テストメッセージを送ってみる

SQS コンソールからキューを選んで「メッセージを送信」で動作確認できます。Lambda コンソールの「モニタリング」タブや CloudWatch Logs で実行ログを確認しましょう。

ちゃんとトリガーが動いていれば、送ったメッセージの内容がログに出るはずです。

設定のポイントをまとめておく

ここまでの内容で、特に意識しておきたいポイントをざっくり整理します。

  • キューと Lambda は同一リージョンに置く(異なる AWS アカウント間については、権限設計など条件が絡むため要件に応じて確認が必要です)
  • 可視性タイムアウト = Lambda タイムアウトの6倍以上 を守る
  • バッチサイズは 1 にしない。デフォルト 10 か、要件に合わせて増やす(バッチ 1 はケースによって不利になりがち)
  • IAM ロールの権限不足でサイレントに動かないことがあるので、ロールは都度確認する
  • 処理失敗時の挙動(バッチ全体がキューに戻る)を理解しておく

処理失敗時の挙動については、デフォルトだとバッチ内のどれか1件でもエラーになると バッチ全体がキューに戻ります。これは「部分的バッチレスポンス(ReportBatchItemFailures)」で改善できますが、まずはこの挙動を頭に入れておくだけで OK です。

※この記事にはプロモーションが含まれます

ちなみに、お名前.com レンタルサーバー(WordPressに特化した高速レンタルサーバー。月額990円〜、独自ドメイン実質0円)も気になっています。お名前.com レンタルサーバー

まとめ

第1回では SQS と Lambda の基本、キュータイプの選び方、イベントソースマッピングの仕組みから、実際のトリガー設定までを説明しました。

ポイントは以下の3つです:

  • SQS は「メッセージの待合室」で、Lambda がポーリング型で監視している
  • Standard キューが一般的、FIFO キューは順序と重複排除が必要なときに使う
  • 可視性タイムアウトと IAM 権限の設定がサイレントエラーを防ぐカギになる

第2回では、このトリガーが実際に動いたときのイベント構造をもっと詳しく見たり、実用的な Lambda 関数の書き方を扱う予定です。まずは今回の内容で、SQS と Lambda をポーリング型で繋ぐ仕組みをイメージできていれば大丈夫です。

📚 シリーズ「AWS SQS × Lambda Python 自動化入門」(第1回 / 全4回)

→ 次回の記事: 公開後にリンクが追加されます

参考になったらクリックしてもらえると嬉しいです!

Blogmura CloudAWS Ranking
タイトルとURLをコピーしました