Streams

Introduction

Amazon DynamoDB Streams provides a time-ordered sequence of item-level modifications within a DynamoDB table, capturing changes such as inserts, updates, and deletes. This enables developers to build triggers that execute custom code in response to database changes in near real-time.

These triggers serve a variety of use cases, such as replicating data to secondary datastores, computing aggregations, sending notifications, and more.

Creating a trigger

To get started with DynamoDB Streams integration, the first step is to create a module within integrates/infra/src/streams_lambda/streams. Inside this module, create a file, commonly named handler.py, to define the function that will be invoked by DynamoDB Stream events.

Here's a simple example of a handler.py file with a function named process:

from streams.utils import (
    format_record,
)
from typing import (
    Any,
)


def process(event: dict[str, Any], _context: Any) -> None:
    records = tuple(format_record(record) for record in event["Records"])

    # Now you can iterate over the records and perform necessary actions
    for record in records:
        # Your custom logic for processing each record goes here
        ...

The next step is to declare the trigger in the integrates/infra/src/streams_lambda/triggers.json file. This file serves as the configuration for all triggers. To simplify the process and ensure correctness, you can leverage the provided JSON Schema with editor IntelliSense support.

Here's an example of how you might structure the trigger object:

{
  ...
  "your_trigger_name_goes_here": {
    "batch_size": 100,
    "enabled": true,
    "filters": [],
    "handler": "package.file.function",
    "memory_size": 512,
    "reserved_concurrency": -1,
    "timeout": 60,
    "uses_vpc": false
  }
}

Local Reproducibility

Locally, your triggers will be invoked by a KCL-based consumer. This consumer reads the stream from your local instance of DynamoDB, mimicking the behavior of the stream processing mechanism in AWS Lambda, allowing you to test the functionality of your triggers in a controlled setting.

To use it, start the database job as usual by running m . /integrates/db, and then perform changes that match the filter criteria of your trigger. Logs will be output to the console as the trigger functions are invoked in response to the stream events.

You can optionally run the database job and the local consumer separately by running the following commands:

STREAMS=false m . /integrates/db

and then:

m . /integrates/infra/src/streams_lambda/local

Testing

Ensuring the reliability of your DynamoDB Streams triggers is a critical aspect of development. You can adopt a two-tiered testing approach encompassing both unit tests and functional tests.

Unit Tests

For unit testing, create tests within the integrates/infra/src/streams_lambda/test directory. In these tests, call the handler functions directly, simulating DynamoDB Stream events and verifying the state before and after trigger execution. These unit tests enable you to validate the behavior of your handler functions in isolation, ensuring they correctly process DynamoDB Stream events.
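As a rough illustration, such a test might look like the sketch below, assuming pytest and the process handler from the example above; the import path, key names, and values are hypothetical and depend on the module you created. The simulated event follows the standard DynamoDB Streams record format.

from streams.your_module.handler import (  # hypothetical module name
    process,
)


def test_process_handles_insert() -> None:
    # A simulated event in the standard DynamoDB Streams record format
    event = {
        "Records": [
            {
                "eventID": "1",
                "eventName": "INSERT",
                "dynamodb": {
                    "Keys": {"pk": {"S": "USER#jdoe"}},
                    "NewImage": {"pk": {"S": "USER#jdoe"}, "name": {"S": "John Doe"}},
                    "SequenceNumber": "111",
                    "StreamViewType": "NEW_AND_OLD_IMAGES",
                },
            }
        ]
    }

    # The handler should process the simulated event without raising
    process(event, None)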

Functional Tests

Functional tests, located in integrates/back/test/functional, provide a higher-level validation of your DynamoDB Streams triggers. In these tests, rely on the local consumer to invoke your triggers whenever relevant changes occur in the database. Functional tests provide end-to-end validation of your trigger, ensuring that changes have the expected behavior.
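For example, a functional test might write an item to the local database and then assert on the trigger's side effect. The sketch below assumes boto3 pointed at the local DynamoDB instance; the endpoint, table name, and the attribute written by the trigger are all hypothetical and must be adjusted to your setup.

import time

import boto3


def test_trigger_updates_item() -> None:
    # Hypothetical local endpoint and table name; adjust to your environment
    resource = boto3.resource("dynamodb", endpoint_url="http://localhost:8022")
    table = resource.Table("integrates_vms")

    # Perform a change that matches the trigger's filter criteria
    table.put_item(Item={"pk": "USER#jdoe", "sk": "USER#jdoe"})

    # Give the local consumer time to pick up the stream record
    time.sleep(5)

    # Assert on a side effect the trigger is expected to produce (hypothetical attribute)
    item = table.get_item(Key={"pk": "USER#jdoe", "sk": "USER#jdoe"})["Item"]
    assert "processed_at" in item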

Important Considerations

When creating a trigger, it's crucial to address key considerations for a robust and reliable implementation.

Ensure Idempotency

Idempotency is paramount when designing functions triggered by DynamoDB Streams. This ensures that even if an event triggers the function multiple times, the outcome remains consistent. To achieve idempotency, developers should carefully design their functions, accounting for potential retries and preventing unintended side effects.
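One common pattern is to record each processed event id with a conditional write, so retries become no-ops. The sketch below assumes boto3 and a hypothetical deduplication table; it illustrates the pattern and is not part of the existing codebase.

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource("dynamodb")
dedup_table = dynamodb.Table("processed_events")  # hypothetical table


def already_processed(event_id: str) -> bool:
    """Record the event id; return True if it was already seen."""
    try:
        # The conditional write fails if this event id was stored before
        dedup_table.put_item(
            Item={"pk": event_id},
            ConditionExpression="attribute_not_exists(pk)",
        )
        return False
    except ClientError as error:
        if error.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return True
        raise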

Avoid Infinite Loops

Another critical aspect to consider is preventing infinite loops. This can occur when the Lambda function, triggered by a DynamoDB Stream event, inadvertently modifies data in the same database table. Such modifications trigger additional events, leading to an endless loop of execution. To prevent this scenario, developers should carefully structure their code to avoid unintentional self-triggering and implement safeguards to break potential loops.
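One possible safeguard, sketched below, is to mark items written by the trigger and skip stream records that carry that marker; the attribute name and marker value are hypothetical.

from typing import (
    Any,
)

TRIGGER_MARKER = "your_trigger_name_goes_here"  # hypothetical marker value


def should_skip(record: dict[str, Any]) -> bool:
    """Skip records produced by this trigger's own writes."""
    new_image = record.get("dynamodb", {}).get("NewImage", {})
    return new_image.get("updated_by", {}).get("S") == TRIGGER_MARKER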

Keep a Short Execution Time

Lambda functions triggered by DynamoDB Streams operate within a time-constrained environment, with a maximum execution timeout of 15 minutes. It's imperative to design functions that complete their tasks within this timeframe to prevent premature termination.

If your use case requires a potentially long-running operation that exceeds Lambda's execution limit, consider alternative approaches. One effective strategy is to initiate a job in AWS Batch, which supports extended processing times.
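As a rough sketch, the handler can stay fast by only submitting the job and returning; the job queue and job definition names below are hypothetical.

import boto3

batch = boto3.client("batch")


def submit_long_running_job(item_id: str) -> None:
    # Hand off the heavy work to AWS Batch and return quickly
    batch.submit_job(
        jobName=f"process-{item_id}",
        jobQueue="your_job_queue",  # hypothetical
        jobDefinition="your_job_definition",  # hypothetical
        containerOverrides={"command": ["python", "process.py", item_id]},
    )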

Retry Mechanism

In the event of a failure during the invocation of Lambda functions triggered by DynamoDB Streams, AWS Lambda will automatically retry invoking the function until the associated record expires in the stream, which occurs after 24 hours. In cases where the issue persists beyond the 24-hour timeframe, the event is considered unrecoverable, and it is sent to a dead-letter queue (DLQ) named "integrates_streams_dlq" in AWS Simple Queue Service (SQS). This designated queue serves as a repository for events that require manual review due to persistent failures.
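If you need to review such events, a minimal sketch for peeking at the queue with boto3 could look like this:

import boto3

sqs = boto3.client("sqs")


def peek_dlq_messages(max_messages: int = 10) -> list[dict]:
    """Fetch up to ten messages from the dead-letter queue for manual review."""
    queue_url = sqs.get_queue_url(QueueName="integrates_streams_dlq")["QueueUrl"]
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,
    )
    return response.get("Messages", [])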

Monitoring and Troubleshooting

Ensuring the health and performance of your DynamoDB Streams triggers is essential for maintaining a responsive and reliable system. Here's how you can effectively monitor and troubleshoot your triggers:

Atatus and Bugsnag

In the event of unhandled errors, your trigger will report them through Atatus and/or Bugsnag under the "integrates-streams" project. This setup facilitates centralized error tracking, enabling prompt identification and resolution of issues. As with other applications, you can view traces and other performance-related insights in Atatus.

CloudWatch Dashboard

Utilize the dashboard to monitor key metrics related to your Lambda functions triggered by DynamoDB Streams. Keep an eye on metrics such as concurrent executions, which indicates the number of functions running simultaneously. Additionally, track processing delay metrics, which provide insight into the time elapsed from when a change occurred in the database until it was processed by a trigger. You can use these metrics to fine-tune the batch_size parameter of the trigger.

View dashboard (internal link)

CloudWatch Logs

For a more detailed insight into the execution of your trigger, review CloudWatch logs located under the log groups with the prefix "/aws/lambda/integrates_streams_". These logs capture execution details, including any custom logging statements you've incorporated into your code. Analyzing these logs is instrumental in diagnosing specific issues and understanding the flow of your DynamoDB Streams integration.