概要

SQSについて資格試験などでよく出てくるものの実際に触ったことがなかったので触る。

構成

最もシンプルな構成で作成する。

SQSを作成する

  • デフォルトの構成で実施する
    • 可視性タイムアウト
      • 受信後にコンシューマから見えなくする時間
    • メッセージ保持期間
      • キューにメッセージが残っている時間
    • 配信遅延
      • 初回配信時に遅延させる時間
    • メッセージ受信待機時間
      • ポーリングが待機する期間をキュー側で指定できる
    • アクセスポリシーもデフォルトのままだとアカウント内すべてとなる

Producerを作成する

Python + boto3を使用してエンキューするだけのスクリプトを作成する。

import json
import boto3

sqs = boto3.resource('sqs')

def lambda_handler(event, context):
    queue = sqs.get_queue_by_name(QueueName='test-sqs')
    response = queue.send_message(MessageBody='hello sqs')
    return response

Lambda関数のアクセスポリシーを指定しなければSQSの検出エンキューができない。

{
    "Effect": "AllowLambdaSQSSendMessageRole",
    "Action": [
        "sqs:SendMessage",
        "sqs:GetQueueUrl"
    ],
    "Resource": "arn:aws:sqs:ap-northeast-1:000000000000:test-sqs"
}

Consumerを作成する

キューに作成したメッセージを処理してメッセージを削除する。

import json
import boto3

# SQSリソースオブジェクトを作成
sqs = boto3.resource('sqs')

# キューの名前
QUEUE_NAME = 'test-sqs'

# キューオブジェクトを取得
queue = sqs.get_queue_by_name(QueueName=QUEUE_NAME)

def lambda_handler(event, context):
    try:
        # メッセージを受信
        messages = queue.receive_messages(
            MaxNumberOfMessages=10,  # 最大10件のメッセージを受信
            WaitTimeSeconds=10,      # ロングポーリング
            VisibilityTimeout=30     # メッセージの可視性タイムアウト
        )
        
        # メッセージがない場合の処理
        if not messages:
            print("データがありません。")
            return {
                'statusCode': 200,
                'body': json.dumps('データがありません。')
            }
        
        # メッセージを処理
        for message in messages:
            print("メッセージ: ", message.body)

            # メッセージの削除
            message.delete()
            print("メッセージ削除")

        return {
            'statusCode': 200,
            'body': json.dumps('メッセージを処理し削除しました。')
        }

    except Exception as e:
        print(f"エラーが発生しました: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps(f"エラーが発生しました: {str(e)}")
        }

Lambda関数のアクセスポリシーを指定する。

{
    "Sid": "AWSLambdaSQSPollerExecutionRole",
    "Effect": "Allow",
    "Action": [
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ReceiveMessage",
        "sqs:GetQueueUrl"
    ],
    "Resource": "arn:aws:sqs:ap-northeast-1:000000000000:test-sqs"
}

トリガー設定

作成したキューのLambdaトリガーにConsumerを追加する。

動作確認

Producerをテスト実行してConsumerが実行されることを確認する。

{
  "MD5OfMessageBody": "3b7bef57d06c0021d0aafe8f6d587241",
  "MessageId": "05ad62e4-a413-4a82-a38e-93d842379165",
  "ResponseMetadata": {
    "RequestId": "6a258866-f8cc-5540-b970-d7feb3c47984",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "x-amzn-requestid": "6a258866-f8cc-5540-b970-d7feb3c47984",
      "date": "Sun, 27 Oct 2024 14:06:00 GMT",
      "content-type": "application/x-amz-json-1.0",
      "content-length": "106",
      "connection": "keep-alive"
    },
    "RetryAttempts": 0
  }
}

Consumer側のログで完了を確認する。

感想

デフォルト設定での構築は簡単ではあるが、プロダクション環境で構築するとなるとパラメータを適切に設定し、しっかりした検証が必要になりそう。