前回は EventBridge のカスタムイベントバスとルールを使って「イベントを受け取って振り分ける」ところまで実装しました(第2回はこちら)。今回はそこからもう一歩踏み込んで、EventBridge Pipes を使った SQS・DynamoDB Streams との連携を試してみます。
Pipes、名前は聞いてたけどイベントバスとどう違うの?という疑問がずっとあって。実際に手を動かして「あ〜、これはグルーコードを書かなくていい仕組みか」とようやく腹落ちしたので、その過程をそのまま書いていきます。
この記事でわかること
- EventBridge Pipes とイベントバスの違い
- SQS → Pipes → Lambda でフィルタリング付きパイプを構築する方法
- DynamoDB Streams → Pipes → SQS でストリームイベントをルーティングする方法
- フィルタリング・エンリッチメント・IAM設定の実装パターン
- Pipes が活躍するシーンと使い分け
EventBridge Pipes とは:イベントバスとの違いを整理する
EventBridge Pipes はソースとターゲットをつなぐポイント・トゥ・ポイントの統合サービスです。高度なトランスフォームやエンリッチメントをサポートしており、イベント駆動アーキテクチャを開発する際に特化した知識や統合コードの必要性を減らしてくれます。
イベントバスとの違いを一言でまとめると:
- イベントバス:多数の発行者 → 多数の受信者(多対多)
- Pipes:単一の発行者 → 単一の受信者(一対一)
「一対一しか使えないなら弱くない?」と最初は思ったんですが、Pipes の強みはフロー内に フィルタリング → エンリッチメント → ターゲット の3ステップを宣言的に書けることにあります。フィルタリングによって対象のイベントのサブセットだけを選択・処理でき、エンリッチメントによってターゲットに送る前にデータを補完・強化することができます。
AWS BlackBelt の資料を見ていたら「サービス間連携の課題」として認証・エラーハンドリング・デプロイなど列挙されていて、Pipes はそうした課題を解消するための「glue(糊)コード」として機能する、という説明がされていました。確かに Lambda でつなぐたびに書いていたアレです。
今回構築するアーキテクチャ
次の2パターンを実装します。
- パターンA:SQS → Pipes(フィルタリング)→ Lambda:SQS に届いたメッセージのうち、特定条件を満たすものだけ Lambda に流す
- パターンB:DynamoDB Streams → Pipes(フィルタリング)→ SQS:テーブルへの INSERT イベントだけを SQS に送る
コードは boto3 で書きます。IaC としては CDK や SAM を使う方が実務的ですが、仕組みを理解するには API を直接叩くほうが構造が見えやすいので、今回はあえて boto3 縛りにしています。
パターンA:SQS → Pipes → Lambda(フィルタリング付き)
フィルタリングの仕組み
EventBridge Pipes では FilterCriteria オブジェクトを使ってイベントをフィルタリングします。FilterCriteria は Filters のリストで構成され、各フィルタは Pattern という文字列で表現された JSON フィルタルールを持ちます。
SQS をソースにするときの注意点がひとつ。SQS のメッセージボディは JSON とは限らず任意の文字列を含められるため、EventBridge Pipes は受信メッセージのフォーマット(有効な JSON か平文かどうか)に合わせた FilterCriteria を期待します。フォーマットが一致しない場合、(SQS ソースでは)EventBridge Pipes はそのメッセージをキューから自動的に削除します。ここは地味にハマりポイントです。
フィルタリングはコストにも関係していて、フィルタにマッチしたイベントに対してのみ課金されます。条件を絞れば余計なコストも減るので積極的に使っていきたいです。
boto3 で Pipe を作成する
前提リソース(SQS キュー・Lambda 関数・IAM ロール)はすでに作成済みとして、Pipe の作成コードだけ示します。
import boto3
import json
pipes = boto3.client("pipes", region_name="ap-northeast-1")
# SQS → Lambda のパイプ(ステータスが "NEW" のメッセージだけ通す)
response = pipes.create_pipe(
Name="order-filter-pipe",
RoleArn="arn:aws:iam::123456789012:role/EventBridgePipesRole",
Source="arn:aws:sqs:ap-northeast-1:123456789012:order-input-queue",
SourceParameters={
"SqsQueueParameters": {
"BatchSize": 10,
"MaximumBatchingWindowInSeconds": 5,
},
"FilterCriteria": {
"Filters": [
{
"Pattern": json.dumps({
"body": {
"status": ["NEW"] # ここ注意:リスト形式で指定
}
})
}
]
},
},
Target="arn:aws:lambda:ap-northeast-1:123456789012:function:process-order",
TargetParameters={
"LambdaFunctionParameters": {
"InvocationType": "REQUEST_RESPONSE"
}
},
)
print(response["Arn"])
FilterCriteria の Pattern は文字列として渡す必要があるので、json.dumps() で変換しています。ここを辞書のまま渡すとエラーになります(一敗)。
また、InvocationType は REQUEST_RESPONSE(同期)と FIRE_AND_FORGET(非同期)が選べます。Lambda 関数をターゲットにすると、デフォルトでは同期的に呼び出されます。非同期にしたい場合は invocationType を FIRE_AND_FORGET に設定します。処理に時間がかかる場合は非同期のほうが安全かもしれないですね。
Lambda 側で受け取るイベント構造
Pipes から Lambda に渡ってくるイベントはリスト形式になっているようです。SQS ソースの場合、各要素が SQS メッセージそのもののように見えます。
def lambda_handler(event, context):
for record in event:
body = json.loads(record["body"])
print(f"status: {body['status']}, order_id: {body['order_id']}")
return {"statusCode": 200}
通常の SQS → Lambda のイベント構造(event["Records"] を使うやつ)と微妙に違うようなので注意です。Pipes 経由だとラッパーが変わるように見えます。
パターンB:DynamoDB Streams → Pipes → SQS(INSERT だけ流す)
DynamoDB Streams をソースにする場合の設定
EventBridge Pipes を使うと DynamoDB ストリームのレコードを受け取ることができます。その後、オプションとしてフィルタリングやエンリッチメントを行ってから、ターゲットに送信します。
デフォルトではレコードが利用可能になり次第 Pipes が呼び出されますが、バッチングウィンドウを設定することで少量レコードのたびに起動するのを避けられます。ウィンドウが満杯になるか、期限切れになるか、ペイロードが 6 MB に達するまでレコードを読み続けるようです。
また、Pipes は DynamoDB から SQS への配信を少なくとも1回行う、とされています。レコードがドロップされないようにするため、DynamoDB ストリームの保持期間より短い最大エージのリトライポリシーを設定することが推奨されているようです。「少なくとも1回」ということは冪等性の設計が必要ですね。
import boto3
import json
pipes = boto3.client("pipes", region_name="ap-northeast-1")
# DynamoDB Streams → SQS(INSERT イベントだけ通す)
response = pipes.create_pipe(
Name="dynamo-insert-pipe",
RoleArn="arn:aws:iam::123456789012:role/EventBridgePipesRole",
Source="arn:aws:dynamodb:ap-northeast-1:123456789012:table/Orders/stream/2025-01-01T00:00:00.000",
SourceParameters={
"DynamoDBStreamParameters": {
"StartingPosition": "LATEST",
"BatchSize": 100,
"MaximumBatchingWindowInSeconds": 30,
"OnPartialBatchItemFailure": "AUTOMATIC_BISECT",
},
"FilterCriteria": {
"Filters": [
{
"Pattern": json.dumps({
"eventName": ["INSERT"] # INSERT のみ通過させる
})
}
]
},
},
Target="arn:aws:sqs:ap-northeast-1:123456789012:order-created-queue",
TargetParameters={
"SqsQueueParameters": {
"MessageGroupId": "$.dynamodb.NewImage.category.S"
}
},
)
print(response["Arn"])
OnPartialBatchItemFailure: "AUTOMATIC_BISECT" を設定しておくと、バッチの一部が失敗した場合に自動で二分探索してエラーレコードを特定してくれます。地味に便利。
ターゲット側の MessageGroupId で使っている $.dynamodb.NewImage.category.S は JSONPath 形式でイベントの値を参照しています。これを使うと FIFO キューのグループ ID を動的に決定できます。
フィルタリングパターンの書き方まとめ
EventBridge のフィルタパターンはイベントバスのルールとほぼ同じ書き方ですが、ソースによって参照できるフィールドが違います。
DynamoDB Streams のフィルタ例
# INSERT と MODIFY だけ通す
{
"eventName": ["INSERT", "MODIFY"]
}
# 特定フィールドが存在するレコードだけ通す
{
"eventName": ["INSERT"],
"dynamodb": {
"NewImage": {
"status": {
"S": ["PENDING"]
}
}
}
}
SQS のフィルタ例
# body の JSON フィールドに対してフィルタ
{
"body": {
"event_type": ["order_created"],
"amount": [{ "numeric": [">", 1000] }]
}
}
# 数値比較もできる(numeric 演算子)
{
"body": {
"retry_count": [{ "numeric": ["<=", 3] }]
}
}
数値比較の numeric 演算子は使う機会が多いのでぜひ。複雑な条件分岐が必要な場合は Lambda エンリッチメントに任せたほうがいいかもしれませんが、シンプルな条件ならフィルタだけで完結させるのが Pipes らしい使い方だと思います。
エンリッチメント:Lambda でデータを補完してからターゲットへ
フィルタを通過したイベントをそのままターゲットに流すだけでなく、途中で Lambda を挟んでデータを加工することもできます。これが「エンリッチメント」です。
エンリッチメントを使うと、イベントに不足している情報を追加してからターゲットに送ることができます。従来は EventBridge でこれをやろうとすると Lambda 関数を書いてメンテナンスし続ける必要がありましたが、Pipes ではそれをマネージドな形で組み込めます。
# エンリッチメント Lambda を追加する場合
response = pipes.create_pipe(
Name="enriched-order-pipe",
RoleArn="arn:aws:iam::123456789012:role/EventBridgePipesRole",
Source="arn:aws:sqs:ap-northeast-1:123456789012:order-input-queue",
SourceParameters={
"SqsQueueParameters": {"BatchSize": 1},
"FilterCriteria": {
"Filters": [
{"Pattern": json.dumps({"body": {"status": ["NEW"]}})}
]
},
},
Enrichment="arn:aws:lambda:ap-northeast-1:123456789012:function:enrich-order",
EnrichmentParameters={
"InputTemplate": '{"order_id": <$.body.order_id>, "source": "pipe"}'
},
Target="arn:aws:sqs:ap-northeast-1:123456789012:order-enriched-queue",
)
EnrichmentParameters の InputTemplate で JSONPath を使ってエンリッチメント Lambda に渡す入力を成形できます。エンリッチメント関数の出力はそのままパイプのターゲットに渡されます。つまりエンリッチメント Lambda の戻り値がターゲットへの入力になる、という流れです。
正直、InputTemplate の JSONPath 記法はちょっとクセがあって、文字列フィールドはクォートを付けて "<$.body.field>"、オブジェクトや数値はクォートなしで <$.body.amount> と書き分ける必要があるようです。EventBridge は実行時に input transformer を置き換えて有効な JSON を出力します。JSON パス変数を参照する場合はクォートで囲み、JSON オブジェクトや配列を参照する変数にはクォートをつけないようにしてください。ここを間違えると変な JSON が生成されてハマります。
IAM ロールで必要な権限
Pipes を動かすには専用の IAM ロールが必要です。最低限これだけは押さえておくリストとして:
- SQS ソース:
sqs:ReceiveMessage,sqs:DeleteMessage,sqs:GetQueueAttributes - DynamoDB Streams ソース:
dynamodb:GetRecords,dynamodb:GetShardIterator,dynamodb:DescribeStream,dynamodb:ListStreams - Lambda エンリッチメント/ターゲット:
lambda:InvokeFunction - SQS ターゲット:
sqs:SendMessage
信頼ポリシーの Principal は pipes.amazonaws.com です。ここを events.amazonaws.com にしてしまうと動かないので注意(これも一敗です)。
※この記事にはプロモーションが含まれます
ちなみに、お名前.com レンタルサーバー(WordPressに特化した高速レンタルサーバー。月額990円〜、独自ドメイン実質0円)も気になっています。お名前.com レンタルサーバー![]()
まとめ
EventBridge Pipes は「単純なフィルタリングとルーティングだけやりたい」という場面で特に活躍します。複雑な分岐や複数ターゲットへのファンアウトが必要な場合は素直にイベントバスを使ったほうがシンプルです。
DynamoDB Streams は1シャードあたり同時消費者が最大2つという制限があります。複数コンシューマーが必要なケースや、変更イベントを処理前にエンリッチしたい場合、より高度なフィルタリングやルーティングが必要な場合に EventBridge との組み合わせが有効です。
次回は実装パターンをもう少し深掘りして、エラーハンドリングやモニタリングの話に進む予定です。

