Streams

Introduction

Amazon DynamoDB Streams provide a time-ordered sequence of item-level modifications within a DynamoDB table, capturing changes such as inserts, updates, and deletes. This feature enables developers to build triggers that execute custom code in near real time in response to database changes.

These triggers serve a variety of use cases, such as replicating to secondary datastores, computing aggregations, sending notifications, and more.

Creating a trigger

To get started with DynamoDB Streams integration, the first step is to create a module within integrates/infra/src/streams_lambda/streams. Inside this module, you'll create a file, commonly named handler.py, to define the function that will be invoked by DynamoDB Stream events.

Here's a simple example of a handler.py file with a function named process:

from streams.utils import (
    format_record,
)
from typing import (
    Any,
)


def process(event: dict[str, Any], _context: Any) -> None:
    records = tuple(format_record(record) for record in event["Records"])

    # Now you can iterate over the records and perform the necessary actions
    for record in records:
        # Your custom logic for processing each record goes here
        ...

The next step is to declare the trigger in the integrates/infra/src/streams_lambda/triggers.json file. This file serves as the configuration for all triggers. To simplify the process and ensure correctness, you can leverage the provided JSON Schema with editor IntelliSense support.

Here's an example of how you might structure the trigger object:

{
  ...
  "your_trigger_name_goes_here": {
    "batch_size": 100,
    "enabled": true,
    "filters": [],
    "handler": "package.file.function",
    "memory_size": 512,
    "reserved_concurrency": -1,
    "timeout": 60,
    "uses_vpc": false
  },
}

Local Reproducibility

Locally, your triggers will be invoked by a KCL-based consumer. This consumer reads the stream from your local instance of DynamoDB, mimicking the behavior of the stream processing mechanism in AWS Lambda, allowing you to test the functionality of your triggers in a controlled setting.

To use it, start the database job as usual by running m . /integrates/db, and then perform changes that match the filter criteria of your trigger. Logs will be output to the console as the trigger functions are invoked in response to the stream events.

You can optionally run the database job and the local consumer separately by first running:

STREAMS=false m . /integrates/db

and then:

m . /integrates/infra/src/streams_lambda/local

Testing

Ensuring the reliability of your DynamoDB Streams triggers is a critical aspect of development. You can adopt a two-tiered testing approach encompassing both unit tests and functional tests.

Unit Tests

For unit testing, create tests within the integrates/infra/src/streams_lambda/test directory. In these tests, call the handler functions directly, simulating DynamoDB Stream events and verifying the state before and after trigger execution. These unit tests enable you to validate the behavior of your handler functions in isolation, ensuring they correctly process DynamoDB Stream events.
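
For instance, a minimal unit test might build a synthetic stream event and call the handler directly. This is only a sketch: the module path streams.your_trigger_module.handler, the pk/sk values, and the record shape are illustrative assumptions, not the project's actual schema.

from streams.your_trigger_module.handler import process  # hypothetical module path


def test_process_insert_event() -> None:
    # A trimmed-down DynamoDB Streams event with a single INSERT record.
    event = {
        "Records": [
            {
                "eventID": "1",
                "eventName": "INSERT",
                "dynamodb": {
                    "Keys": {"pk": {"S": "VULN#123"}, "sk": {"S": "FIN#456"}},
                    "NewImage": {"pk": {"S": "VULN#123"}, "state": {"S": "open"}},
                    "SequenceNumber": "111",
                    "StreamViewType": "NEW_AND_OLD_IMAGES",
                },
            }
        ]
    }

    # The handler should process a well-formed event without raising;
    # assert on whatever side effects your trigger is expected to produce.
    process(event, None)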

Functional Tests

Functional tests, located in integrates/back/test/functional, provide a higher-level validation of your DynamoDB Streams triggers. In these tests, rely on the local consumer to invoke your triggers whenever relevant changes occur in the database. Functional tests provide end-to-end validation of your trigger, ensuring that changes have the expected behavior.
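
As a rough illustration only, a functional test might perform a write through the regular data layer and then poll for the side effect produced by the trigger once the local consumer picks up the stream record. The test_helpers module and its functions below are hypothetical placeholders, not actual project APIs.

import asyncio

import pytest

# Hypothetical helpers for illustration only; use the project's real data
# layer and queries in an actual test.
from test_helpers import create_vulnerability, get_search_index_entry


@pytest.mark.asyncio
async def test_trigger_updates_search_index() -> None:
    vuln_id = await create_vulnerability(finding_id="123")

    # The local consumer invokes the trigger asynchronously, so poll for
    # the expected side effect instead of asserting immediately.
    entry = None
    for _ in range(30):
        entry = await get_search_index_entry(vuln_id)
        if entry is not None:
            break
        await asyncio.sleep(1)

    assert entry is not None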

Important Considerations

When creating a trigger, it's crucial to address key considerations for a robust and reliable implementation.

Ensure Idempotency

Idempotency is paramount when designing functions triggered by DynamoDB Streams. This ensures that even if an event triggers the function multiple times, the outcome remains consistent. To achieve idempotency, developers should carefully design their functions, accounting for potential retries and preventing unintended side effects.
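
One common pattern is to record each processed event with a conditional write, so a replayed event becomes a harmless no-op. A minimal sketch using boto3, relying on the fact that each stream record carries a unique eventID; the table and key names are illustrative assumptions, not the project's actual schema:

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("your_table")  # placeholder table name


def process_once(event_id: str) -> bool:
    """Record the event as processed; return False if it was seen before."""
    try:
        table.put_item(
            Item={"pk": f"PROCESSED#{event_id}", "sk": f"PROCESSED#{event_id}"},
            # The write only succeeds the first time this event is seen,
            # so retried deliveries do not repeat the side effect.
            ConditionExpression="attribute_not_exists(pk)",
        )
        return True
    except ClientError as exc:
        if exc.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return False
        raise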

Avoid Infinite Loops

Preventing infinite loops is crucial when dealing with Lambda functions triggered by DynamoDB Streams. These loops can occur when a function inadvertently modifies data in the same table that triggers it, generating additional events and leading to endless execution cycles. To mitigate this risk, developers should carefully structure their code to avoid unintentional self-triggering and implement safeguards to break potential loops.
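
For example, a trigger that writes back to the same table can explicitly skip records produced by its own writes. A hedged sketch, assuming the trigger writes items under an "AGGREGATE#" key prefix (an illustrative convention, not the project's actual schema):

from typing import Any


def should_process(record: dict[str, Any]) -> bool:
    # Ignore changes to items this trigger writes itself, breaking the
    # feedback loop between the trigger's output and its own input.
    keys = record.get("dynamodb", {}).get("Keys", {})
    pk = keys.get("pk", {}).get("S", "")
    return not pk.startswith("AGGREGATE#")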

Keep a Short Execution Time

Lambda functions triggered by DynamoDB Streams operate within a time-constrained environment, with a maximum execution timeout of 15 minutes. Designing functions to complete their tasks within this timeframe is essential to prevent premature termination.

For use cases requiring extended processing times that exceed Lambda's execution limit, consider alternative approaches such as initiating jobs in AWS Batch, which supports longer execution durations.
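
In that scenario, the trigger can stay small and simply hand off the heavy work. A minimal sketch with boto3; the job queue, job definition, and command are placeholders:

import boto3

batch = boto3.client("batch")


def submit_long_running_job(item_id: str) -> None:
    # Delegate work that may exceed Lambda's 15-minute limit to AWS Batch,
    # passing just enough context for the job to fetch what it needs.
    batch.submit_job(
        jobName=f"process-item-{item_id}",
        jobQueue="your_job_queue",  # placeholder
        jobDefinition="your_job_definition",  # placeholder
        containerOverrides={"command": ["python", "process.py", item_id]},
    )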

Retry Mechanism

In the event of a failure during the invocation of Lambda functions triggered by DynamoDB Streams, AWS Lambda automatically retries the invocation until the associated record expires from the stream, typically after 24 hours. If the issue persists beyond this timeframe, the event is considered unrecoverable and is sent to a dead-letter queue (DLQ) named "integrates_streams_dlq" in AWS Simple Queue Service (SQS). This DLQ serves as a repository for events requiring manual review due to persistent failures.
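
To review those events manually, you can read the DLQ like any other SQS queue. A minimal sketch with boto3:

import boto3

sqs = boto3.client("sqs")
queue_url = sqs.get_queue_url(QueueName="integrates_streams_dlq")["QueueUrl"]

# Fetch a few failed invocation records for manual inspection.
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=5,
)
for message in response.get("Messages", []):
    print(message["Body"])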

Monitoring and Troubleshooting

Ensuring the health and performance of DynamoDB Streams triggers is essential for maintaining a responsive and reliable system. Here's how you can effectively monitor and troubleshoot your triggers:

Bugsnag and Coralogix

In the event of unhandled errors, your trigger will report them through Bugsnag and/or Coralogix under the integrates-streams project. This setup centralizes error tracking, enabling prompt identification and resolution of issues. You can view traces and other performance-related insights in Coralogix.

CloudWatch Dashboard

Utilize the CloudWatch dashboard to monitor key metrics related to your Lambda functions triggered by DynamoDB Streams. Monitor metrics such as concurrent executions, which indicates the number of functions running simultaneously. Additionally, track processing delay metrics to gain insight into the time elapsed between a change occurring in the database and its processing by a trigger. These metrics can help fine-tune the batch_size parameter of the trigger. View dashboard (internal link)
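
If you prefer to query these metrics programmatically, Lambda's IteratorAge metric roughly corresponds to that processing delay. A hedged sketch with boto3; the function name is a placeholder:

from datetime import datetime, timedelta, timezone

import boto3

cloudwatch = boto3.client("cloudwatch")

# Maximum stream iterator age (in milliseconds) over the last hour, i.e.
# how far behind the trigger is from the changes it is processing.
response = cloudwatch.get_metric_statistics(
    Namespace="AWS/Lambda",
    MetricName="IteratorAge",
    Dimensions=[{"Name": "FunctionName", "Value": "your_trigger_function"}],
    StartTime=datetime.now(timezone.utc) - timedelta(hours=1),
    EndTime=datetime.now(timezone.utc),
    Period=300,
    Statistics=["Maximum"],
)
for point in sorted(response["Datapoints"], key=lambda d: d["Timestamp"]):
    print(point["Timestamp"], point["Maximum"])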

CloudWatch Logs

For detailed insight into trigger execution, review CloudWatch logs located under the log groups with the prefix "/aws/lambda/integratesstreams". These logs capture execution details, including any custom logging statements incorporated into your code. Analyzing these logs is crucial for diagnosing specific issues and understanding the flow of DynamoDB Streams integration.
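
For instance, to search a trigger's log group for error entries from code, you could use boto3 as sketched below; the log group suffix is a placeholder for your trigger's name:

from datetime import datetime, timedelta, timezone

import boto3

logs = boto3.client("logs")

# Look for error entries in the last 24 hours of a trigger's log group.
response = logs.filter_log_events(
    logGroupName="/aws/lambda/integratesstreams_your_trigger",  # placeholder suffix
    filterPattern="ERROR",
    startTime=int(
        (datetime.now(timezone.utc) - timedelta(days=1)).timestamp() * 1000
    ),
)
for event in response["events"]:
    print(event["message"])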