Terraform - AWS Lambda via SQS

While playing around with Terraform, I realized how hard it is to find a simple working example for spinning up a Lambda function triggered by SQS messages.

As a way to consolidate my learnings, I decided to write this small document.

You can find the source code for this post here.

Note that the design is intentionally simple. It doesn’t cover important bits you would want in a production context, like Terraform Cloud connected to a VCS; timeout and memory limits for the Lambda function; monitoring and alerting; or encryption of data at rest.

Architecture

The architecture I want to create with IaC (Infrastructure as Code) is super simple: a Lambda function triggered by SQS (Simple Queue Service) messages. The function logs the messages to CloudWatch Logs simply by writing their content to standard output.

[Architecture diagram: SQS queue → Lambda function → CloudWatch Logs]

Despite its simplicity, this kind of architecture is at the core of high-performance, large-scale systems. For instance, the amazing BinaryAlert released by Airbnb uses the same principle to scan every object uploaded to S3 with YARA.

Why SQS?

One question I had when I started experimenting with serverless computing on AWS was why it is so common to see solutions using SQS instead of SNS (Simple Notification Service), or instead of connecting Lambda directly to other event sources (like events generated by S3).

The answer really depends on what you are trying to achieve.

For a scenario in which we want to minimize the probability of losing messages, one nice feature of SQS is persistence: in a well-designed system, this allows easier debugging and recovery from outages and from bugs pushed to production.

Going into the details of these systems is beyond the scope of this post. Several resources explain the pros and cons of SQS vs SNS. Here are some links:


AWS user and permissions

First off, we need to create a service user for Terraform.

I am not going to explain this step in detail, but one way of doing it is via the IAM service in the AWS web console.

For our scenario, this user should have the following permissions:

  • AmazonSQSFullAccess
  • IAMFullAccess
  • CloudWatchLogsFullAccess
  • AmazonSNSFullAccess
  • AWSLambda_FullAccess

Note that these permissions are broader than what we actually need. In a hypothetical production environment, we would want to be more restrictive and grant only the actions we actually need Terraform to perform.
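
For completeness, here is a sketch of how the same user and policy attachments could be managed with Terraform itself; it would have to be applied with separate administrator credentials, and the user name terraform-service-user is just an assumption:

resource "aws_iam_user" "terraform_service_user" {
  name = "terraform-service-user" # assumption: pick any name you like
}

resource "aws_iam_user_policy_attachment" "terraform_service_user_policies" {
  # The AWS managed policies listed above.
  for_each = toset([
    "arn:aws:iam::aws:policy/AmazonSQSFullAccess",
    "arn:aws:iam::aws:policy/IAMFullAccess",
    "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess",
    "arn:aws:iam::aws:policy/AmazonSNSFullAccess",
    "arn:aws:iam::aws:policy/AWSLambda_FullAccess",
  ])

  user       = aws_iam_user.terraform_service_user.name
  policy_arn = each.value
}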

Terraform

With the following snippets, we build our infra piece by piece.

In the remainder of this post, I am going to explain how that code works.
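
Before diving in, note that the snippets reference a few input variables (name_prefix, sqs_retention_seconds, sqs_max_retry_count, sqs_batch_size, log_retention_days) and assume the AWS provider is configured. A minimal sketch of those definitions could look like the following; the defaults and the region are assumptions, the real values live in the linked repository:

provider "aws" {
  # Assumption: pick the region you want to deploy to.
  region = "eu-west-1"
}

variable "name_prefix" {
  description = "Prefix used to name the Lambda function and related resources"
  type        = string
  default     = "lambda-sqs-example"
}

variable "sqs_retention_seconds" {
  description = "How long messages are kept in the queues"
  type        = number
  default     = 86400 # 1 day
}

variable "sqs_max_retry_count" {
  description = "Receive attempts before a message is moved to the DLQ"
  type        = number
  default     = 3
}

variable "sqs_batch_size" {
  description = "Maximum number of messages delivered to the Lambda function in one batch"
  type        = number
  default     = 10
}

variable "log_retention_days" {
  description = "Retention period for the CloudWatch log group"
  type        = number
  default     = 14
}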

Lambda function

The lambda function we use in this example is a toy function. It will receive messages from SQS and log their content in CloudWatch.

The only “interesting” bit is that we need to handle multiple records per event, as our queue could deliver multiple messages in a single batch.

For simplicity, I am going to use Python and create a single file named lambda.py with our handler function.

#!/usr/bin/env python3

import json

print('Loading function test')


def lambda_handler(event, context) -> None:
    print(f"Received event: {json.dumps(event, indent=2)}")

    print(f"Received {len(event['Records'])} records in a single SQS event")
    for i, record in enumerate(event['Records']):
        print(f"{i:03} -> {record['body']}")

    return None

The AWS Lambda service expects us to deliver a zip file containing our source code. With Terraform, we can zip the content of a directory and use the resulting archive to define our serverless function.

data "archive_file" "lambda_function_zip" {
  type        = "zip"
  source_dir  = "${path.module}/lambda-python/"
  output_path = "${var.name_prefix}.zip"
}

resource "aws_lambda_function" "lambda_function" {
  filename         = "${var.name_prefix}.zip"
  function_name    = var.name_prefix
  role             = aws_iam_role.iam_for_lambda.arn
  handler          = "lambda.lambda_handler"
  source_code_hash = data.archive_file.lambda_function_zip.output_base64sha256
  runtime          = "python3.9"
}

In the above snippet, there are two important details that took me longer than they should have to grasp from various blog posts on the Internet.

First, the value of handler must be <name of python file>.<name of function>, i.e. lambda.lambda_handler for the source above. Since we are using Python, this should have been obvious, but I found it confusing at first.

The second thing is source_code_hash. This is critical to make sure the zip file is redeployed every time our source code changes. For this, we can use the base64-encoded SHA-256 of the zip file, as generated by the archive_file data block above.

The value of the role attribute will be explained later. In short, it defines what this Lambda function can do (i.e., logging and SQS message reception).

SQS Queue(s)

We are going to create two queues: a “main” one and a dead-letter queue (DLQ). The latter is useful for debugging, as it will contain messages that couldn’t be successfully received and deleted by the consumer.

resource "aws_sqs_queue" "dead_letter_queue" {
  name                      = "lambda_sqs_dead_letter_queue"
  message_retention_seconds = var.sqs_retention_seconds
}

resource "aws_sqs_queue" "lambda_queue" {
  name                      = "lambda_queue"
  message_retention_seconds = var.sqs_retention_seconds

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dead_letter_queue.arn
    maxReceiveCount     = var.sqs_max_retry_count
  })
  redrive_allow_policy = jsonencode({
    redrivePermission = "byQueue",
    sourceQueueArns   = [aws_sqs_queue.dead_letter_queue.arn]
  })
}

Messages end up in the DLQ after a consumer has received them var.sqs_max_retry_count times without deleting them. They will then stay in the DLQ for var.sqs_retention_seconds seconds, allowing recovery and debugging.

More info about DLQs in SQS can be found here.

Log Group

To log messages from our function, we need to create a CloudWatch log group. Its name must follow the /aws/lambda/<function name> convention so that our Lambda function writes to it.

With Terraform, we can do the following:

resource "aws_cloudwatch_log_group" "test_cloudwatch_log_group" {
  name              = "/aws/lambda/${var.name_prefix}"
  retention_in_days = var.log_retention_days
}

Connect the SQS queue with the Lambda function

Now we can connect the SQS queue (the event source) to our Lambda function. In doing so, we can specify the maximum batch size for message reception.

resource "aws_lambda_event_source_mapping" "lambda_via_sqs" {
  batch_size       = var.sqs_batch_size
  event_source_arn = aws_sqs_queue.lambda_queue.arn
  function_name    = var.name_prefix

  depends_on = [
    aws_lambda_function.lambda_function
  ]
}

Roles and Policies

What we need to do now is create a role for our Lambda function and attach policies to it that allow certain “actions”. Specifically, we have to authorize our function to log to CloudWatch and to receive/delete messages from our SQS queue.

As a matter of fact, this is the first thing that gets created when we build our infra: the role is a dependency of the Lambda function, and Terraform automatically creates resources in the “right order”.

One thing I found a bit confusing as a n00b was the AssumeRole action. In short, when we create a role, we need to specify which entities can assume that role; in our case, Lambda functions.

data "aws_iam_policy_document" "lambda_execution_policy_document" {
  statement {
    sid     = "AllowLambdaToAssumeRole"
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}

# Role for lambda
resource "aws_iam_role" "iam_for_lambda" {
  name = "${var.name_prefix}_role"
  # Grant lambda permission to assume the role "iam_for_lambda"
  assume_role_policy = data.aws_iam_policy_document.lambda_execution_policy_document.json
}

We can now attach policies to the role. This is what we need to allow logging:

data "aws_iam_policy_document" "lambda_logging_policy" {
  statement {
    sid = "EnableLogs"

    actions = [
      "logs:CreateLogStream",
      "logs:PutLogEvents",
    ]

    resources = ["*"]
  }
}

resource "aws_iam_role_policy" "lambda_logs_policy" {
  name   = "lambda_logs_policy"
  role   = aws_iam_role.iam_for_lambda.name
  policy = data.aws_iam_policy_document.lambda_logging_policy.json
}

And this is what is needed to consume the SQS queue (receive and delete messages):

data "aws_iam_policy_document" "lambda_sqs_policy_document" {
  statement {
    sid = "ProcessSQSMessages"

    actions = [
      "sqs:ChangeMessageVisibility",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes",
      "sqs:ReceiveMessage",
    ]

    resources = [aws_sqs_queue.lambda_queue.arn]
  }
}

resource "aws_iam_role_policy" "lambda_sqs_policy" {
  name   = "lambda_sqs_policy"
  role   = aws_iam_role.iam_for_lambda.name
  policy = data.aws_iam_policy_document.lambda_sqs_policy_document.json
}

Testing the system

If you just want to test the system with one message, you can run:

aws sqs send-message \
    --queue-url "$(terraform output --raw sqs_url)" \
    --message-body "$(date) - sqs test"
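
This assumes a Terraform output named sqs_url exposing the queue URL; if your configuration doesn’t define one, a minimal sketch could be:

output "sqs_url" {
  description = "URL of the main SQS queue"
  value       = aws_sqs_queue.lambda_queue.url
}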

To test the function with batch processing and concurrent execution (and also to give a sense of how to programmatically send messages to the queue), I created a super simple Python script. This script sends itself, line by line, to the SQS queue. It does that in batches of up to 10 lines.

#!/usr/bin/env python3

import logging
import sys

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
sqs = boto3.resource('sqs')


def send_messages(queue, messages):
    """
    Send a batch of messages in a single request to an SQS queue.
    This request may return overall success even when some messages were not
    sent.
    The caller must inspect the Successful and Failed lists in the response and
    resend any failed messages.

    :param queue: The queue to receive the messages.
    :param messages: The messages to send to the queue. These are simplified to
                     contain only the message body and attributes.
    :return: The response from SQS that contains the list of successful and
             failed messages.
    """
    try:
        entries = [{
            'Id': str(ind),
            'MessageBody': msg['body'],
            'MessageAttributes': msg['attributes']
        } for ind, msg in enumerate(messages)]
        response = queue.send_messages(Entries=entries)
        if 'Successful' in response:
            for msg_meta in response['Successful']:
                logger.info(
                    "Message sent: %s: %s",
                    msg_meta['MessageId'],
                    messages[int(msg_meta['Id'])]['body']
                )
        if 'Failed' in response:
            # Failed entries are BatchResultErrorEntry objects and carry the
            # batch 'Id' of the original entry, not a 'MessageId'.
            for msg_meta in response['Failed']:
                logger.warning(
                    "Failed to send: %s: %s",
                    msg_meta['Id'],
                    messages[int(msg_meta['Id'])]['body']
                )
    except ClientError as error:
        logger.exception("Send messages failed to queue: %s", queue)
        raise error
    else:
        return response


def get_queues(prefix=None):
    """
    Gets a list of SQS queues. When a prefix is specified, only queues with
    names that start with the prefix are returned.

    :param prefix: The prefix used to restrict the list of returned queues.
    :return: A list of Queue objects.
    """
    if prefix:
        queue_iter = sqs.queues.filter(QueueNamePrefix=prefix)
    else:
        queue_iter = sqs.queues.all()
    queues = list(queue_iter)
    if queues:
        logger.info("Got queues: %s", ', '.join([q.url for q in queues]))
    else:
        logger.warning("No queues found.")
    return queues


def test_sqs_lambda():
    """
    Shows how to:
    * Read the lines from this Python file and send the lines in
      batches of 10 as messages to a queue.
    * Receive the messages in batches until the queue is empty.
    * Reassemble the lines of the file and verify they match the original file.
    """
    def pack_message(msg_path, msg_body, msg_line):
        return {
            'body': msg_body,
            'attributes': {
                'path': {'StringValue': msg_path, 'DataType': 'String'},
                'line': {'StringValue': str(msg_line), 'DataType': 'String'}
            }
        }

    def unpack_message(msg):
        return (msg.message_attributes['path']['StringValue'],
                msg.body,
                int(msg.message_attributes['line']['StringValue']))

    queue = get_queues(prefix='lambda_queue')[0]

    with open(__file__) as file:
        lines = file.readlines()

    line = 0
    batch_size = 10
    print(f"Sending file lines in batches of {batch_size} as messages.")
    while line < len(lines):
        messages = [
            pack_message(__file__, lines[index], index)
            for index in range(line, min(line + batch_size, len(lines)))
        ]
        line = line + batch_size
        send_messages(queue, messages)
        print('.', end='')
        sys.stdout.flush()
    print(f"Done. Sent {len(lines) - 1} messages.")


if __name__ == '__main__':
    # Configure logging so the logger.info/warning output from send_messages
    # is visible on the console.
    logging.basicConfig(level=logging.INFO)
    test_sqs_lambda()

After running this script, you should be able to see messages in CloudWatch logs:

[Screenshots: CloudWatch Logs entries produced by the Lambda function]


That's all

Thanks for reading
