This demonstration application shows an approach to implementing a Last-In, First-Out (LIFO) queue, using AWS Lambda, Amazon DynamoDB and other AWS Serverless technologies. The demonstration is an AWS Serverless Application Model (AWS SAM) application and is written in JavaScript.
We created the application as a companion to an AWS Compute Blog post (see link below). The post also describes a use case where this approach is useful.
The demonstration supports use cases where a system must prioritise newer tasks over older tasks, and where older tasks can be deleted under insurmountably high load (via load shedding). For these use cases a First-In, First-Out (FIFO) queue implementation is not appropriate, as the oldest tasks (first in) would be prioritised, and load shedding would be applied to newer tasks (last in), which is the opposite of what's required.
Under low load (sufficient capacity), the newest tasks take priority, but eventually all tasks are processed. Under heavy load (constrained capacity), the newest tasks still take priority, but older tasks are buffered and may eventually be dropped (load shedding). Under conditions of insurmountable load, this approach allows the system to continue doing useful work with fresh tasks, while eventually giving up on older, stale tasks.
The application is not production-ready; it is a demonstration. You can run the application using AWS SAM. Instructions are below.
The application consists of the following resources defined in the AWS SAM template:
- `QueueTable`: An Amazon DynamoDB table containing queue task items.
- `TriggerFunction`: An AWS Lambda function which triggers queue task item processing.
- `ProcessTasksFunction`: An AWS Lambda function which processes queue task items.
- `CreateTasksFunction`: An AWS Lambda function which creates queue task items.
- `TriggerTopic`: An Amazon Simple Notification Service (SNS) topic which `TriggerFunction` subscribes to.
- `ProcessTasksTopic`: An Amazon SNS topic which `ProcessTasksFunction` subscribes to.
1. `CreateTasksFunction` inserts task items into `QueueTable` with `PENDING` state.
2. A DynamoDB stream invokes `TriggerFunction` for all item activity in `QueueTable`.
3. `TriggerFunction` publishes a notification on `ProcessTasksTopic` if task items should be processed.
4. `ProcessTasksFunction` subscribes to `ProcessTasksTopic`.
5. `ProcessTasksFunction` fetches `PENDING` task items from `QueueTable` for up to 1 minute, or until all `PENDING` task items have been processed.
6. `ProcessTasksFunction` processes each `PENDING` task item by calling the legacy system.
7. `ProcessTasksFunction` updates each task item during processing to reflect its state (first to `TAKEN`, and then to `SUCCESS`, `FAILURE` or `PENDING`).
8. `ProcessTasksFunction` publishes an SNS notification on `TriggerTopic` if `PENDING` task items remain in the queue.
9. `TriggerFunction` subscribes to `TriggerTopic`.

Activity continues whilst `QueueTable` events are sent on the stream (2) or notifications are published on `TriggerTopic` (9).
- `ProcessTasksFunction` gets tasks by querying the `QueueTable` Global Secondary Index (GSI).
- The query returns:
  - a small page of task items (e.g. 10 tasks),
  - with a state of `PENDING`,
  - sorted by created timestamp descending (LIFO).
- Some older `PENDING` tasks may never be selected under insurmountable load.
- Before processing a task, `ProcessTasksFunction` transitions the `taskStatus` attribute from `PENDING` to `TAKEN`.
- When updating the task item, a DynamoDB Update Expression and Condition Expression update the `taskStatus` attribute to `TAKEN` only if the value is still `PENDING` (see the sketch after this list).
- This technique ensures the task is processed only once, and protects against multiple concurrently invoked functions (this shouldn't happen, but we must still be careful).
- The application contains a fake implementation of task processing logic.
- Once processing for a task is complete, the function updates the `taskStatus` attribute to `SUCCESS`, `FAILURE` or `PENDING`, depending on the outcome of the task work.
- If the function updates the `taskStatus` attribute to `PENDING`, the task is effectively placed back on the queue. The task drops in priority compared to newer tasks (as the created timestamp is not updated).
- When the task item is updated, an Update Expression and Condition Expression (as described above) protect against concurrent and invalid modifications.
- When the load reaches an insurmountable level some tasks will never be selected for processing, and a backlog will build up. Each time `ProcessTasksFunction` polls for batches of `PENDING` tasks to process, it selects those tasks based on created timestamp, in descending order, so if the rate of new task creation exceeds the rate of task processing then the backlog grows.
- To avoid an ever-growing task backlog, `QueueTable` is configured to delete backlog items when the TTL timestamp of each item expires.
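To make the query and the state transition concrete, here is a minimal sketch of the two DynamoDB calls described above, using the AWS SDK for JavaScript (v2) `DocumentClient`. The GSI name and the `QUEUE_TABLE_NAME` environment variable are assumptions for illustration; the real implementation is in app/process_tasks and template.yaml.

```javascript
// Minimal sketch of the LIFO query and the PENDING -> TAKEN transition.
// Index and environment variable names are assumptions, not the demo's actual values.
const { DynamoDB } = require('aws-sdk');
const documentClient = new DynamoDB.DocumentClient();

// Query the GSI for the newest PENDING tasks first (LIFO order).
async function fetchPendingTasks(limit = 10) {
  const result = await documentClient.query({
    TableName: process.env.QUEUE_TABLE_NAME,     // assumed environment variable
    IndexName: 'taskStatus-taskCreated-index',   // assumed GSI name
    KeyConditionExpression: 'taskStatus = :pending',
    ExpressionAttributeValues: { ':pending': 'PENDING' },
    ScanIndexForward: false,                     // descending by taskCreated (the range key)
    Limit: limit                                 // a small page of tasks (e.g. 10)
  }).promise();
  return result.Items;
}

// Claim a task by transitioning taskStatus from PENDING to TAKEN.
// The ConditionExpression makes the claim atomic: if another invocation has
// already taken the task, the update fails and the task is skipped here.
async function takeTask(taskId) {
  try {
    await documentClient.update({
      TableName: process.env.QUEUE_TABLE_NAME,
      Key: { taskId },
      UpdateExpression: 'SET taskStatus = :taken, taskUpdated = :now',
      ConditionExpression: 'taskStatus = :pending',
      ExpressionAttributeValues: {
        ':taken': 'TAKEN',
        ':pending': 'PENDING',
        ':now': Date.now()
      }
    }).promise();
    return true;
  } catch (err) {
    if (err.code === 'ConditionalCheckFailedException') {
      return false; // task was no longer PENDING, so it is not processed by this invocation
    }
    throw err;
  }
}
```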
Here are a few things to consider if you intend to use the implementation in this demonstration application as inspiration for your own system:
- Read and understand the demonstration code carefully, and especially aim to understand how DynamoDB expressions have been implemented.
- You will need your own approach to creating the task items. This demonstration application creates fake tasks purely for demonstration purposes (see app/create_tasks.js).
- You will also need to provide your own real implementation of a task runner. This demonstration application contains only a fake task runner (see app/task_runner.js).
- You may need to use rate limiting (or another similar technique) to protect the downstream system (a minimal sketch follows this list).
- Think about how you will observe your application. Amazon CloudWatch and AWS X-Ray provide good logging and tracing for your application, and this demonstration application has examples of this.
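One simple form of rate limiting is sketched below: process tasks sequentially and enforce a minimum interval between downstream calls. This is illustrative only and is not part of the demonstration application; the interval value is an assumption.

```javascript
// Illustrative only: enforce a minimum interval between downstream calls while
// processing tasks sequentially. Not part of the demonstration application.
const MIN_INTERVAL_MS = 200; // assumed downstream limit of roughly 5 calls per second

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function processWithRateLimit(tasks, runTask) {
  for (const task of tasks) {
    const started = Date.now();
    await runTask(task); // call the downstream (legacy) system
    const elapsed = Date.now() - started;
    if (elapsed < MIN_INTERVAL_MS) {
      await sleep(MIN_INTERVAL_MS - elapsed); // pad out to the minimum interval
    }
  }
}
```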
- Each `QueueTable` item is a task in the queue.
- Each item contains the following attributes (an example item is sketched after this list):
  - `taskId`, configured as the table hash key
  - `taskStatus` (e.g. `PENDING` or `SUCCESS`)
  - `taskCreated` (timestamp)
  - `taskUpdated` (timestamp)
  - `ttl` (timestamp), configured as the DynamoDB table TTL attribute
  - potentially other fields relevant to the task work itself
- The table has one Global Secondary Index (GSI): `taskStatus` is the hash key and `taskCreated` is the range key.
- A DynamoDB stream is configured, which is an event source for `TriggerFunction`.
- DynamoDB Condition Expressions and Update Expressions ensure atomic and exclusive database item modification (e.g. for transitioning the state of each queue task item).
- See `QueueTable` in template.yaml.
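For illustration, a task item might look like the following. This is a sketch based on the attributes listed above; the timestamp representation and the `payload` field are assumptions (DynamoDB TTL does require `ttl` to be an epoch time in seconds).

```javascript
// A hypothetical task item, shaped according to the attributes described above.
const exampleTaskItem = {
  taskId: 'task-0001',                            // table hash key (value is illustrative)
  taskStatus: 'PENDING',                          // GSI hash key
  taskCreated: 1617184800000,                     // GSI range key, used for LIFO ordering
  taskUpdated: 1617184800000,
  ttl: Math.floor(Date.now() / 1000) + 60 * 60,   // epoch seconds, one hour from now
  payload: { orderId: '12345' }                   // hypothetical task-specific data
};
```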
- `TriggerFunction` is a proxy for `ProcessTasksFunction` (see below).
- The function is invoked by:
  - Events from the `QueueTable` DynamoDB stream.
  - Notifications on `TriggerTopic`.
- Notifications published on `TriggerTopic` are tail calls from `ProcessTasksFunction`.
- `TriggerFunction` calls `ProcessTasksFunction` if an event is (see the sketch after this list):
  - An INSERT into `QueueTable` (filters out all other DynamoDB events).
  - Any notification on `TriggerTopic` (all tail calls).
- The DynamoDB stream event source is configured so that table events arrive quickly, in batches of 10, with a maximum delay of 10 seconds.
- The function is configured without a concurrency limit, to minimise the risk of missing trigger events.
- The function is configured for zero retries on asynchronous invocation, to avoid redundant attempts to invoke `ProcessTasksFunction`.
- See `TriggerFunction` in template.yaml and app/trigger.js.
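A minimal sketch of this trigger logic is shown below (the real code is in app/trigger.js). Publishing to `ProcessTasksTopic` is how the function "calls" `ProcessTasksFunction`; the `PROCESS_TASKS_TOPIC_ARN` environment variable and the message body are assumptions.

```javascript
// Sketch only: forward DynamoDB INSERT events and SNS tail calls by publishing
// a notification on ProcessTasksTopic. See app/trigger.js for the real code.
const { SNS } = require('aws-sdk');
const sns = new SNS();

exports.handler = async (event) => {
  const records = event.Records || [];

  // Trigger processing if any record is a QueueTable INSERT or an SNS tail call.
  const shouldProcess = records.some(
    (record) => record.eventName === 'INSERT' || record.EventSource === 'aws:sns'
  );

  if (shouldProcess) {
    await sns.publish({
      TopicArn: process.env.PROCESS_TASKS_TOPIC_ARN, // assumed environment variable
      Message: JSON.stringify({ trigger: 'process-tasks' })
    }).promise();
  }
};
```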
- `ProcessTasksFunction` implements the LIFO queue behaviour.
- The function is called by `TriggerFunction`, via `ProcessTasksTopic`.
- Whilst the function is executing it polls for `PENDING` tasks, and works on those tasks.
- The function remains active for up to 1 minute, and then exits.
- If there are no more `PENDING` tasks (after polling), the function will exit.
- Before the function exits, if there are remaining `PENDING` tasks, the function will tail call `TriggerFunction` (by publishing on `TriggerTopic`), so that any remaining tasks can be processed (a sketch of this loop follows this list).
- Each task is transitioned from `PENDING` to `TAKEN` state, and then, following processing by task runner logic, the task is transitioned to `SUCCESS` or `FAILURE` state.
- A fake implementation of task runner logic simulates integration with a throughput-constrained external system.
- The function has a concurrency limit of 1, so that the function is in control of task processing concurrency.
- The function is configured for one retry on asynchronous invocation, to minimise the risk of missing a notification that there are `PENDING` state task items.
- See `ProcessTasksFunction` in template.yaml and app/process_tasks.
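The overall loop might be sketched as follows (the real code is in app/process_tasks). `fetchPendingTasks` and `takeTask` are the sketches shown earlier, `runTask` stands in for the task runner logic, and the time budget and `TRIGGER_TOPIC_ARN` environment variable are assumptions.

```javascript
// Sketch only: poll for PENDING tasks for up to one minute, process them
// newest-first, and tail call TriggerFunction (via TriggerTopic) if PENDING
// tasks remain. fetchPendingTasks and takeTask are the earlier sketches;
// runTask stands in for the task runner logic.
const { SNS } = require('aws-sdk');
const sns = new SNS();

const TIME_BUDGET_MS = 60 * 1000; // remain active for up to one minute

exports.handler = async () => {
  const deadline = Date.now() + TIME_BUDGET_MS;
  let pendingRemain = true;

  while (pendingRemain && Date.now() < deadline) {
    const tasks = await fetchPendingTasks(10); // newest PENDING tasks first (LIFO)
    if (tasks.length === 0) {
      pendingRemain = false; // queue is drained, exit without a tail call
      break;
    }
    for (const task of tasks) {
      if (await takeTask(task.taskId)) { // PENDING -> TAKEN, only if still PENDING
        await runTask(task);             // then SUCCESS, FAILURE or back to PENDING
      }
    }
  }

  if (pendingRemain) {
    // Tail call: ask TriggerFunction to invoke this function again.
    await sns.publish({
      TopicArn: process.env.TRIGGER_TOPIC_ARN, // assumed environment variable
      Message: JSON.stringify({ reason: 'pending-tasks-remain' })
    }).promise();
  }
};
```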
- `CreateTasksFunction` creates and inserts fake task items into `QueueTable`, and has fake task processing logic.
- A real application would insert real task items into the table, and provide real task processing logic.
- On creation, the `taskStatus` attribute is set to `PENDING`.
- The task item `ttl` attribute is set to 1 hour in the future, so that DynamoDB TTL functionality will delete the items once they expire.
- See `CreateTasksFunction` in template.yaml and app/create_tasks.
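A minimal sketch of creating one such task item, matching the item shape shown earlier, might look like this (the real code is in app/create_tasks; the environment variable and `payload` field are assumptions):

```javascript
// Sketch only: insert a PENDING task item with a TTL one hour in the future.
const { DynamoDB } = require('aws-sdk');
const documentClient = new DynamoDB.DocumentClient();

async function createTask(taskId, payload) {
  const now = Date.now();
  await documentClient.put({
    TableName: process.env.QUEUE_TABLE_NAME,   // assumed environment variable
    Item: {
      taskId,
      taskStatus: 'PENDING',                   // new tasks start in the PENDING state
      taskCreated: now,                        // used by the GSI for LIFO ordering
      taskUpdated: now,
      ttl: Math.floor(now / 1000) + 60 * 60,   // DynamoDB TTL expects epoch seconds
      payload                                  // hypothetical task-specific data
    }
  }).promise();
}
```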
- Structured logging statements (JSON objects) enable detailed log analysis with Amazon CloudWatch Logs Insights.
- AWS X-Ray tracing is added to the Lambda configuration and X-Ray captures and wraps the DynamoDB and SNS client objects.
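For example, the two techniques above can be combined roughly as follows; the log field names here are illustrative rather than the application's actual schema.

```javascript
// Sketch only: structured JSON logging plus X-Ray capture of the AWS SDK,
// so that DynamoDB and SNS calls appear as subsegments in traces.
const AWSXRay = require('aws-xray-sdk-core');
const AWS = AWSXRay.captureAWS(require('aws-sdk')); // wraps DynamoDB and SNS clients

const documentClient = new AWS.DynamoDB.DocumentClient();

// Emitting one JSON object per log line makes each field queryable in
// CloudWatch Logs Insights.
function logEvent(details) {
  console.log(JSON.stringify(details));
}

logEvent({ event: 'TASK_TAKEN', taskId: 'task-0001', taskStatus: 'TAKEN' }); // illustrative fields
```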
Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the AWS Pricing page for details. You are responsible for any AWS costs incurred. No warranty is implied in this demonstration application.
- An AWS account. (Create an AWS account if you do not already have one and log in.)
- AWS CLI already configured with Administrator permission.
- AWS SAM CLI installed. (The application was developed with version 1.22.0.)
- Node.js 14.x installed.
- Build and deploy the demonstration application using AWS SAM.
- If you're not familiar with SAM then a good place to start is the Getting started with AWS SAM page.
- Use `CreateTasksFunction` to experiment with creating task items.
- You can invoke the function directly in the AWS Management Console.
- You could also use Amazon CloudWatch Schedule Expressions for Rules to invoke the function on a schedule.
- The function creates a few `PENDING` task items. If you invoke the function repeatedly (either manually or via a schedule) then you will be simulating a flow of tasks into the system.
Here are some suggestions on how to monitor task queue processing:
- Look at the items in `QueueTable`, especially the `taskStatus` attribute. Watch for tasks transitioning from `PENDING` to `TAKEN` and so on.
- Look at the application logs in Amazon CloudWatch Logs Insights. Filter the log events based on the structured logging attributes.
- Look at the application traces in AWS X-Ray. This is a good way to see the tail calls in action.
- Look at the application traces in AWS X-Ray. This is a good way to see the tail calls in action.
- You can clean up the demonstration application by deleting the associated CloudFormation stack.
- You may need to manually delete some resources, such as the CloudWatch log groups.
See CONTRIBUTING for more information.
This demonstration application is licensed under the MIT-0 License. See the LICENSE file.