【第2回】AWS EventBridge × Python イベント駆動設計入門 — イベントバスとルールの設計パターン、Pythonでのイベント送受信実装

AWS

前回(第1回)はAWS EventBridgeの基本概念とアーキテクチャ全体像を紹介しました。今回はそこからもう一歩踏み込んで、EventBridge Pipes のフィルタリング機能と、Pythonでの実装パターンをまとめていきます。

正直、Pipesという機能自体は知っていたんですが、「ルールと何が違うの?」というのがずっとモヤモヤしていて。調べたら思ったよりちゃんと役割分担があったので、整理がてらこの記事を書いています。

この記事でわかること

  • EventBridge Pipes とイベントバス + ルールの違い
  • Pipesの FilterCriteria によるフィルタリング構造
  • PythonでPipesを作成・管理するコード例
  • IAM設定で陥りやすいポイント
  • ルール と Pipes の使い分け方
  • Lambda がPipesイベントを受け取る形式

EventBridge Pipes とは何者か

EventBridge には大きく分けて、イベントバス + ルールの組み合わせと、Pipes(パイプ)という2つのルーティング手段があります。

ルールはイベントバス上を流れるイベントを「パターンマッチングで拾ってターゲットに送る」仕組みです。1つのイベントを複数のターゲットに同時に届けるファンアウト構成が得意。一方、Pipesはソース → (フィルタ)→ (エンリッチメント)→ ターゲットという一方向のポイント・ツー・ポイント統合を作れるサービスです。

Pipesが登場する前は、SQSキューをポーリングしてLambdaで処理してSNSに流す、みたいな構成をわざわざ自前のLambdaで書いていました。Pipesを使うとその「つなぎのコード」が不要になるケースがあります。

  • ソースとして使えるもの:SQS、Kinesis、DynamoDB Streams、Amazon MQ、Amazon MSK など
  • ターゲットとして使えるもの:Lambda、SQS、SNS、EventBridgeイベントバス、Step Functions、API Gatewayなど
  • オプションのエンリッチメント:LambdaやAPI Gatewayで途中加工できる

余談ですが、Pipesってネーミング、UNIXのパイプ(|)を意識してるんですかね。知らんけど、個人的にはすごくしっくりくる名前だと思っています。

フィルタリングの仕組み — FilterCriteria の構造

Pipesのフィルタリングは、EventBridgeのイベントパターン構文をそのまま使います。FilterCriteriaというオブジェクトに、Filtersリストを渡す形です。

SQSをソースにする場合、bodyフィールドにメッセージボディが入ってくるので、フィルタもbodyを起点に書きます。

# SQSメッセージのフィルタパターン例
# body.status が "APPROVED" のメッセージだけ通す

{
  "body": {
    "status": ["APPROVED"]
  }
}

DynamoDB Streamsの場合は構造が変わって、dynamodb配下の情報や、eventNameなどを使います。

# DynamoDB Streams のフィルタパターン例
# INSERT イベントだけ通す

{
  "eventName": ["INSERT"]
}

数値比較やプレフィックスマッチも使えます。例えば「金額が1000以上のSQSメッセージだけ処理したい」みたいなケースには数値範囲フィルタが便利です。

# 数値範囲フィルタ(amount >= 1000)
{
  "body": {
    "amount": [{ "numeric": [">=", 1000] }]
  }
}

複数のフィルタをFiltersリストに並べると OR 条件になります。AND にしたい場合は1つのパターン内に複数のフィールドを書きます。これ最初「なんで?」と思ったんですが、イベントパターンの仕様をそのまま引き継いでいるので、第1回の内容とつながる話でもあります。

Pythonでパイプを作る(boto3 実装)

では実際にPythonでPipesを構築してみます。ここでは「SQSキューに流れてきたメッセージのうち、status"APPROVED"のものだけLambdaに流す」という構成を作ります。

前提リソース

事前にAWSコンソールまたはCLIで以下を用意しておきます。

  • SQSキュー(ソース):my-source-queue
  • Lambda関数(ターゲット):my-processor
  • IAMロール(Pipesが使う):SQSのRead権限とLambdaのInvoke権限が必要

IAMロールは忘れがちですが、Pipesがソースをポーリングしたりターゲットを呼び出したりするためのロールを別途作る必要があります。ここがハマりポイントなので後述します。

パイプを作成するコード

import json
import boto3

pipes_client = boto3.client("pipes", region_name="ap-northeast-1")

SOURCE_ARN = "arn:aws:sqs:ap-northeast-1:123456789012:my-source-queue"
TARGET_ARN = "arn:aws:lambda:ap-northeast-1:123456789012:function:my-processor"
ROLE_ARN = "arn:aws:iam::123456789012:role/eventbridge-pipes-role"

filter_pattern = json.dumps({
    "body": {
        "status": ["APPROVED"]
    }
})

response = pipes_client.create_pipe(
    Name="approved-orders-pipe",
    RoleArn=ROLE_ARN,
    Source=SOURCE_ARN,
    SourceParameters={
        "SqsQueueParameters": {
            "BatchSize": 1,
            "MaximumBatchingWindowInSeconds": 0,
        },
        "FilterCriteria": {
            "Filters": [
                {"Pattern": filter_pattern}
            ]
        }
    },
    Target=TARGET_ARN,
    DesiredState="RUNNING",
)

print(response["Arn"])

FilterCriteriaPattern文字列型(JSONを文字列化したもの)を渡す必要があります。辞書をそのまま渡すとエラーになるのでここ注意。json.dumps()を挟むのを忘れないようにしましょう。

フィルタパターンを後から更新する

運用しているとフィルタ条件を変えたくなることがあります。update_pipeで変更できます。

new_filter = json.dumps({
    "body": {
        "status": ["APPROVED", "PENDING"]  # PENDINGも追加
    }
})

pipes_client.update_pipe(
    Name="approved-orders-pipe",
    SourceParameters={
        "FilterCriteria": {
            "Filters": [
                {"Pattern": new_filter}
            ]
        }
    }
)

ただし、update_pipe中の挙動(更新時にパイプがどういう状態遷移をするか)によっては、一時的に処理が進まず、その間のメッセージはSQSキュー側に滞留することがあるようです。SQSの可視性タイムアウト(Visibility Timeout)周りは事前に確認しておくと安心です。

IAMロールの設定(ここでハマった)

最初、Pipesのロールに「SQSのfull accessとLambdaのfull accessをつければ動くだろ」と思って試したら、SQSのポーリング自体はできるのにLambdaの呼び出しでエラーになった、というのをやりました。

Pipesが必要とする権限は意外とシンプルで、最低限これだけあれば動きます。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:ap-northeast-1:123456789012:my-source-queue"
    },
    {
      "Effect": "Allow",
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:my-processor"
    }
  ]
}

信頼ポリシー(Trust Policy)にpipes.amazonaws.comを追加するのも忘れずに。これを忘れると「ロールを引き受けられない」系のエラーが出ます。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "pipes.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

ルールとPipesの設計パターン使い分け

第1回でEventBridgeのルールを扱ったので、「Pipesとどっちを使えばいい?」という話もまとめておきます。

ざっくり整理するとこんな感じです。

  • イベントバス + ルールが向いているケース:
    • 1つのイベントを複数のサービスに同時に届けたい(ファンアウト)
    • カスタムイベントバスに外部イベントを集約している
    • ソースがEventBridgeに対応していてPull型ポーリングが不要な場合

  • Pipesが向いているケース:

    • SQSやDynamoDB Streamsなど、ポーリング型のソースを使う
    • 処理が1対1でシンプルなポイント・ツー・ポイント
    • 途中でデータを変換・エンリッチしたい
    • Lambda ESMより細かいフィルタリングをしたい

「SQS → Lambda」という構成ならLambdaのイベントソースマッピング(ESM)でもできますが、PipesだとLambdaを書かずにSQSからSNSやEventBridgeバスに直接つなげたりできます。ユースケースによっては「Lambda書かなくてよかった」という体験ができます。

Lambda 側でイベントを受け取る

PipesのターゲットになったLambda関数には、SQSのメッセージがどのような形で渡ってくるのかを確認しておきます。SQSソースのパイプが扱うイベント例はリスト形式になっているので、設定によってはリスト形式のイベントとしてLambdaに渡ってくることがあります。

import json

def lambda_handler(event, context):
    # event がリスト形式で渡ってくるケースを想定
    for record in event:
        body = record.get("body", "{}")
        # body は文字列なのでパースが必要
        data = json.loads(body)
        status = data.get("status")
        amount = data.get("amount")
        print(f"status={status}, amount={amount}")

bodyは文字列として渡ってくるのでjson.loads()が必要です。ここは普通のSQS Lambda連携と同じ扱いです。BatchSizeを1より大きくするとrecordが複数入ってくるので、ループで処理する形にしておくのが無難です。

※この記事にはプロモーションが含まれます

ちなみに、お名前.com レンタルサーバー(WordPressに特化した高速レンタルサーバー。月額990円〜、独自ドメイン実質0円)も気になっています。お名前.com レンタルサーバー

まとめ

EventBridge PipesのフィルタリングはLambda ESMと同じパターン構文を使うので、EventBridgeのルール設計と合わせて覚えると理解がスムーズです。ただ、SQSソースの場合にbodyキーをネストする書き方が必要だったり、Patternに文字列を渡す必要があったりと、細かいところでハマりポイントがいくつかあります。

第3回ではEventBridgeのイベントトランスフォーマーと入力変換、Step Functionsとの組み合わせパターンについても見ていく予定です。

📚 シリーズ「AWS EventBridge × Python イベント駆動設計入門」(第2回 / 全4回)

← 前回の記事: 前回の記事はこちら

→ 次回の記事: 公開後にリンクが追加されます

参考になったらクリックしてもらえると嬉しいです!

Blogmura CloudAWS Ranking
タイトルとURLをコピーしました