Amazon Kinesis Data Streams
This course is part 2 of 2 on how to stream data using Amazon Kinesis Data Streams.
The course covers shard capacity, Kinesis Data Streams as a streaming storage layer, and the basics of securing data in a Kinesis Data Stream.
- Build upon the topics covered in Amazon Kinesis Data Streams Part 1
- Learn about shard capacity, scaling, and limits
- Obtain an in-depth understanding of how a data streaming service is layered and its operations
- Understand how to secure a Kinesis Data Stream
This course is intended for people that want to learn how to stream data into the AWS cloud using Amazon Kinesis Data Streams.
To get the most out of this course, you should have a basic knowledge of the AWS platform.
Hello! I'm Stephen Cole, a trainer here at Cloud Academy and I'd like to welcome you to this lecture about the data stored inside a Kinesis Data Stream. What I'm going to talk about is how a Kinesis Data Stream is a type of streaming storage. To be understood, this concept needs some context.
Once I understood that a Kinesis Data Stream is a type of high-speed ephemeral storage service, it helped me conceptualize how to efficiently use data streams. That's me getting ahead of myself. Let's back up and find a better starting point.
Generally speaking, Amazon Kinesis Data Streams is a real-time data ingestion service from AWS. However, it is more accurate to describe a Kinesis Data Stream as a stream storage layer. This makes more sense when looking at how stream processing works at a high level.
In general, there are five layers of real-time data streaming. There is the source layer, the stream ingestion layer, the stream storage layer, the stream processing layer, and the destination.
The source layer is the data being collected from various sources. The stream ingestion layer is an application tier that collects the source data and publishes it to the third component, the stream storage layer. The stream processing layer is where one or more consumers read and process the data in the stream storage layer. This processing includes ETL, aggregation, anomaly detection, or analysis. The final layer is the destination such as a Data Warehouse, a Data Lake, or a relational database. Of these, Data Lakes are the most common.
The actual ingestion, that is, putting data into a Kinesis Data Stream, is done using a Producer or Producer Application. Producers include the Amazon Kinesis Agent and custom applications built with the Amazon Kinesis Producer Library or an AWS SDK.
Data is collected by Producers in real time and stored in a Kinesis Data Stream. It's made available to Consumer Applications within milliseconds. However, data remains in a Kinesis Data Stream until it expires. By default, Data Records stored in a Kinesis Data Stream expire after 24 hours. This retention period means that a Kinesis Data Stream is a storage layer for stream processing.
Let's look at that in a little more detail. Producers are applications that capture and send data into Kinesis Data Streams for processing in real time. It is possible to create Producers using the AWS SDKs, the Amazon Kinesis Producer Library, the Kinesis Agent, and third-party tools.
The primary difference between Kinesis Data Streams and Kinesis Data Firehose is that Kinesis Data Streams typically requires custom code to get data from the stream. Data Firehose, however, is a fully-managed service that includes the Producers, the Stream, and Consumers.
When thinking about Kinesis Data Streams as a storage layer, it's possible for multiple, different Consumer Applications to access the same data stream.
Kinesis Data Firehose does not have this ability. It is a set of trade-offs. Where Kinesis Data Streams do not auto scale, Firehose streams automatically scale as needed. Also, with Firehose, it is not possible to create custom applications to consume the data directly.
Instead, an AWS Lambda function can be used to transform data before it enters the Firehose Data Stream. If you want to simply stream data to durable storage--such as S3--Firehose is going to be easier from a development point of view because that is what it was designed to do.
When creating a custom Kinesis Producer application, the Kinesis SDK can be used to send data directly into a Data Stream programmatically. The APIs can also be called from the AWS command-line interface.
As an example, here's how to create a stream called myStream from the AWS CLI with a single shard. The parameter --shard-count is required because a shard is a unit of scale in a Kinesis Data Stream and every Data Record is put into a shard.
A shard can also be thought of as a partition in a data stream but this can cause some confusion with terminology because, when putting data into a Kinesis Data Stream, one of the required parameters is a Partition Key. The Partition Key parameter is used by Kinesis to determine which shard to use inside the stream when storing data. Because of this, it's clearer to think of a shard as a unit of scale.
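To make the relationship between a Partition Key and a shard more concrete, here is a minimal Python sketch. Kinesis maps each Partition Key to a 128-bit integer with an MD5 hash and stores the record in the shard whose hash key range contains that value. The function names and the evenly split hash ranges are assumptions made for illustration; a stream that has been resharded can have uneven ranges.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def hash_key(partition_key: str) -> int:
    """Map a partition key to a 128-bit integer using MD5."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, byteorder="big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Pick a shard index, ASSUMING the hash space is split evenly across shards."""
    range_size = HASH_SPACE // shard_count
    # Clamp so the top of the hash space lands in the last shard
    # when the space does not divide evenly.
    return min(hash_key(partition_key) // range_size, shard_count - 1)
```

The point of the sketch is that the same Partition Key always lands in the same shard, and that adding shards spreads the keys over more units of scale.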
The available throughput--including the amount of data that can be stored in a stream--and cost of a stream are determined by the number of provisioned shards. More shards means more throughput. More shards also comes with a larger cost.
When using the command line to see the status of a stream, use the AWS CLI command describe-stream-summary. This will give the stream's status but not provide any details about the shards. For example, it can be used to see the status of the stream named myStream that was created by the earlier command. It will return something that looks like this.
In this example, the StreamStatus is ACTIVE. While it's in the process of being provisioned, the StreamStatus is CREATING. When using the AWS CLI and piping the results to jq, this command will return only the status.
jq is a lightweight and flexible command-line JSON processor. It can be used to slice, filter, map, and transform structured data. The man page for jq is long but there are a number of tutorials to be found online and the documentation is extensive.
In my previous example, I piped the output--in JSON--to jq. The -r means that I want raw output. I do not want JSON formatted output with quotes. This is a personal preference only. Inside the single quotes is the data I want. From the StreamDescriptionSummary I want StreamStatus.
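For anyone working from Python instead of the CLI and jq, the same lookup can be sketched like this. It assumes a boto3-style Kinesis client is passed in (for example, one created with boto3.client("kinesis")); the function name stream_status is hypothetical.

```python
def stream_status(client, stream_name: str) -> str:
    """Return only the StreamStatus value, such as CREATING or ACTIVE."""
    # describe_stream_summary returns the stream's summary without shard details.
    response = client.describe_stream_summary(StreamName=stream_name)
    # Same path as the jq filter: StreamDescriptionSummary, then StreamStatus.
    return response["StreamDescriptionSummary"]["StreamStatus"]
```

Passing the client in as a parameter is a design choice for the sketch. It keeps the function easy to try out with a stand-in object before pointing it at a real stream.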
If you are going to spend any significant amount of time using the AWS CLI or start working with JSON, jq will help you get the information you want quickly and easily. Also, to make this code more readable and fit nicely on the screen, I used a line continuation character. In Linux, the line continuation character is a backslash. When using PowerShell, two characters can act as a line-continuation character: the backtick ( ` ) and the pipe ( | ) symbol.
The Kinesis Producer Library--also called the KPL--and the Command Line Interface use the same APIs to interact with the service. I'm going to take a little time to go through some of them and show how they work.
The purpose of this is to illustrate how Kinesis Data Stream Producer Applications put Data Records into a Stream and show how the Data Stream is accessed by Consumer Applications.
There are two distinct API calls available to write Data Records to a Kinesis Data Stream, putRecord() and putRecords(). The method putRecord() writes a single Data Record to a Kinesis Data Stream. Alternatively, when there is a batch of records that need to be written, use putRecords(). That's with an 's.'
When writing multiple records at the same time, putRecords() supports batches of up to 500 records or 5 megabytes of data. In general, putRecords() is preferable to using putRecord(). To explain and illustrate, I've written some code in Python to put 3 Data Records into a Kinesis Data Stream one at a time as well as in a batch.
I needed some data for my code that put records into a Kinesis Data Stream. To keep it simple, I chose my name along with two of my co-workers, Stuart and Will. To add some variety, I used the Python datetime module to include the time of day.
When calling putRecord(), three pieces of data are required: the stream name, the partition key, and the data. Records 1 and 3 used a partition key value of partitionKey1. Record 2 had a partition key of partitionKey2.
Here are the responses.
Since record 1 and record 3 had the same Partition Key specified, they were assigned the same ShardId. Putting the records into a batch is similar. As with putRecord(), putRecords() requires the stream name and data. However, the partition key is sent with the data.
Sending the same three names using putRecords()--with updated times--returns a different type of output. There are a couple of things to note in this response. FailedRecordCount is 0. There are 3 records listed.
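The code from the lecture is not reproduced in this transcript, so what follows is only a sketch of what the two approaches might look like in Python. It assumes a boto3-style Kinesis client is passed in; the helper names build_payloads, put_one_at_a_time, and put_as_batch are hypothetical.

```python
from datetime import datetime


def build_payloads(names):
    """Return one bytes payload per name, stamped with the current time of day."""
    now = datetime.now().strftime("%H:%M:%S")
    return [f"{name} {now}".encode("utf-8") for name in names]


def put_one_at_a_time(client, stream_name, payloads, partition_keys):
    """Call put_record once per payload and collect the ShardId of each response."""
    shard_ids = []
    for data, key in zip(payloads, partition_keys):
        response = client.put_record(
            StreamName=stream_name, Data=data, PartitionKey=key
        )
        shard_ids.append(response["ShardId"])
    return shard_ids


def put_as_batch(client, stream_name, payloads, partition_keys):
    """Send every payload in one put_records call; each record carries its own key."""
    records = [
        {"Data": data, "PartitionKey": key}
        for data, key in zip(payloads, partition_keys)
    ]
    # The response includes FailedRecordCount and one entry per record.
    return client.put_records(StreamName=stream_name, Records=records)
```

With the three names and the partition keys partitionKey1, partitionKey2, and partitionKey1, the first function makes three requests and the second makes one.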
From a throughput perspective, Kinesis Data Streams treats batching records the same way as individual requests. In a batch, each record is considered separately and counted against the overall throughput limits for a shard.
However, putting one record at a time into a Kinesis Data Stream has the potential to create a significant amount of request overhead. Putting records into batches reduces the size and number of HTTP requests made to AWS.
Batching records can improve an application's performance. If an application has to wait for every individual request to complete, a large number of requests will add to the latency of a producer application. As a general rule, when possible, prefer batch operations.
When using the APIs or SDK to create a consumer application with multiple threads, each thread gets data from a shard using the GetRecords() API in conjunction with the getShardIterator() API.
A shard iterator specifies the position in the stream where to start reading Data Records. This position can be a sequence number, a timestamp, the first record in a stream, or the most recent record.
Data is returned at a rate of 2 megabytes per second, per shard. This is a limitation of the service and cannot be changed.
The throughput available on a stream is the sum of the throughput of its provisioned shards. For example, if there are 6 shards, the stream will have an aggregate throughput of 12 MB per second downstream but each shard will have a 2 MB per second limit of its own.
The GetRecords() API call is a polling operation that can only be made 5 times per second on a single shard. This is also a limitation of the service and cannot be changed.
This means that the fastest that consumers can request data from the stream is every 200 ms because it has to stay below the threshold of 5 requests per second.
If a consumer calls GetRecords() with a polling window of 100 ms this means it would make 10 requests per second. 5 of the requests would get throttled and the Kinesis service would return an exception. The consumer would then have to process the exception, do some sort of backoff operation, and then retry the request.
There are two maximums to be aware of when requesting data from a stream. The maximum number of records that can be returned by a single GetRecords() API request is 10,000. The maximum amount of data that a single GetRecords() request can return is 10 MB.
If a request returns this much data, subsequent calls made for the next 5 seconds will return a ProvisionedThroughputExceededException. This keeps the throughput at 2 MB per second. Like the polling window example, a consumer that receives a 10 MB response would need to have some type of logic to handle subsequent requests.
These limitations are per shard. If multiple consumers access a single shard, they share the limit of 2 MB per second and the 200 ms access rate. Putting this into round numbers, this means that if a single shard has 5 consumers that are polling it, each one of the consumers will be able to access the shard once per second.
I sometimes have problems visualizing math like this in my head. So, I'll break it down a bit further.
There are 1,000 milliseconds in a second. If this single second is divided by the 200 millisecond limit, 1,000 divided by 200 is 5. 5 consumers can access a shard once per second.
Similarly, since the shard's throughput is limited to 2 MB per second, dividing this amount between the 5 consumers means that each one of them can receive about 400 kilobytes per second.
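The arithmetic for the shared read limits can be written out in a few lines of Python. The constants come from the limits described in this lecture and use Base10 units; the function names are made up for this sketch.

```python
SHARD_READ_BYTES_PER_SECOND = 2_000_000  # 2 MB per second, per shard (Base10)
SHARD_READS_PER_SECOND = 5               # GetRecords calls per second, per shard


def min_poll_interval_ms() -> float:
    """Shortest polling window that stays within 5 requests per second."""
    return 1000 / SHARD_READS_PER_SECOND


def per_consumer_share(consumers: int):
    """Return (polls per second, kilobytes per second) for each consumer on one shard."""
    polls = SHARD_READS_PER_SECOND / consumers
    kilobytes = SHARD_READ_BYTES_PER_SECOND / consumers / 1000
    return polls, kilobytes


def cooldown_seconds(response_bytes: int) -> float:
    """Seconds of read throughput that a single response uses up."""
    return response_bytes / SHARD_READ_BYTES_PER_SECOND
```

For example, per_consumer_share(5) gives one poll per second and 400 kilobytes per second for each consumer, and cooldown_seconds(10_000_000) gives the 5 seconds mentioned earlier for a 10 MB response.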
The takeaway here is that adding consumers to a shard lowers the available throughput for each consumer.
For some, this limitation is a problem. Thankfully, AWS created a new type of consumer called Enhanced Fan-Out to address it. You can learn more about Enhanced Fan-Out in Part 1 of this course in the lecture titled The Elements of a Kinesis Data Stream.
A simplified version of how Consumer Applications work looks like this. It starts with a loop. Use getShardIterator() to get the location of the first data record available in the Data Stream. The oldest record in a stream is at the trim horizon.
In fact, there are five possible starting positions for getting Data Records out of a Kinesis Data Stream.
- AT_SEQUENCE_NUMBER will start reading at a specific position in the Data Stream.
- AFTER_SEQUENCE_NUMBER will start reading right after the position specified.
- AT_TIMESTAMP will start reading from the Data Stream at the specified time.
- TRIM_HORIZON--as I've recently mentioned--starts at the oldest record in the shard.
- LATEST starts with the most recent record put into the shard.
Several elements are returned by getRecords(). One of them is the value NextShardIterator. This is an iterator that points to the next position in the shard, the one right after the last record returned. Use this value with getRecords() to get the next batch of Data Records.
Continue with the loop until NextShardIterator comes back as a null value. When it is null, the shard has been closed and the requested iterator will not return any more data.
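The simplified consumer loop described above can be sketched in Python as follows. It assumes a boto3-style Kinesis client is passed in; read_shard and handle are hypothetical names, and the 200 ms pause reflects the 5 requests per second limit on a shard.

```python
import time


def read_shard(client, stream_name, shard_id, handle, poll_interval=0.2):
    """Read one shard from its oldest record until the shard is closed."""
    # Start at the trim horizon, the oldest record still in the shard.
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]

    while iterator is not None:
        response = client.get_records(ShardIterator=iterator)
        for record in response["Records"]:
            handle(record)
        # A missing or null NextShardIterator means the shard has been closed.
        iterator = response.get("NextShardIterator")
        if iterator is not None:
            time.sleep(poll_interval)  # stay under 5 GetRecords calls per second
```

On an open shard this loop keeps polling indefinitely, because an open shard always returns a NextShardIterator even when there are no new records. A real consumer would also need the error handling and backoff discussed in this lecture.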
If there's any one lesson I've learned while working in the cloud it is that failures happen. Well, maybe this isn't the exact lesson. That failures happen is more of a life lesson and it is not unique to cloud computing.
Thinking about it more, what I've learned to do while putting workloads in the cloud is to anticipate the various types of failure and respond accordingly.
In the early days of computing--as far back as the age of mainframes--the focus while writing code was more on preventing errors from happening at all. This is still important. Writing quality code that solves problems efficiently and cost-effectively is still essential.
That said, the more distributed a system is, the more places failures can occur, including failures that are out of the control of application developers. Expecting failures to happen means that I can anticipate them in a general way. This is true of Amazon Kinesis Data Streams.
If a request fails due to what's called a retryable failure, the AWS SDK retries the request up to three times by default. An example of this would be a failure due to a ServiceUnavailable or similar transient 500-level error.
The retries use something called an exponential backoff. The default delay is 100 milliseconds and the time between the following retries increases exponentially. The number of retries and the base delay are settings of the SDK client, not of the stream, and they can be changed.
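To show what an exponential backoff schedule looks like, here is a small Python sketch. It assumes the delay doubles on each retry, starting from the 100 millisecond base; the exact growth factor and any random jitter vary between SDKs, so treat the numbers as illustrative.

```python
def backoff_delays(base_delay_ms=100, max_retries=3):
    """Return the wait before each retry, ASSUMING the delay doubles every time."""
    return [base_delay_ms * 2 ** attempt for attempt in range(max_retries)]
```

With the defaults, the three retries would wait 100, 200, and 400 milliseconds.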
While it seems that the AWS SDK has failure management built into it, there are cases where this is not true. It's possible to get a 200 response--meaning the request succeeded--where the service was available but one or more records were not processed because the provisioned throughput was exceeded.
If the value of FailedRecordCount is greater than 0, this means there's been a partial failure of a request. Some records have been written to a stream successfully but one or more of them have failed.
Batch operations are not atomic. An atomic transaction is one that either completely succeeds or completely fails. The AWS SDK for Kinesis Data Streams will treat partial failures as a success and then move on to the next transaction.
This means that, when building Producer applications using the AWS SDK, relying on a 200 response code could result in data loss. The primary cause of this type of failure is exceeding the throughput of either a stream or an individual shard.
This can happen even after carefully calculating throughput requirements and provisioning a stream appropriately because of traffic spikes and network latency. These things can cause records to be put into the stream unevenly. In turn, this results in having spikes in throughput.
To alleviate network latencies for Producer Applications running inside a VPC, always use an Interface VPC Endpoint. This will keep traffic between the Amazon VPC and Kinesis Data Streams inside the Amazon network. Otherwise, traffic to Kinesis Data Streams from the VPC needs to be routed through an Internet Gateway.
Using an endpoint, traffic to Kinesis Data Streams stays inside the VPC. This minimizes latency and reduces the risk of data being exposed to the public Internet in general.
When it comes to dealing with spiky traffic, you could try to implement some form of backpressure in your producer application. In software development, backpressure is a concept borrowed from fluid dynamics. It is resistance to the desired flow of gas or liquid through a pipe.
In the context of Kinesis, backpressure generally refers to the resistance that prevents the desired flow of data through a stream.
Kinesis Data Streams takes data from a source using a Producer Application, puts it into a stream, and delivers it to a destination using a Consumer Application. Backpressure is when the process of moving that data through the stream is met with resistance. Something slows it down. This can be caused by having insufficient compute resources, throughput limits, or architectural design.
As an aside, the English language can be tricky. There are people that use the word backpressure to describe a system that can manage the resistance to data flow. You might read or hear someone describe their code as having built-in backpressure.
It sounds like they have programmed resistance to data movement in their logic. That's not it at all. What they're saying is that their code has the ability to detect and handle backpressure when it occurs.
The takeaway is that it is important to have proper error handling and retries for partial failures.
When the FailedRecordCount is greater than 0, it's possible to do the retries manually.
To get the correct records, use the array of records from the response of the PutRecords() request.
It contains individual record responses that are in exactly the same order as the outbound request. Compare the two arrays and select the records that have an ErrorCode and ErrorMessage instead of a RecordId.
It is important to have an upper limit for the number of retries. At some point, the process needs to give up and send the failed records to the equivalent of an SQS dead-letter queue.
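The manual retry steps described above can be sketched in Python like this. It assumes a boto3-style Kinesis client is passed in; put_records_with_retry, max_attempts, and base_delay are hypothetical names, and what to do with the records that are returned at the end (for example, sending them to a dead-letter queue) is left to the caller.

```python
import time


def put_records_with_retry(client, stream_name, records, max_attempts=3, base_delay=0.1):
    """Retry only the failed records; return whatever still failed at the end."""
    pending = list(records)
    for attempt in range(max_attempts):
        if not pending:
            return []
        response = client.put_records(StreamName=stream_name, Records=pending)
        if response["FailedRecordCount"] == 0:
            return []
        # Results come back in the same order as the request, so pair them up
        # and keep the records whose result carries an ErrorCode.
        pending = [
            record
            for record, result in zip(pending, response["Records"])
            if "ErrorCode" in result
        ]
        if attempt < max_attempts - 1:
            time.sleep(base_delay * 2 ** attempt)  # back off before the next try
    return pending  # upper limit reached; the caller decides where these go
```

An empty list from this function means every record was written. Anything else is the set of records that never made it into the stream.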
Putting records into a Kinesis Data Stream takes a fair amount of effort. The data has to be formatted correctly and error handling is required. It is totally possible to do this on your own.
However, the people that created Kinesis at AWS realized that this was going to be a common and regular need. To address the complexities of data ingestion and consumption they created and released a pair of libraries to make it easier.
The Kinesis Producer Library--or KPL--is the library available from AWS that puts data into a Kinesis Data Stream. It was created to simplify the development of Kinesis producer applications and allows programmers to achieve high write throughput to a Kinesis data stream while, at the same time, managing exception and error handling.
Using the SDK, the putRecord() and putRecords() methods will put one or more Data Records into a Kinesis Data Stream. These methods even handle writing to the correct shard when data needs to be partitioned and delivered to multiple shards in a consistent manner.
So, why bother with the KPL? Well, when using the SDKs there are some limitations that are not entirely obvious when first using them. An Amazon Kinesis shard is billed by the hour and supports writes up to 1,000 records per second at a maximum rate of 1 megabyte per second.
Binary and Base10 math, when mixed, can create some rather interesting rounding errors that frustrate and confuse. For now, I'm trying to keep it simple. For the sake of this discussion, 1,000 kilobytes is a megabyte. This is in Base10. If it were Base2--binary--1,024 kilobytes would be a megabyte. The difference seems small but it can have a large impact.
For now, I'm going to use the idea that 1,000 kilobytes in Base10 is a megabyte. One way to reach the input limit of a Kinesis Data Stream is to write records that are 1 kilobyte in size at a rate of 1,000 per second. This is approximately 1 megabyte per second.
In the real world, this is unlikely because most streaming data is made up of much smaller record sizes. For example, consider a system that monitors failed login attempts and sends each one to a Kinesis Data Stream. Each Data Record would probably contain only a URL, an IP address, and the username and it might be 50-60 bytes in size, not 1,000.
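A quick calculation shows how little of a shard's write capacity small records use. This Python sketch uses the Base10 convention from this lecture and the shard limit of 1 megabyte per second; the function name is made up for the example.

```python
SHARD_WRITE_BYTES_PER_SECOND = 1_000_000  # 1 MB per second, Base10


def write_utilization(record_bytes: int, records_per_second: int) -> float:
    """Fraction of a shard's write throughput used by records of a given size."""
    used = record_bytes * records_per_second
    return used / SHARD_WRITE_BYTES_PER_SECOND
```

Writing 1,000 records per second at 1,000 bytes each uses the whole shard. At 60 bytes each, the same 1,000 records per second use only 6 percent of it.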
If you don't fully utilize a shard you're basically giving Amazon Web Services money because you are paying for throughput you are not using. My personal opinion is that AWS already gets enough money from me. I don't need to give it any more. I can't make you feel the way I do. All I can do is be encouraging and supportive. Hooray for wasting money?!?
The worst case--in terms of spend--is writing a single byte into a shard and paying the full price for 1,000 bytes per second of utilization. A solution to this is to put multiple records inside a single Data Record before writing it to a Kinesis Data Stream.
To manage putting multiple records in a single Data Record, AWS created the Kinesis Producer Library--the KPL--for Data Record aggregation. It has a sister library, the Kinesis Client Library--the KCL--that does Data Record de-aggregation.
The idea is to use as much of the available throughput as possible. Data Records are opaque to Amazon Kinesis. The service, itself, has zero visibility into what's inside them. So, the data has to be managed before it enters the stream and after it comes out.
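To illustrate the general idea of packing several small records into one Data Record and unpacking them on the other side, here is a toy Python sketch. This is not the format that the KPL and KCL use; it is a simple length-prefixed layout invented for this example, and the function names are hypothetical.

```python
import struct


def aggregate(records):
    """Pack byte strings into one blob, each prefixed by a 4-byte big-endian length."""
    return b"".join(struct.pack(">I", len(record)) + record for record in records)


def deaggregate(blob):
    """Reverse aggregate(): split the blob back into the original byte strings."""
    records, offset = [], 0
    while offset < len(blob):
        (length,) = struct.unpack_from(">I", blob, offset)
        offset += 4
        records.append(blob[offset:offset + length])
        offset += length
    return records
```

Because Kinesis never looks inside the blob, the producer and the consumer have to agree on the layout. That agreement is what a matching pair of libraries provides.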
This can be done manually. However, AWS realized that enough people needed this type of functionality that they released these libraries.
The KPL is licensed under the Amazon Software License which grants a...
"...perpetual, worldwide, non-exclusive, royalty-free, copyright license to reproduce, prepare derivative works of, publicly display, publicly perform, sublicense and distribute its Work and any resulting derivative works in any form."
When writing small records into a Kinesis Data Stream, using the KPL can improve throughput and lower your costs. The KPL is designed to work with Kinesis Data Streams, NOT Kinesis Data Firehose. The KPL can only put data into a Kinesis Data Stream.
Firehose has its own producers for data streams. However, an interesting thing is that one of Firehose's possible producers is Kinesis Data Streams. The workaround is to aggregate Data Records with the KPL and put them into a Kinesis Data Stream. Then, have Data Firehose use the Data Stream as a Producer.
Firehose is intelligent enough to recognize data that was aggregated by the KPL and can unpack it appropriately before sending the Data Records to their destination.
The KPL has retry logic built into it. When a failure or exception is returned from a putRecords() call, the KPL will automatically retry. There is a complete error handling mechanism to make retries at the appropriate time. Records put into a stream using the KPL should always appear in a Kinesis Data Stream at least once.
The Kinesis Client Library was created to read data that was put into a Kinesis Data Stream by the KPL, the Kinesis Producer Library. The KPL aggregates records to maximize shard and write utilization. The KCL, in turn, de-aggregates the records.
However, the KCL is more than just an application framework that processes records created by the KPL. The KCL takes care of many of the complex tasks associated with streaming and distributed computing.
The KCL can do load balancing across multiple Consumer application instances, respond to Consumer application instance failures by checkpointing records, and react to resharding operations. Basically, it acts as an intermediary between record processing logic and Kinesis Data Streams.
Rather than manually coding all of the needed logic to create a Consumer application, the KCL automatically does most of the work. The KCL is different from the Kinesis Data Streams APIs that are available in the AWS SDKs.
The Kinesis Data Streams APIs allow you to manage Kinesis Data Streams, including creating streams, resharding, and putting & getting records. The KCL provides a layer of abstraction around all of these common tasks so that you don't have to build them every time you need to create a Consumer Application.
A couple of features of the KCL are worth mentioning here. With the KCL, there's a checkpoint feature available that allows a consumer to resume progress when an application goes down. When the application comes back online, it can pick up where it left off.
Checkpoints use DynamoDB to track a sub-sequence number that's related to the Data Record's sequence number. Inside DynamoDB, there's one row for each shard in a Kinesis Data Stream. The reason this is worth mentioning here is that, with the Checkpoint feature, it's possible to experience throttling related to DynamoDB provisioning.
This means that the Kinesis Data Stream might be configured correctly but, if DynamoDB doesn't have enough Read Capacity Units or Write Capacity Units, the throughput will be throttled. A fix for this is to use on-demand provisioning for DynamoDB.
The takeaway is that an under-provisioned DynamoDB table can throttle KCL performance.
That's it for this lecture. While it seems that I covered a number of different topics, they're all related to the idea that a Kinesis Data Stream is a storage layer for data streaming frameworks.
Generally speaking, there are five layers of a data streaming service: the source layer, the stream ingestion layer, the stream storage layer, the stream processing layer, and the destination.
While Amazon Kinesis Data Streams describes the entire streaming service from AWS, a Kinesis Data Stream--singular--is a storage layer for streaming data.
Creating a stream from the AWS CLI uses the create-stream API call.
To see the status of a stream from the AWS CLI, use the describe-stream or describe-stream-summary API call.
To put a Data Record into a stream, the API calls are putRecord() and putRecords().
The API call getShardIterator() is used to determine where to start reading data in a shard. A specific sequence number, the Data Record that comes after a specific sequence number, a timestamp, the oldest Data Record, or the newest Data Record are all valid options.
The Kinesis Producer Library--KPL--and the Kinesis Client Library--KCL--were released by AWS to make building Producers and Consumers fast and efficient.
The KPL aggregates records and the KCL de-aggregates records. The KCL also can automatically take care of common tasks related to creating Consumer applications.
That's it for now. I hope you found the information engaging and enlightening. I'm Stephen Cole with Cloud Academy. Thank you for watching!
Stephen is the AWS Certification Specialist at Cloud Academy. His content focuses heavily on topics related to certification on Amazon Web Services technologies. He loves teaching and believes that there are no shortcuts to certification but it is possible to find the right path and course of study.
Stephen has worked in IT for over 25 years in roles ranging from tech support to systems engineering. At one point, he taught computer network technology at a community college in Washington state.
Before coming to Cloud Academy, Stephen worked as a trainer and curriculum developer at AWS and brings a wealth of knowledge and experience in cloud technologies.
In his spare time, Stephen enjoys reading, sudoku, gaming, and modern square dancing.