【第4回】AWS EventBridge × Python イベント駆動設計入門 — 本番運用を見据えたアーキテクチャ設計と、よくあるハマりどころ総まとめ

AWS

前回は EventBridge Pipes の基本構成(SQS → フィルタ → Enrichment Lambda → ターゲット)を動かすところまでやりました。今回はそれを「実際に本番で使う」前提で、監視・エラーハンドリング・設計上の注意点をまとめていきます。

正直、第1回〜第3回は「動かす」ことに集中していたので、本番運用の話をほとんど触れられていなかったんですよね。そこが一番大事なのに。というわけで今回はそこをちゃんと書きます。

この記事でわかること

  • EventBridge Pipes の CloudWatch メトリクスと監視の設定方法
  • SQS ソース側とターゲット側の DLQ 設定の違い
  • Enrichment Lambda でハマりやすい実装パターン
  • フィルタ設定で気をつけるべき点
  • 本番運用に向けたアーキテクチャ設計の考え方

EventBridge Pipes のおさらい:何が嬉しかったのか

改めて整理しておくと、EventBridge Pipes は「1つのソースから1つのターゲットへ、フィルタ・エンリッチメント・変換を挟みながらイベントを流す」サービスです。EventBus の多対多とは違って、ポイント・ツー・ポイントに特化しています。

従来だと SQS のポーリング → Lambda でフィルタ → 別の Lambda 呼び出し……という構成を自前で書いていたものが、Pipes で宣言的に定義できるようになりました。コードが減るのは素直に嬉しい。

ただ、その分「内部で何が起きているか見えにくくなる」という側面もあって、本番で使うとなると監視の設計がけっこう大事になります。

まず押さえたい:Pipes が出す CloudWatch メトリクス

EventBridge Pipes は標準でいくつかの CloudWatch メトリクスを出してくれます。最低限これだけは把握しておきたいです。

  • ExecutionStarted:パイプの実行が開始した回数
  • ExecutionThrottled:スロットリングが発生した回数
  • ExecutionFailed:実行が失敗した回数
  • ExecutionPartiallyFailed:バッチ処理で一部だけ失敗した回数

特に ExecutionFailedExecutionThrottled はアラームを張っておきたい筆頭です。スロットリングは「実行が何らかの理由で制限された」状態を示すので、高スループットが予想される場合は事前に Service Quotas を確認しておいたほうがいいです。

boto3 でアラームを設定するならこんな感じです:

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="ap-northeast-1")

def create_pipe_alarm(pipe_name: str, sns_topic_arn: str):
    # ExecutionFailed アラーム
    cloudwatch.put_metric_alarm(
        AlarmName=f"{pipe_name}-execution-failed",
        MetricName="ExecutionFailed",
        Namespace="AWS/EventBridgePipes",
        Dimensions=[{"Name": "PipeName", "Value": pipe_name}],
        Statistic="Sum",
        Period=300,
        EvaluationPeriods=1,
        Threshold=1,
        ComparisonOperator="GreaterThanOrEqualToThreshold",
        TreatMissingData="notBreaching",
        AlarmActions=[sns_topic_arn],
    )

    # ExecutionThrottled アラーム
    cloudwatch.put_metric_alarm(
        AlarmName=f"{pipe_name}-throttled",
        MetricName="ExecutionThrottled",
        Namespace="AWS/EventBridgePipes",
        Dimensions=[{"Name": "PipeName", "Value": pipe_name}],
        Statistic="Sum",
        Period=60,
        EvaluationPeriods=3,
        Threshold=5,
        ComparisonOperator="GreaterThanOrEqualToThreshold",
        TreatMissingData="notBreaching",
        AlarmActions=[sns_topic_arn],
    )

閾値はシステムによって変わりますが、ExecutionFailed は 1 回でもアラートが飛ぶようにしておくのが無難です。スロットリングは少し余裕を持たせています。

DLQ の設定とハマりやすいポイント

Pipes ではソース(SQS など)側と、ターゲット側の両方で DLQ(Dead Letter Queue)を考える必要があります。ここがちょっとわかりにくいです。

SQS ソースの DLQ

SQS をソースにしている場合、SQS キュー自体にリドライブポリシーを設定します。Pipes の処理に失敗してメッセージが返ってくると、SQS の maxReceiveCount を超えた段階でキューの DLQ に飛びます。これは Pipes というより SQS 側の仕組みです。

import boto3
import json

sqs = boto3.client("sqs", region_name="ap-northeast-1")

def create_queue_with_dlq(queue_name: str) -> dict:
    # DLQ を先に作る
    dlq_response = sqs.create_queue(QueueName=f"{queue_name}-dlq")
    dlq_url = dlq_response["QueueUrl"]
    dlq_attrs = sqs.get_queue_attributes(
        QueueUrl=dlq_url,
        AttributeNames=["QueueArn"]
    )
    dlq_arn = dlq_attrs["Attributes"]["QueueArn"]

    # メインキューにリドライブポリシーを設定
    main_response = sqs.create_queue(
        QueueName=queue_name,
        Attributes={
            "RedrivePolicy": json.dumps({
                "deadLetterTargetArn": dlq_arn,
                "maxReceiveCount": "3",  # 3回失敗で DLQ へ
            })
        }
    )
    return {"main": main_response["QueueUrl"], "dlq": dlq_url}

maxReceiveCount はケースによりますが、リトライのコストや冪等性の保証具合を考えて決めます。とりあえず 3〜5 くらいにしておくことが多いです。

ターゲット側の DLQ

Pipes 自体にも DLQ を設定できます(ただし設定方法や適用範囲はソース種別などの条件で変わるようです)。boto3 で Pipes を作るときに DeadLetterConfig を指定できます。ここを設定しておくと、ターゲット配信周りのトラブルシュートがしやすくなるので、本番用途だと検討したいです。

DLQ に溜まったメッセージを定期的に確認して再処理するスクリプトを用意しておくと、障害対応が格段に楽になります。本番前に作っておくのをおすすめします。

Enrichment Lambda の設計で気をつけること

Enrichment(エンリッチメント)は Pipes の中で一番コードを書く場所なので、ここの品質が全体の安定性に直結します。

入力形式を正確に把握する

SQS をソースにした場合、Enrichment Lambda に渡ってくるイベントは SQS のレコードリスト形式になります。うっかり EventBus からのイベント形式と混同してしまいがちです。

import json
import boto3
from typing import Any

dynamodb = boto3.resource("dynamodb", region_name="ap-northeast-1")
table = dynamodb.Table("user-profiles")

def enrich_handler(event: list[dict], context: Any) -> list[dict]:
    enriched = []

    for record in event:
        # SQS ソースの場合 body は文字列なので json.loads が必要
        body = json.loads(record["body"]) if isinstance(record.get("body"), str) else record.get("body", {})

        user_id = body.get("user_id")
        if not user_id:
            enriched.append(body)
            continue

        resp = table.get_item(Key={"user_id": user_id})
        profile = resp.get("Item", {})

        enriched.append({
            **body,
            "user_name": profile.get("name", "unknown"),
            "user_tier": profile.get("tier", "free"),
        })

    return enriched

返り値はリスト形式にする必要があります。ここ注意。単一オブジェクトを返すと後続のターゲットに渡す際に型エラーになります(自分は一回ここでハマりました)。

エラーが起きたときの挙動

Enrichment Lambda が例外を投げると、Pipes はそのバッチ全体を失敗扱いにします。部分的な成功(一部レコードだけ通す)はできないので、Lambda 内で個別にエラーハンドリングして、ダメなレコードはデフォルト値で流すか、明示的に除外するかを決めておく必要があります。

個人的には、エンリッチメントで例外を出すより「フォールバック値を入れて流す」ほうが安全だと思っています。問題のあるデータはターゲット側でハンドリングしてもらう設計です。

Pipes のフィルタ設計:何を通して何を止めるか

フィルタは Pipes の便利機能のひとつですが、設定ミスが起きやすい場所でもあります。特に以下の点は要確認です。

  • SQS メッセージの body に対してフィルタをかける場合、body が文字列(JSON エンコードされた文字列)なのか、オブジェクトなのかを意識する
  • フィルタに一致しなかったメッセージはターゲットには届かないが、SQS からは削除される(ちゃんと消費される)
  • フィルタの条件が意図と逆になっていても、エラーは出ずにメッセージが消える

最後の点が地味に怖くて、「なぜかターゲットにイベントが届いていない」という症状が出たときにフィルタの設定ミスだったというのはよくある話です。デプロイ後は意図したメッセージが実際にターゲットまで届くかを手動で確認するフローを入れておきたいところです。

本番アーキテクチャとして整理してみる

最終的に「本番に出せる Pipes 構成」として意識していることをまとめると、こんな感じです。

  • SQS ソース:DLQ 設定必須。maxReceiveCount は 3〜5
  • フィルタ:設定後は意図したイベントが通るか必ずテスト
  • Enrichment Lambda:例外を素通しさせない。フォールバック値で流す設計
  • ターゲット:DLQ or エラーハンドリングを忘れずに
  • CloudWatch アラームExecutionFailedExecutionThrottled は最低限設定する
  • ログ:Pipes の実行ログは CloudWatch Logs に出力できる。TRACE レベルにしておくとデバッグが楽

Pipes の実行ログは、マネジメントコンソール上の Pipes の設定画面から Logging を有効にすることで CloudWatch Logs に出力されます。ログレベルを TRACE にするとフィルタ・エンリッチメント・ターゲットそれぞれのステップで何が起きたかが全部記録されるので、開発中はこれにしておくといいです。本番では ERROR にしてコストを抑えるのが現実的かなと思います(なおログ出力自体のデフォルトは OFF で、コンソールでは CloudWatch Logs と ERROR がデフォルト選択になっています)。

シリーズを振り返って:Pipes を使う・使わないの判断軸

4回書いてきて、正直「Pipes 万能じゃないな」という感想も出てきました。向いているケースと向いていないケースを整理しておきます。

向いているケース:

  • SQS / DynamoDB Streams / Kinesis などのポーリングソースを 1 つのターゲットに繋ぎたい
  • 軽いフィルタや DynamoDB 参照程度のエンリッチメントで済む
  • インフラをコードで宣言的に管理したい(CDK / Terraform との相性が良い)

向いていないケース:

  • 1つのソースから複数のターゲットに振り分けたい(EventBus のほうが向いている)
  • エンリッチメントのロジックが複雑で、Step Functions レベルのオーケストレーションが必要
  • 処理の透明性が強く求められる(Pipes はブラックボックスに感じる場面がある)

Pipes は「ちょうどいい抽象化」を提供してくれるサービスだと思っています。ただ、ちょうどいいを超えた複雑さになってくると、素の Lambda + SQS のほうがデバッグしやすかったりします。まだ自分も全部わかっているわけではないので、もっと実際に使いながら判断軸を磨いていきたいところです。

※この記事にはプロモーションが含まれます

ちなみに、お名前.com レンタルサーバー(WordPressに特化した高速レンタルサーバー。月額990円〜、独自ドメイン実質0円)も気になっています。お名前.com レンタルサーバー

まとめ

EventBridge Pipes を本番運用する際に抑えておきたい要点をまとめました:

  • CloudWatch メトリクスの ExecutionFailedExecutionThrottled は必ず監視する
  • SQS ソースとターゲット側の DLQ 設定は役割が異なるので注意
  • Enrichment Lambda では例外を出さず、フォールバック値で対応するのが安全
  • フィルタの設定ミスは無言でメッセージを落とすので、デプロイ後の動作確認が重要
  • Pipes は単一ソース→単一ターゲットの単純な構成で活躍するサービス。複雑になったら別の選択肢を検討する

本番運用の経験を積んでいく中で、これ以外の引っかかりポイントが出てくるかもしれません。その時はまたどこかで書けたらなと思います。

📚 シリーズ「AWS EventBridge × Python イベント駆動設計入門」(第4回 / 全4回)

← 前回の記事: 前回の記事はこちら

🎉 このシリーズは今回で完結です!

参考になったらクリックしてもらえると嬉しいです!

Blogmura CloudAWS Ranking
タイトルとURLをコピーしました