AWSでちょっとしたイベント処理を作るとき、「SQSのメッセージをLambdaで受け取って、中身を整形して別のサービスに渡す」みたいなコードを何度書いたことか。それ自体は別に難しくないんですが、同じようなグルーコードを量産しているな〜という感覚がずっとあって。そんなときに知ったのが EventBridge Pipes でした。
このシリーズでは、EventBridge PipesをPythonから使いこなすことを目標に、全4回で進めていきます。第1回の今回は「Pipesってそもそも何なの?」という基本構造の整理と、Pythonで実際にパイプを作るところまで。第2回以降でフィルタリングの深掘り、Enrichment(エンリッチメント)との組み合わせ、実践的なアーキテクチャパターンと続く予定です。
この記事でわかること
- EventBridge Pipesの概要と4ステップの基本構造
- EventBridgeルールとPipesの使い分け
- boto3でSQS→Lambdaパイプを作る最小実装
- FilterCriteria・Input Transformerの基本的な書き方
- パイプの状態確認・停止・削除の方法
EventBridge Pipesとは
EventBridge Pipesはソースとターゲットを接続するサービスで、ポイント・ツー・ポイントの統合を意図して設計されており、フィルタリングや入力変換(transform)、エンリッチメントもサポートしています。一言でいうと「AWSサービス間のデータ受け渡しを、Lambdaを挟まず(または最小限にして)つなぐコンベア」みたいなものです。
EventBridge Pipesは 2022年12月に一般提供(GA) されています(re:Invent 2022の発表枠のひとつでもありました)。
EventBridge ルールとPipesの使い分け
EventBridgeのイベントバスは多対多のルーティングに向いています。一方、Pipesはポイント・ツー・ポイント、つまり1つのソースから1つのターゲットへの接続に適しています。
簡単に整理するとこんな感じです。
- EventBridge ルール(イベントバス):イベントを複数のターゲットに配信したい。ファンアウト構成。
- EventBridge Pipes:特定のソースから特定のターゲットへ。途中でフィルタしたり、変換(InputTemplate)やデータ補完(Enrichment)を挟みたい。
どちらを使うか迷ったら「1対1か、1対多か」で判断するのが一番シンプルかなと思います。
Pipesの基本構造:4つのステップ
パイプのセットアップは、ソースの選択 → オプションのフィルタリング → オプションのエンリッチメントの定義 → ターゲットの指定という流れで行います。
図にすると次のようなイメージです。
[Source] → [Filter(任意)] → [Enrichment(任意)] → [Target]
各ステップを見ていきます。
① Source(ソース)
対応しているソースは SQS、Kinesis Data Streams、DynamoDB Streams、Kafka(Amazon MSK / セルフマネージド)、Amazon MQ(ActiveMQ / RabbitMQ) などです。ポーリング型のソースが中心で、EventBridgeがバックグラウンドで自動的にポーリングしてくれます。
② Filter(フィルタリング)
フィルタリングを設定すると、条件に一致したイベントだけが処理されます。Pipesの課金は「フィルタに一致して通過した分」が対象なので、フィルタで落とせた分はコスト抑制になります。
そういえば最近、何でもかんでも「とりあえず全部流してからLambdaでif文」みたいな雑設計をしがちなので、こういう”入口で絞れる”仕組みはありがたいです(自戒)。
③ Enrichment(エンリッチメント)
エンリッチメントステップでは、ソースから受け取ったデータをターゲットに送る前に補完できます。たとえば、チケットIDしか含まないイベントを受け取ったとき、LambdaでチケットAPIを呼び出してフル情報を付け加えるといった使い方ができます。
従来はこういう「データ補完のためだけのLambda」を書いていたわけですが、Pipesに乗せることで処理の流れが一箇所にまとまります。
④ Target(ターゲット)
Lambda、SQS、SNS、Step Functions、EventBridgeイベントバス、API Gateway などがターゲットになれます。
ターゲットの呼び出しは 同期(REQUEST_RESPONSE) と 非同期(FIRE_AND_FORGET) の2種類があります。選択できる呼び出し方式はターゲットの種別やソースの設定にも依存するので、詳細は公式ドキュメントの「Invocation type」周りを確認するのが確実です。
実際にPythonでパイプを作ってみる
余談ですが、Pipesはコンソールからも作れてGUIがかなり直感的なので、最初はコンソールで構造を確認してからコードに落とす、という流れがおすすめです。
ここではSQSをソース、Lambdaをターゲットにした最小構成のパイプをboto3で作ります。
前提リソース
以下のリソースは作成済みの想定です。
- SQSキュー(ソース用)
- Lambda関数(ターゲット用)
- EventBridge Pipesに付与するIAMロール
IAMロールについて(重要)
PipesはSQSからのメッセージ読み取りとLambdaの起動、両方の権限が必要です。最低限必要なポリシーは以下のとおりです。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:ap-northeast-1:ACCOUNT_ID:your-source-queue"
},
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT_ID:function:your-target-function"
}
]
}
信頼ポリシーの Principal は pipes.amazonaws.com にします。ここを忘れると「ロールが使えない」エラーで詰まります(詰まりました)。
パイプ作成のPythonコード
SQSをソース、Lambdaをターゲットとした最小構成です。FilterCriteria で event_type: order_created のメッセージだけを通すフィルタも合わせて設定しています。
import boto3
import json
pipes_client = boto3.client("pipes", region_name="ap-northeast-1")
SOURCE_ARN = "arn:aws:sqs:ap-northeast-1:ACCOUNT_ID:your-source-queue"
TARGET_ARN = "arn:aws:lambda:ap-northeast-1:ACCOUNT_ID:function:your-target-function"
ROLE_ARN = "arn:aws:iam::ACCOUNT_ID:role/your-pipes-role"
response = pipes_client.create_pipe(
Name="my-first-pipe",
RoleArn=ROLE_ARN,
Source=SOURCE_ARN,
SourceParameters={
"SqsQueueParameters": {
"BatchSize": 1,
"MaximumBatchingWindowInSeconds": 0,
},
# フィルタ:bodyに "event_type": "order_created" が含まれるメッセージだけ通す
"FilterCriteria": {
"Filters": [
{
"Pattern": json.dumps({
"body": {
"event_type": ["order_created"]
}
})
}
]
},
},
Target=TARGET_ARN,
TargetParameters={
"LambdaFunctionParameters": {
"InvocationType": "REQUEST_RESPONSE", # 同期呼び出し
}
},
Description="SQS -> Lambda の最小構成パイプ",
)
print(response["Arn"])
FilterCriteria の Pattern はboto3的にはstringとして渡す仕様なので、json.dumps() を挟んでJSON文字列化しておくのが安全です。
ターゲット側のLambdaで受け取るイベント
LambdaがPipesから受け取るイベントは(バッチの場合)JSON配列になります。SQSのバッチ処理に近い構造です。
import json
def lambda_handler(event, context):
for record in event:
body = record.get("body", "{}")
data = json.loads(body)
print(f"受け取ったイベント: {data}")
return {"statusCode": 200}
Enrichment関数の場合も、第一引数 event にソース(またはフィルタ後の)ペイロードが渡されます。SQSがソースでメッセージを送った場合、Lambdaが受け取るのはそのSQSメッセージの配列になります(バッチ設定次第で要素数が変わります)。
Input Transformerで整形する
ターゲットに渡す前にデータの形を変えたい場合は InputTemplate を使います。JSONパスで元データから値を取り出し、好きな形に組み替えられます。
response = pipes_client.create_pipe(
Name="my-pipe-with-transformer",
RoleArn=ROLE_ARN,
Source=SOURCE_ARN,
Target=TARGET_ARN,
TargetParameters={
"LambdaFunctionParameters": {
"InvocationType": "REQUEST_RESPONSE",
},
"InputTemplate": json.dumps({
"order_id": "<$.body.order_id>",
"user_id": "<$.body.user_id>",
"pipe_arn": "<aws.pipes.pipe-arn>", # 予約変数
}),
},
)
Input Templateでは、JSONパスで参照した変数に加えて、aws.pipes.pipe-arn などいくつかの予約変数を使うこともできます。デバッグ時にどのパイプから来たか追えるので、予約変数は入れておくと便利です。
ただ、JSON文字列型の変数を参照するときはクォートが必要で、オブジェクトや配列型のときはクォート不要といった細かいルールがあります。正直ここは実際に動かしながら確認するのが早いです。
パイプの状態確認・停止・削除
作ったパイプの管理もboto3でできます。よく使うやつだけ。
import boto3
pipes = boto3.client("pipes", region_name="ap-northeast-1")
# 状態確認
detail = pipes.describe_pipe(Name="my-first-pipe")
print(detail["CurrentState"]) # RUNNING / STOPPED / CREATING ...
# 停止
pipes.stop_pipe(Name="my-first-pipe")
# 削除
pipes.delete_pipe(Name="my-first-pipe")
パイプの状態遷移は CREATING → RUNNING が基本で、停止すると STOPPED になります。CREATING のままずっと進まないときはIAMロールの権限を疑うのがだいたい正解です。
第1回のまとめ
今回はEventBridge Pipesの4ステップ構造(Source → Filter → Enrichment → Target)を整理して、SQS→Lambdaの最小構成をboto3で作るところまで試してみました。
- EventBridge PipesはAWSサービス間のポイント・ツー・ポイント接続をシンプルに実現するサービス
- 基本構造は Source → Filter(任意)→ Enrichment(任意)→ Target の4ステップ
- フィルタは「入口で絞る」設計ができるのでコスト面でも有効
FilterCriteriaのPatternはJSON文字列として渡す(json.dumps()必須)- IAMロールの信頼ポリシーの
Principalにpipes.amazonaws.comを忘れずに
Lambdaで書いていたグルーコードをかなり減らせる可能性があることは伝わったかなと思います。第2回ではフィルタリングの条件指定をもう少し深掘りしていく予定です。
📚 シリーズ「AWS EventBridge × Python イベント駆動設計入門」(第1回 / 全4回)
→ 次回の記事: 【第2回】AWS EventBridge × Python イベント駆動設計入門 — イベントバスとルールの設計パターン、Pythonでのイベント送受信実装

