Introduction
To send real-time notifications to consumer applications whenever changes are made to tables in our relational database (Amazon Aurora PostgreSQL), we will use a combination of database triggers, AWS Lambda, and SNS. This approach is similar to the “Transactional Outbox Pattern”, but implemented differently in Aurora PostgreSQL. In this article, we are going to learn how to implement real-time notifications (Transactional Outbox Pattern) in Amazon Aurora PostgreSQL.
Transactional Outbox Pattern
The transactional outbox pattern ensures that database updates and notifications are performed atomically in a single operation, maintaining data consistency. The Outbox Pattern ensures reliable event publishing. Instead of publishing events directly to the message broker, messages are written to an “outbox” table within the same database. Both the business data and outbox data are written in a single transaction. The outbox processor reads messages with a “pending” status from the outbox table, publishes them to the message broker, and then updates the message status to “Completed” or deletes the message from the table to prevent data growth. Please refer to the link to learn more about the “Transactional Outbox Pattern”.
Image 1. Transactional Outbox Pattern
How to implement real-time notification (Transactional Outbox Pattern) in Amazon Aurora PostgreSQL
Using the database trigger in Aurora PostgreSQL, we will invoke the Lambda and send notifications to consumer applications through SNS. For example, whenever there are changes in the "Order" table (if any new order is placed), we need to send an email to the customer about order confirmation and notifications to consumer applications/modules.
Image 2. Transactional Outbox Pattern in AWS Aurora PostgreSQL
- Service - Service that inserts/updates/deletes the data from the business table and inserts the message to the outbox table in the same transaction.
- Aurora PostgreSQL - The database that stores the business entities and message outbox
- Outbox Table - This is a table that stores the messages to be sent
- PostgreSQL Table Trigger – A custom table trigger function that makes a call to the Lambda function.
- Lambda - The Lambda function receives the input data from the trigger function and publishes it to the SNS topic.
- Dead Letter Queue (Optional) - Configure DLQ to the lambda so that we won't lose the messages during failures
- SNS topic (Message Broker) – This is the topic that delivers the event message to their consumers
Here, we have replaced the Outbox Processor (Image 1: Transactional Outbox Pattern) with a database trigger and AWS Lambda (Image 2: Transactional Outbox Pattern in AWS Aurora PostgreSQL)
Note: Both Aurora PostgreSQL and Lambda functions should be in the same VPC and private subnet with the required security group rules.
Step 1. Create Aurora RDS PostgreSQL and AWS Lambda
I assume you are already aware of how to create Amazon Aurora PostgreSQL, AWS Lambda, and SNS.
Step 2. Add the required permission to the Aurora PostgreSQL cluster
We need to create an IAM role with the required policies and attach that IAM to the RDS instance.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowAccessToRDSTriggeredLambdaFunction",
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": "<lambda_arn>"
}
]
}
Step 3. Install PostgreSQL Extension
We have to install the aws_lambda extension. This extension provides Aurora PostgreSQL DB cluster with the ability to call Lambda functions from PostgreSQL.
CREATE EXTENSION IF NOT EXISTS aws_lambda CASCADE;
GRANT USAGE ON SCHEMA aws_lambda TO <db_username>;
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA aws_lambda TO <db_username>;
Example
GRANT USAGE ON SCHEMA aws_lambda TO postgres;
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA aws_lambda TO postgres;
Step 4. Create Order and Outbox Table
CREATE TABLE public.order (
order_id uuid not null default uuid_generate_v4() primary key,
product_id uuid not null,
customer_id uuid not null,
quantity int not null,
price double not null,
status varchar(100) not null,
created_date timestamptz not null default now()
);
tablespace pg_default;
CREATE TABLE public.outbox (
msg_id uuid not null primary key,
msg_data varchar not null,
delivery_status varchar(50) not null,
created_date timestamptz not null default now()
)
tablespace pg_default;
Step 5. Create Database Table Trigger
aws_lambda.invoke is used to invoke the lambda function from the database.
SELECT * FROM aws_lambda.invoke(aws_commons.create_lambda_function_arn('<lambda_arn>'),<payload_to_the_lambda>, '<invocation_type>');
The aws_lambda.invoke function behaves synchronously or asynchronously, depending on the invocation_type mentioned in the aws_lambda.invoke. The two types are RequestResponse (the default) and Event
- RequestResponse: This invocation type is synchronous. It's the default behavior. The response payload includes the results of the aws_lambda.invoke function. RequestResponse invocation type is used for when your workflow requires receiving results from the Lambda function before proceeding.
- Event: This invocation type is asynchronous. The response doesn't include a payload containing results. Use this invocation type when your workflow doesn't need a result from the Lambda function. Lambda will re-try a maximum of 3 times if it is failed or any exception occurs in the lambda. For asynchronous invocation, Lambda adds events to a queue before sending them to your function. If your function does not have enough capacity to keep up with the queue, events may be lost.
The create_lambda_function_arn creates a Lambda function ARN structure that is compatible with the invoke API call.
CREATE TRIGGER trigger_insert_event
AFTER INSERT
ON public.outbox
FOR EACH ROW
EXECUTE PROCEDURE public.db_trigger_function();
The following code is used to invoke the Lambda from the database trigger using aws_lambda.invoke.
CREATE OR REPLACE FUNCTION public.db_trigger_function()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS
$$
declare
msg record;
BEGIN
SELECT * FROM aws_lambda.invoke(aws_commons.create_lambda_function_arn('<lambda_arn>'), CONCAT('{"msg_data": "', new.msg_data, '"}')::json, 'Event') into msg;
RETURN NEW;
END;
$$
Invoking an AWS Lambda function is supported in these RDS for PostgreSQL versions.
- All PostgreSQL 16 versions
- All PostgreSQL 15 versions
- PostgreSQL 14.1 and higher minor versions
- PostgreSQL 13.2 and higher minor versions
- PostgreSQL 12.6 and higher minor versions
Step 6. Create Lambda IAM execution role
- Add the "AWSLambdaVPCAccessExecutionRole" policy, which is an AWS-managed policy for permissions to run the Lambda function in VPC.
- Add a custom policy to publish notifications to the SNS topic.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "TopicWriterSID",
"Effect": "Allow",
"Action": "sns:Publish",
"Resource": "<sns_topic_arn>"
}
]
}
- Add a custom policy with basic permissions for cloudwatch logs.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource":"*"
}
]
}
Step 7. Lambda Function Code
public class Function
{
private readonly IServiceProvider _serviceProvider;
private readonly ISNSProcessor _snsProcessor;
public Function() : this(null)
{
}
public Function(IServiceProvider? serviceProvider)
{
_serviceProvider = serviceProvider ?? ConfigureServices(new ServiceCollection());
_snsProcessor = _serviceProvider.GetRequiredService<ISNSProcessor>();
}
private static ServiceProvider ConfigureServices(ServiceCollection services)
{
services.AddScoped<ISNSProcessor, SNSProcessor>()
return services.BuildServiceProvider();
}
public async Task FunctionHandler(object events, ILambdaContext context)
{
var logger = context.Logger;
try
{
if (events == null)
{
Console.WriteLine("Empty events");
return;
}
// Parse the input events
var eventString = ((JsonElement)events).ToString();
var eventObject = JObject.Parse(eventString);
Console.WriteLine("RDS Data : " + eventString);
var msgData = Convert.ToString(eventObject?["msg_data"]) ?? "";
await _snsProcessor.PublishMessage(msg_data);
}
catch (Exception ex)
{
logger.LogError($"Error while getting the events data : {ex.Message}");
}
}
}
public interface ISNSProcessor
{
Task PublishMessage(string message);
}
public class SNSProcessor : ISNSProcessor
{
private readonly IAmazonSimpleNotificationService _snsService;
private readonly string _topicArn;
public SNSProcessor()
{
_snsService = new AmazonSimpleNotificationServiceClient(<aws_region>);
_topicArn = Environment.GetEnvironmentVariable("TOPIC_ARN");
}
public async Task PublishMessage(string message)
{
try
{
ArgumentNullException.ThrowIfNull(nameof(message));
var request = new PublishRequest
{
TopicArn = _topicArn,
Message = message,
MessageGroupId = "1"
};
var response = await _snsService.PublishAsync(request);
}
catch
{
throw;
}
}
}
Step 8. Insert the values into the table
insert into public.order values ('c89fb383-bc22-4e24-981f-fb7eabc6ecd4', '103f98f7-7193-43b7-972a-e6ea9b2fae41', '4b23501b-f068-47f8-a959-4250e0318fd9', 1, 1000 , 'Pending', now());
insert into public.outbox values ('6b927c0b-9b67-48e1-adbf-bf527c239034', '{''order_id'':''c89fb383-bc22-4e24-981f-fb7eabc6ecd4'',''product_id'':''103f98f7-7193-43b7-972a-e6ea9b2fae41'',''customer_id'':''4b23501b-f068-47f8-a959-4250e0318fd9'', ''quantity'':''1'', ''price'':''1000'', ''status'':''Pending'', ''created_date'': ''2024-10-24 09:42:28''}', 'Pending', now());
The above INSERT statement invokes the public trigger.outbox table, which, in turn, triggers the Lambda function with output in the CloudWatch logs. In PostgreSQL, triggers are executed within the same transaction as the INSERT statement. Consequently, if a trigger fails, the associated INSERT statement will also fail.
We need to consider a few points before going to this approach,
- A high number of transactions on the table results in more trigger invocations, which can lead to performance issues. In our example, we used only a
FOR EACH ROW
trigger, which activates when data is inserted into the table.
- During database operation, a transaction will only be committed if the Lambda call from the table trigger is successful.
- A synchronous Lambda function call from a trigger affects database performance, as the trigger must wait for the function's response. The asynchronous approach is the preferred option.
- A high number of transactions against the database table will invoke an equivalent number of Lambda functions. This could exceed the region-specific concurrency limits of the lambda function, which leads to throttling. Therefore, ensure that transactions on the table are within the Lambda concurrency limits.
Conclusion
In this article, you have learned the following topics,
- What is a Transactional Outbox Pattern?
- How to Implement Transactional Outbox Pattern in AWS Aurora PostgreSQL using the Trigger and its Pros and Cons