Building a serverless architecture for data collection with AWS Lambda

AWS Lambda is one of the best solutions for managing a data collection pipeline and for implementing a serverless architecture. In this post, we’ll discover how to build a serverless data pipeline in three simple steps using AWS Lambda Functions, Kinesis Streams, Amazon Simple Queue Service (SQS), and Amazon API Gateway!

How to build a serverless data pipeline in 3 steps

Data generated by web and mobile applications is usually stored either in a file or in a database (often a data warehouse). Because the data comes from different sources, such as the back-end server or the front-end UI, it is often very heterogeneous. Furthermore, this data must typically be synced with multiple services such as a CRM (e.g., Hubspot), an email provider (e.g., Mailchimp), or an analytics tool (e.g., Mixpanel). Moreover, this process must be backward compatible and avoid data loss. For such reasons, building and managing a custom infrastructure that is able to handle the collection of such data can be very expensive.

I would like to propose a solution based on AWS Cloud services with the goal of creating a serverless architecture. This also fits a typical use case where the data flow involves thousands or more events per minute.

The following schema summarizes the architecture that we’ll be describing:

Serverless Architecture for Data Collection Pipeline with AWS Lambda Functions and Kinesis Streams

The use case that I will be analyzing involves the collection of data from multiple sources: backend, front-end, and mobile generated events. The architecture for data collection is designed to send events to different destinations, like our data warehouse and other third-party services (e.g., Hubspot, Mixpanel, GTM, customer.io). As a result, the architecture includes several serverless AWS cloud services, creating a basic data collection flow that can be easily extended by adding further modules as needed:

  1. Amazon Kinesis Streams collects events that originate from front-end and mobile applications through an Amazon API Gateway that works as a REST endpoint. The events generated by the server are directly sent to Kinesis Streams using the Kinesis Producer Library.
  2. AWS Lambda Functions are used to manage the core logic of the pipeline. A first Lambda Function consumes the output of Kinesis Streams and then forwards events to each custom Lambda Function, which works as a connector to a third-party service, providing additional features such as authentication and a retry strategy.
  3. Amazon SQS queues are used as a DLQ (Dead Letter Queue) for each AWS Lambda Function as a fallback in case of multiple failures in data processing.

Architecture implementation

1. Amazon Kinesis Streams: Manage events from multiple sources

Let’s analyze the architecture in detail.
The data that we need to track is generated by the back-end server, the front-end website, and the mobile application. The back-end directly sends events to Kinesis Streams using the KPL (Kinesis Producer Library). An example of server-side generated events is a user login or a file download.

On the other hand, events originating from the front-end and from mobile are sent to Kinesis Streams through an Amazon API Gateway that exposes a REST endpoint. Examples of such events are a page scroll on the homepage tracked via JavaScript, or a button clicked in the iOS or Android app.

First of all, why Amazon Kinesis Streams? Amazon Kinesis Streams is the ideal tool if you need to collect and process large streams of data in real time. Typical scenarios for using Amazon Kinesis Streams include:

  • Managing multiple producers that push their data feeds directly into the same stream
  • Collecting real-time analytics and metrics
  • Processing application logs
  • Integrating the data collection pipeline with other AWS services (i.e. consumers) to process the events

In addition, the API Gateway can be used as the endpoint for the front-end and mobile. During the configuration of Amazon Kinesis Streams, we suggest starting with a single shard and increasing the count only when needed.
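To decide when more shards are needed, you can compare the expected write load against the documented per-shard ingest limits of Kinesis Data Streams (1,000 records per second and 1 MB per second). The following sketch is illustrative: the helper name and the traffic figures are our own, while the limits reflect the published quotas.

```python
import math

# Per-shard ingest limits documented for Kinesis Data Streams.
MAX_RECORDS_PER_SECOND = 1000
MAX_BYTES_PER_SECOND = 1024 * 1024  # 1 MB


def estimate_shard_count(records_per_second, avg_record_bytes):
    """Rough number of shards needed to absorb the expected write load."""
    by_records = records_per_second / MAX_RECORDS_PER_SECOND
    by_bytes = (records_per_second * avg_record_bytes) / MAX_BYTES_PER_SECOND
    # Whichever limit is hit first dictates the shard count; never below 1.
    return max(1, math.ceil(max(by_records, by_bytes)))
```

For instance, 5,000 events per second of roughly 500 bytes each would call for about 5 shards, because the record-rate limit is hit before the byte-rate limit.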

The following Python code allows you to send 300 events to the configured stream. The PartitionKey attribute determines how events are partitioned among shards; in this example, we generate it from a datetime field. Finally, pay attention to the payload format: remember to always add the data and name fields. You can find a ready-to-use blueprint in the AWS Lambda console.

import boto3
from time import gmtime, strftime

client = boto3.client(
    service_name="kinesis",
    region_name="us-east-1",
)

for i in range(300):
    response = client.put_record(
        StreamName="data-collection-stream",
        Data='{"name":"event-%d","data":{"square":%d}}' % (i, i * i),
        PartitionKey=strftime("PK-%Y%m%d-%H%M%S", gmtime()),
    )
    print("sending event", i, "response:", response)

2. Manage event routing and the retry strategy with AWS Lambda Functions

AWS Lambda is a serverless cloud service by Amazon Web Services that runs your code for virtually any type of backend application. Lambda automatically manages the underlying compute resources for you and executes your code in response to events triggered by other AWS services, or when invoked directly. Relevant properties of AWS Lambda Functions:

  • Process a single event in real time without managing servers
  • Are highly scalable
  • Provide a fallback strategy in case of error

With reference to the proposed architecture, an AWS Lambda Function is first directly triggered by Kinesis Streams itself. To preserve event ordering, we recommend configuring the trigger with a batch size equal to 1 and the starting position set to TRIM_HORIZON, as shown in the following snapshot of the AWS Lambda Function console:

Trigger AWS Lambda Functions with Amazon Kinesis Streams

This AWS Lambda Function is in charge of routing the events coming from the Kinesis Stream to several destination services. For a more cost-effective solution, we recommend implementing conditional routing based on the event properties. For instance, based on the event name, a set of rules can be configured to decide whether to discard or to forward an event toward a certain destination.
With this purpose, I defined a set of routing rules in a JSON file stored in an S3 bucket:

[
 {
   "destination_name": "mixpanel",
   "destination_arn": "arn:aws:lambda:region:account-id:function:function-name:prod",
   "enabled_events": [
     "page_view",
     "search",
     "button_click",
      "page_scroll"
   ]
 },
 {
   "destination_name": "hubspotcrm",
   "destination_arn": "arn:aws:lambda:region:account-id:function:function-name:prod",
   "enabled_events": [
     "login",
     "logout",
     "registration",
     "page_view",
     "search",
     "email_sent",
      "email_open"
   ]
 },
 {
   "destination_name": "datawarehouse",
   "destination_arn": "arn:aws:lambda:region:account-id:function:function-name:prod",
   "enabled_events": [
     "login",
     "logout",
     "registration",
     "page_view",
     "search",
     "button_click",
     "page_scroll",
     "email_sent",
      "email_open"
   ]
 }
]

I recommend using the AWS Lambda Function environment variables to configure the name of the S3 file and bucket so that they can be updated at any time without editing the code. Each rule has a destination_arn attribute that configures where the event is to be sent if its name is included in the list of enabled_events. The AWS Lambda Function will send the event to the configured ARN (Amazon Resource Name), with each corresponding to a specific Lambda Function.
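The routing logic described above can be sketched as follows. This is a minimal, illustrative sketch: the function names are our own, and the rules list is assumed to have the structure of the JSON document shown earlier. The matching step is kept as a pure function, separate from the asynchronous invocation.

```python
import json


def match_destinations(event_name, rules):
    """Return the ARNs of all destinations whose rules enable this event."""
    return [
        rule["destination_arn"]
        for rule in rules
        if event_name in rule["enabled_events"]
    ]


def route_event(event, rules):
    """Asynchronously invoke one connector Lambda per matching destination."""
    import boto3  # deferred so the pure matching logic stays testable offline
    client = boto3.client("lambda")
    for arn in match_destinations(event["name"], rules):
        client.invoke(
            FunctionName=arn,
            InvocationType="Event",  # asynchronous invocation
            Payload=json.dumps(event).encode("utf-8"),
        )
```

A page_view event would match both the mixpanel and datawarehouse rules from the example file, so the routing Lambda would invoke both connector functions for that single event.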

I suggest processing events one at a time to avoid data loss. In this way, the AWS Lambda Function in charge of conditional routing handles only one message per invocation.

Finally, we need to implement a new AWS Lambda Function that is in charge of validating the incoming event and forwarding it to each destination service (e.g., Google Analytics, Mixpanel, Hubspot, database). Such functions are asynchronously invoked by the routing Lambda (using the destination_arn).
Each AWS Lambda function implements:

  • The connector: It provides the logic to connect to the destination service (e.g., HubSpot)
  • The retry strategy: If event saving fails, it retries for a specified number of times with an exponential delay
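The retry strategy with exponential delay could be sketched as follows. This is a generic sketch, not the exact code of the connectors: the function names and defaults are illustrative.

```python
import time


def with_retries(send, event, max_attempts=4, base_delay=0.5):
    """
    Call send(event) until it succeeds, sleeping base_delay * 2**attempt
    seconds between failures. Re-raises the last error once attempts run out.
    """
    for attempt in range(max_attempts):
        try:
            return send(event)
        except Exception:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```

A connector would wrap its call to the third-party API (e.g., the HubSpot client) in with_retries; if all attempts fail, the raised exception lets Lambda hand the payload over to the Dead Letter Queue described in the next step.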

In addition, a few suggestions that we found useful during the design and the development of the data collection architecture:

  • Invoke Lambda Functions that work as connectors asynchronously.
  • Always create aliases and versions for each Function. This configuration enables you to publish a different version of your Lambda Function code depending on development, beta, and production workflow.
  • Use environment variables for configurations.
  • Create a custom IAM role for each AWS Lambda Function.
  • Detect delays in stream processing by monitoring the IteratorAge metric in the Lambda console’s monitoring tab.

3. Configuring a Dead Letter Queue on AWS Lambda to avoid event loss

While each Lambda Function implements its own retry strategy, some events may still not be successfully stored by the destination service (e.g., due to network problems, missing data, etc.).

Fortunately, Lambda allows you to implement a fallback strategy: in the event of errors, you can set the event aside and store it in a so-called Dead Letter Queue (DLQ). The DLQ can be either an SQS queue (Amazon Simple Queue Service) or an SNS topic (Amazon Simple Notification Service).

From the advanced settings of the Lambda console (see the following snapshot), you can configure the AWS Lambda Function to forward unprocessed payloads to a dedicated Dead Letter Queue. In our example, we configured an SQS queue.

Configure DLQ in AWS Lambda to avoid data loss in our serverless architecture
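The same configuration can also be applied programmatically, which is convenient when you manage many connector functions. A minimal sketch, assuming the function and queue already exist (the helper names are illustrative; the DeadLetterConfig parameter is the one exposed by the Lambda API):

```python
def dead_letter_config(queue_arn):
    """Build the DeadLetterConfig structure expected by the Lambda API."""
    return {"TargetArn": queue_arn}


def attach_dlq(function_name, queue_arn, region_name="us-east-1"):
    """Point an existing Lambda Function at an SQS queue acting as its DLQ."""
    import boto3  # deferred so the config helper stays testable offline
    client = boto3.client("lambda", region_name=region_name)
    return client.update_function_configuration(
        FunctionName=function_name,
        DeadLetterConfig=dead_letter_config(queue_arn),
    )
```

Looping over all connector functions with attach_dlq helps enforce the rule below: every Lambda Function that can fail gets a DLQ.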

You can recover the complete list of DLQ messages using another AWS Lambda Function that is manually triggered. Remember to set a DLQ on each Lambda Function that can fail! Then you can process all of the collected events again with a custom script, like the following:

import json

import boto3


def get_events_from_sqs(
        sqs_queue_name,
        region_name='us-west-2',
        purge_messages=False,
        backup_filename='backup.jsonl',
        visibility_timeout=60):
    """
    Create a JSON-lines backup file of all events in the SQS queue with the given 'sqs_queue_name'.
    :sqs_queue_name: the name of the AWS SQS queue to be read via boto3
    :region_name: the region name of the AWS SQS queue to be read via boto3
    :purge_messages: True if messages must be deleted after reading, False otherwise
    :backup_filename: the name of the file where to store all SQS messages
    :visibility_timeout: period of time in seconds (unique consumer window)
    :return: the number of processed batches of events
    """
    forwarded = 0
    counter = 0
    sqs = boto3.resource('sqs', region_name=region_name)
    dlq = sqs.get_queue_by_name(QueueName=sqs_queue_name)
    with open(backup_filename, 'a') as filep:
        while True:
            batch_messages = dlq.receive_messages(
                MessageAttributeNames=['All'],
                MaxNumberOfMessages=10,
                WaitTimeSeconds=20,
                VisibilityTimeout=visibility_timeout,
            )
            if not batch_messages:
                break
            for msg in batch_messages:
                try:
                    line = "{}\n".format(json.dumps({
                        'attributes': msg.message_attributes,
                        'body': msg.body,
                    }))
                    filep.write(line)
                    if purge_messages:
                        msg.delete()
                    forwarded += 1
                except Exception as ex:
                    print("Error in processing message %s: %r" % (msg, ex))
            counter += 1
            print('Batch %d processed' % counter)
    return counter
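Once the backup file exists, the stored events can be replayed, for example by feeding each body back to the routing Lambda. A minimal sketch of reading the JSON-lines backup back (the function name is illustrative):

```python
import json


def load_backup(backup_filename):
    """Yield the original message bodies stored in the JSON-lines backup."""
    with open(backup_filename) as filep:
        for line in filep:
            line = line.strip()
            if line:  # skip blank lines
                yield json.loads(line)["body"]
```

Each yielded body is the original event payload as received by the failed Lambda Function, so it can be re-submitted without any transformation.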

Conclusions

I strongly recommend that you consider building a serverless architecture for the management of your data collection pipeline. In this post, we designed a serverless data collection pipeline using several AWS Cloud services such as AWS Lambda Functions and Amazon Kinesis Streams in order to ensure scalability and prevent data loss. The main advantage of this solution is the full control of the process and the cost, since you only pay for the compute time that you consume.

In conclusion, I would suggest a couple of very useful extensions that can be integrated into the serverless architecture:

  • Create a custom CloudWatch dashboard to verify the processing performance, monitor the presence of events in the DLQ, and raise alerts in case of issues.
  • Configure Kinesis Firehose and link it as the output of the Kinesis Streams module to ensure that all data is backed up on S3.
  • Implement a listener with Kinesis Analytics to discover correlations among the incoming events.

Written by

David Santucci

Computer science engineer, passionate about machine learning and Lego Mindstorms.

