SQSのLambdaトリガーによる非同期処理の構築方法【ハンズオン】

目次

この記事の内容

本記事では、SQSのメッセージ受信をトリガーにしてLambdaを起動するための構成・構築手順を解説します。

本記事で分かること
  • 本構成の概要(処理フロー、メリット・デメリット、採用ケース)
  • 構築手順

解説する構成

本記事では、LambdaがSQSをポーリングしてメッセージの処理を行う構成を解説します。

解説対象の構成

構成の解説

上記構成について、以下の観点で解説します。

  • 処理フロー
  • メリット・デメリット
  • 利用シーン

処理フロー

LambdaがSQSをポーリングしてから、処理が終了するまでの流れを解説します。

処理フロー図
STEP
Lambda関数のポーリング

Lambda関数がSQSキューに接続されると、Lambdaは自動的にポーリングを行います。ポーリングの頻度はLambdaで管理し、メッセージが到着するたびにリアルタイムで確認されます。

【ポーリングとは?】

キューに対して定期的に問い合わせを行い、新しいメッセージが存在するかを確認することです。

STEP
メッセージの投入

送信側のアプリケーションがキューに対してメッセージを送信すると、そのメッセージはSQSキュー内に保存されます。

STEP
メッセージの受信 & Lambdaの処理実行

LambdaはSQSキューから取得したメッセージをイベントデータとして受け取り、そのメッセージを処理します。

STEP
メッセージの削除 ※Lambdaの処理が成功した場合

Lambdaが正常にメッセージを処理すると、そのメッセージは自動的にSQSキューから削除されます。このメッセージ削除は、Lambdaの処理が正常終了しない限り行われません。

STEP
デッドレターキューへのメッセージ送信 ※Lambdaの処理が失敗した場合

Lambdaが処理に失敗場合、メッセージはSQSキューに残ったままになります。このメッセージは再度処理されるまで「可視性タイムアウト」に入り、その間はLambdaによって処理されることはありません。

指定したリトライ回数を超えると、メッセージはデッドレターキューに送信され、以降は処理されません。デッドレターキューを設定していない場合は、メッセージはSQSキューに残り続けます。

【可視性タイムアウトとは?】

メッセージが受信された後に、他のコンシューマ(処理側)がそのメッセージを取得できない期間。

【デッドレターキューとは?】

処理に失敗したメッセージを一時的に保存するためのキュー。

※SQSで登場する用語については、以下の記事で詳しく解説しています。

メリット・デメリット

続いて、本構成におけるメリット・デメリットを解説します。

メリット
  • 非同期処理による処理の効率化
    • SQSはメッセージを一時的にキューに蓄積できるため、メッセージの送信側と受信側を独立させることができます。これにより、複数のタスクを並列で処理できます。
  • 自動スケーリング
    • LambdaはSQSのメッセージ数に応じて自動的にインスタンスを増減させることで、システム不可に対して柔軟に対応できます。
  • 運用の簡素化
    • SQS, LambdaはともにAWSのマネージドサービスのため、インフラやサーバを管理する必要がありません。
デメリット
  • リアルタイム処理には不向き
    • Lambdaは一定間隔でSQSキューをポーリングするため、メッセージの受信後すぐに処理を開始できないケースがあります。
    • Lambdaはコールドスタートすることがあるので、関数の起動に時間がかかるケースがあります。
  • メッセージ重複処理の可能性
    • SQSは「少なくとも1回配信」を保証するため、同じメッセージが複数回Lambdaに配信される可能性があります。
  • 実行時間の制限
    • Lambdaは最大15分までしか処理を実行できません。

本構成が適しているケース

上記のメリット・デメリットを踏まえると、本構成が適している利用シーンは以下の通りです。

本構成が適しているケース
  • 非同期処理が必要な場合
    • 大量のリクエストを処理するシステムにおいて、即時応答が不要なタスクを非同期で処理することで、全体の応答時間を短縮できます。
  • スケーラビリティが求められる場合
    • 突発的なトラフィックの増加に対応するため、SQSがメッセージをキューイングし、Lambdaが自動的にスケールして処理を行うことで、負荷を分散できます。
  • 耐障害性が求められるシステム
    • SQSを介してメッセージを一時的に保存し、Lambdaがそれらを処理することで、システムの一部が障害を起こしても全体の処理が滞ることなく続行できます。
  • 短時間で完了する処理
    • Lambda の実行時間には最大15分の制限があります。そのため、1回のメッセージ処理が短い処理(例えば数秒~数分で完了するタスク)に適しています。
    • 長時間の処理が必要な場合、Lambda よりも別のアプローチ(例:EC2 や ECS を使用したワークロード)が適している場合があります。

Lambdaの特徴(メリット・デメリット)は以下の記事で解説しているので、この機に抑えておきましょう。

構築手順

ここからは、SQSキューが受信したメッセージをLambda関数が処理する構成を構築します。
なおLambda関数は、メッセージ本文をCloudWatch Logsに書き込む処理を行います。

構築対象の構成


以下のステップに分けて、構築手順を解説していきます。

①:ロールの作成

SQSへのアクションを許可するロールを作成します。ここで作成したロールはLambdaにアタッチします。

STEP
ロールの作成画面に遷移

IAMのコンソール画面から、「ロール(左側のメニュー)」>「ロールを作成」の順に選択します。

STEP
【設定①】信頼されたエンティティを選択

以下の値を設定して「次へ」を選択します。

項目設定値
信頼されたエンティティタイプAWSのサービス
ユースケースLambda
STEP
【設定②】許可を追加

「AWSLambdaSQSQueueExecutionRole」を選択して、「次へ」を選択します。

STEP
【設定③】名前、確認、および作成

任意の名前を設定して、「ロールを作成」を選択します。

②:Lambda関数の作成

SQSが受信したメッセージを処理するLambda関数を作成します。この関数を実行すると、SQSのメッセージ本文をCloudWatch Logsに出力します。

ここではPythonを採用しますが、その他の言語で作成したい方はAWS公式サイトをご参照ください。

STEP
関数の作成画面に遷移

Lambdaのコンソール画面に遷移し、「関数の作成」を選択します。

STEP
【設定】関数の作成

以下の値を設定して、「関数の作成」を選択します。

項目設定値
関数名任意の値
ランタイムPython 3.12
アーキテクチャx86_64
デフォルトの実行ロールの変更既存のロールを使用する
既存のロール「①:ロールの作成」で作成したロール
STEP
ソースコードの編集

「コード」タブを選択し、「lambda_function.py」を以下のソースコードに置換して「Deploy」を選択します。

def lambda_handler(event, context):
    for message in event['Records']:
        process_message(message)
    print("done")

def process_message(message):
    try:
        print(f"Processed message {message['body']}")
    except Exception as err:
        print("An error occurred")
        raise err

【ソースコードの解説】

  • 正常時
    • SQSから受信したメッセージの本文をCloudWatch Logsに出力した後に「done」という文字列をログ出力します。
  • 例外発生時
    • 「An error occurred」という文字列をログ出力します。
STEP
Lambda関数の動作確認
  1. 「テスト」タブを選択して、「イベント JSON」に以下のJSONコードを貼付して「テスト」を選択します。
{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "test",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "098f6bcd4621d373cade4e832627b4f6",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-1:111122223333:my-queue",
            "awsRegion": "us-east-1"
        }
    ]
}
  1. 「モニタリング」タブを選択し、「CloudWatch ログを表示」を選択します。
  1. 最新のログストリームを選択し、そこに以下情報がログ出力されていれば動作確認は完了です。
    • Processed message {テスト時のイベント名}
    • done

③:SQSキューの作成

Lambda関数がイベントソースとして使用するSQSキューを作成します。

STEP
SQSキューの作成画面に遷移

SQSのコンソール画面に遷移し、「キューを作成」を選択します。

STEP
【設定】キューを作成

任意の名前を設定し、それ以外の項目はデフォルト設定のままで「キューを作成」を選択します。

参考記事

SQSにおける各設定項目の詳細については、以下の記事で詳しく解説しています。

④:Lambda トリガーの設定

受信メッセージがLambda関数をトリガーするように設定します。

STEP
Lambda トリガーの設定画面に遷移

作成したSQSキューの詳細画面の「Lambda トリガー」タブを選択し、「Lambda 関数トリガーを設定」を選択します。

STEP
【設定】AWS Lambda関数をトリガー

「②:Lambda関数の作成」で作成したLambda関数を選択し、「保存」を選択します。

動作確認

SQSにメッセージを送信し、Lambda関数がそのメッセージを適切に処理するか(=CloudWatch Logsにログ出力されるか)を確認します。

STEP
メッセージの送信画面に遷移

作成したSQSキュー画面の「メッセージを送受信」を選択します。

STEP
【設定】メッセージを送受信

以下項目を設定して、「メッセージを送信」を選択します。

項目設定値
メッセージ本文任意の文字列
上記以外デフォルト値のまま
STEP
ログ確認

CloudWatch Logsから該当のロググループ・ログストリームを選択し、以下内容のログが出力されていれば成功です。

  • Processed message {メッセージ送信時の「メッセージ本文」}

関連記事

本記事の解説は以上です。
ここからは、より知識を深めたい人向けに関連記事を紹介します。

本記事で登場したAWSサービス

本記事では「SQS」や「Lambda」といったAWSサービスが登場しました。
これらのサービスについては、構成要素や料金、設定項目等について詳しく解説しているので、もっとよく知りたい方は以下の記事をご覧ください。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

コメント

コメントする

目次