前回(第1回)はAWS EventBridgeの基本概念とアーキテクチャ全体像を紹介しました。今回はそこからもう一歩踏み込んで、EventBridge Pipes のフィルタリング機能と、Pythonでの実装パターンをまとめていきます。
正直、Pipesという機能自体は知っていたんですが、「ルールと何が違うの?」というのがずっとモヤモヤしていて。調べたら思ったよりちゃんと役割分担があったので、整理がてらこの記事を書いています。
この記事でわかること
- EventBridge Pipes とイベントバス + ルールの違い
- Pipesの FilterCriteria によるフィルタリング構造
- PythonでPipesを作成・管理するコード例
- IAM設定で陥りやすいポイント
- ルール と Pipes の使い分け方
- Lambda がPipesイベントを受け取る形式
EventBridge Pipes とは何者か
EventBridge には大きく分けて、イベントバス + ルールの組み合わせと、Pipes(パイプ)という2つのルーティング手段があります。
ルールはイベントバス上を流れるイベントを「パターンマッチングで拾ってターゲットに送る」仕組みです。1つのイベントを複数のターゲットに同時に届けるファンアウト構成が得意。一方、Pipesはソース → (フィルタ)→ (エンリッチメント)→ ターゲットという一方向のポイント・ツー・ポイント統合を作れるサービスです。
Pipesが登場する前は、SQSキューをポーリングしてLambdaで処理してSNSに流す、みたいな構成をわざわざ自前のLambdaで書いていました。Pipesを使うとその「つなぎのコード」が不要になるケースがあります。
- ソースとして使えるもの:SQS、Kinesis、DynamoDB Streams、Amazon MQ、Amazon MSK など
- ターゲットとして使えるもの:Lambda、SQS、SNS、EventBridgeイベントバス、Step Functions、API Gatewayなど
- オプションのエンリッチメント:LambdaやAPI Gatewayで途中加工できる
余談ですが、Pipesってネーミング、UNIXのパイプ(|)を意識してるんですかね。知らんけど、個人的にはすごくしっくりくる名前だと思っています。
フィルタリングの仕組み — FilterCriteria の構造
Pipesのフィルタリングは、EventBridgeのイベントパターン構文をそのまま使います。FilterCriteriaというオブジェクトに、Filtersリストを渡す形です。
SQSをソースにする場合、bodyフィールドにメッセージボディが入ってくるので、フィルタもbodyを起点に書きます。
# SQSメッセージのフィルタパターン例
# body.status が "APPROVED" のメッセージだけ通す
{
"body": {
"status": ["APPROVED"]
}
}
DynamoDB Streamsの場合は構造が変わって、dynamodb配下の情報や、eventNameなどを使います。
# DynamoDB Streams のフィルタパターン例
# INSERT イベントだけ通す
{
"eventName": ["INSERT"]
}
数値比較やプレフィックスマッチも使えます。例えば「金額が1000以上のSQSメッセージだけ処理したい」みたいなケースには数値範囲フィルタが便利です。
# 数値範囲フィルタ(amount >= 1000)
{
"body": {
"amount": [{ "numeric": [">=", 1000] }]
}
}
複数のフィルタをFiltersリストに並べると OR 条件になります。AND にしたい場合は1つのパターン内に複数のフィールドを書きます。これ最初「なんで?」と思ったんですが、イベントパターンの仕様をそのまま引き継いでいるので、第1回の内容とつながる話でもあります。
Pythonでパイプを作る(boto3 実装)
では実際にPythonでPipesを構築してみます。ここでは「SQSキューに流れてきたメッセージのうち、statusが"APPROVED"のものだけLambdaに流す」という構成を作ります。
前提リソース
事前にAWSコンソールまたはCLIで以下を用意しておきます。
- SQSキュー(ソース):
my-source-queue - Lambda関数(ターゲット):
my-processor - IAMロール(Pipesが使う):SQSのRead権限とLambdaのInvoke権限が必要
IAMロールは忘れがちですが、Pipesがソースをポーリングしたりターゲットを呼び出したりするためのロールを別途作る必要があります。ここがハマりポイントなので後述します。
パイプを作成するコード
import json
import boto3
pipes_client = boto3.client("pipes", region_name="ap-northeast-1")
SOURCE_ARN = "arn:aws:sqs:ap-northeast-1:123456789012:my-source-queue"
TARGET_ARN = "arn:aws:lambda:ap-northeast-1:123456789012:function:my-processor"
ROLE_ARN = "arn:aws:iam::123456789012:role/eventbridge-pipes-role"
filter_pattern = json.dumps({
"body": {
"status": ["APPROVED"]
}
})
response = pipes_client.create_pipe(
Name="approved-orders-pipe",
RoleArn=ROLE_ARN,
Source=SOURCE_ARN,
SourceParameters={
"SqsQueueParameters": {
"BatchSize": 1,
"MaximumBatchingWindowInSeconds": 0,
},
"FilterCriteria": {
"Filters": [
{"Pattern": filter_pattern}
]
}
},
Target=TARGET_ARN,
DesiredState="RUNNING",
)
print(response["Arn"])
FilterCriteriaのPatternは文字列型(JSONを文字列化したもの)を渡す必要があります。辞書をそのまま渡すとエラーになるのでここ注意。json.dumps()を挟むのを忘れないようにしましょう。
フィルタパターンを後から更新する
運用しているとフィルタ条件を変えたくなることがあります。update_pipeで変更できます。
new_filter = json.dumps({
"body": {
"status": ["APPROVED", "PENDING"] # PENDINGも追加
}
})
pipes_client.update_pipe(
Name="approved-orders-pipe",
SourceParameters={
"FilterCriteria": {
"Filters": [
{"Pattern": new_filter}
]
}
}
)
ただし、update_pipe中の挙動(更新時にパイプがどういう状態遷移をするか)によっては、一時的に処理が進まず、その間のメッセージはSQSキュー側に滞留することがあるようです。SQSの可視性タイムアウト(Visibility Timeout)周りは事前に確認しておくと安心です。
IAMロールの設定(ここでハマった)
最初、Pipesのロールに「SQSのfull accessとLambdaのfull accessをつければ動くだろ」と思って試したら、SQSのポーリング自体はできるのにLambdaの呼び出しでエラーになった、というのをやりました。
Pipesが必要とする権限は意外とシンプルで、最低限これだけあれば動きます。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:ap-northeast-1:123456789012:my-source-queue"
},
{
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:my-processor"
}
]
}
信頼ポリシー(Trust Policy)にpipes.amazonaws.comを追加するのも忘れずに。これを忘れると「ロールを引き受けられない」系のエラーが出ます。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
ルールとPipesの設計パターン使い分け
第1回でEventBridgeのルールを扱ったので、「Pipesとどっちを使えばいい?」という話もまとめておきます。
ざっくり整理するとこんな感じです。
- イベントバス + ルールが向いているケース:
- 1つのイベントを複数のサービスに同時に届けたい(ファンアウト)
- カスタムイベントバスに外部イベントを集約している
- ソースがEventBridgeに対応していてPull型ポーリングが不要な場合
- Pipesが向いているケース:
- SQSやDynamoDB Streamsなど、ポーリング型のソースを使う
- 処理が1対1でシンプルなポイント・ツー・ポイント
- 途中でデータを変換・エンリッチしたい
- Lambda ESMより細かいフィルタリングをしたい
「SQS → Lambda」という構成ならLambdaのイベントソースマッピング(ESM)でもできますが、PipesだとLambdaを書かずにSQSからSNSやEventBridgeバスに直接つなげたりできます。ユースケースによっては「Lambda書かなくてよかった」という体験ができます。
Lambda 側でイベントを受け取る
PipesのターゲットになったLambda関数には、SQSのメッセージがどのような形で渡ってくるのかを確認しておきます。SQSソースのパイプが扱うイベント例はリスト形式になっているので、設定によってはリスト形式のイベントとしてLambdaに渡ってくることがあります。
import json
def lambda_handler(event, context):
# event がリスト形式で渡ってくるケースを想定
for record in event:
body = record.get("body", "{}")
# body は文字列なのでパースが必要
data = json.loads(body)
status = data.get("status")
amount = data.get("amount")
print(f"status={status}, amount={amount}")
bodyは文字列として渡ってくるのでjson.loads()が必要です。ここは普通のSQS Lambda連携と同じ扱いです。BatchSizeを1より大きくするとrecordが複数入ってくるので、ループで処理する形にしておくのが無難です。
※この記事にはプロモーションが含まれます
ちなみに、お名前.com レンタルサーバー(WordPressに特化した高速レンタルサーバー。月額990円〜、独自ドメイン実質0円)も気になっています。お名前.com レンタルサーバー![]()
まとめ
EventBridge PipesのフィルタリングはLambda ESMと同じパターン構文を使うので、EventBridgeのルール設計と合わせて覚えると理解がスムーズです。ただ、SQSソースの場合にbodyキーをネストする書き方が必要だったり、Patternに文字列を渡す必要があったりと、細かいところでハマりポイントがいくつかあります。
第3回ではEventBridgeのイベントトランスフォーマーと入力変換、Step Functionsとの組み合わせパターンについても見ていく予定です。
