Most organizations are already gathering and analyzing big data or plan to do so in the near future. One common way to process huge datasets is to use Apache Hadoop or Spark. Google even has a managed service for hosting Hadoop and Spark. It’s called Cloud Dataproc. So why do they also offer a competing service called Cloud Dataflow? Well, Google probably has more experience processing big data than any other organization on the planet and now they’re making their data processing software available to their customers. Not only that, but they’ve also open-sourced the software as Apache Beam.
Cloud Dataflow is a serverless data processing service that runs jobs written using the Apache Beam libraries. When you run a job on Cloud Dataflow, it spins up a cluster of virtual machines, distributes the tasks in your job to the VMs, and dynamically scales the cluster based on how the job is performing. It may even change the order of operations in your processing pipeline to optimize your job.
In this course, you will learn how to write data processing programs using Apache Beam and then run them using Cloud Dataflow. You will also learn how to run both batch and streaming jobs.
This is a hands-on course where you can follow along with the demos using your own Google Cloud account or a trial account.
- Write a data processing program in Java using Apache Beam
- Use different Beam transforms to map and aggregate data
- Use windows, timestamps, and triggers to process streaming data
- Deploy a Beam pipeline both locally and on Cloud Dataflow
- Output data from Cloud Dataflow to Google BigQuery
The GitHub repository is at https://github.com/cloudacademy/beam.
So far, all of our pipeline runs have been batch jobs. When you need to process data once, or periodically, such as every month, then batch processing is the way to go. But if you want to continuously process data as it’s generated, then you need to use a streaming job.
For example, if you wanted to have a continuously updated leaderboard showing the top scores for your mobile game, then the UserScore and HourlyTeamScore programs wouldn’t work because they run in batch mode.
One of Beam’s biggest strengths is that it uses the same programming model for both batch and streaming. That means you can run a job in either mode with only a few changes. So this LeaderBoard program uses the transforms defined in UserScore and HourlyTeamScore and makes them work in streaming mode.
The LeaderBoard program takes in a never-ending stream of gaming event data. Every 10 minutes, it outputs the total score for each user. It writes these scores into a BigQuery table, so they can be retrieved by another program to display on a leaderboard. This other program isn’t part of this example, so you’ll just have to imagine it.
The LeaderBoard program does something else as well. It outputs a team score every hour. This is similar to what the HourlyTeamScore program does except that it does this every hour from an ongoing stream of data instead of just once in batch mode. The source of the stream of data is called an unbounded source.
You might be wondering where we’re going to find an unbounded source of data to feed into this program. Fortunately, they’ve also provided a program called Injector that outputs random game data. I’ll show you that later.
When designing a Beam program for streaming data, one of the first decisions you have to make is which windowing function to use. There are five different types of windowing functions available in Beam. We used fixed time windows in the HourlyTeamScore program. Each window was one hour long and there was no overlap between them. That’ll work for the team scores in the LeaderBoard program as well, but we’ll need to use a different windowing function for the user scores. That’s because the user scores will be a running total of all the past data for each user rather than just the data from a particular time period. The team scores, on the other hand, will only include the data from a particular one-hour period.
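To make the fixed-window idea concrete, here is a minimal plain-Java sketch of how an event timestamp could be mapped to a one-hour window. It uses only java.time rather than the Beam SDK, and the names FixedWindowSketch and windowStart are made up for illustration, so treat it as a model of the idea and not as what Beam does internally.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java model of fixed-window assignment. Hypothetical helper, not the Beam API. */
final class FixedWindowSketch {

    /** Returns the start of the fixed window that contains the given event time. */
    static Instant windowStart(Instant eventTime, Duration size) {
        long sizeMillis = size.toMillis();
        long t = eventTime.toEpochMilli();
        // Round the timestamp down to the nearest multiple of the window size.
        return Instant.ofEpochMilli(t - Math.floorMod(t, sizeMillis));
    }

    public static void main(String[] args) {
        Duration oneHour = Duration.ofHours(1);
        String[] eventTimes = {
            "2024-01-01T03:10:00Z", "2024-01-01T03:59:59Z", "2024-01-01T04:00:00Z"
        };
        for (String s : eventTimes) {
            Instant start = windowStart(Instant.parse(s), oneHour);
            System.out.println(s + " -> window starting at " + start);
        }
    }
}
```

The first two events land in the window that starts at 03:00, while the event at exactly 04:00 begins the next window. Every timestamp maps to exactly one window, which is why fixed windows never overlap.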
Sliding time windows look like this. In this example, a new window starts every 30 seconds. This is called the period. Each window has a 60-second duration, though, so the windows overlap each other. This means that one piece of data can be in multiple windows. This type of windowing is useful for calculating running averages. That’s not what we need for our user scores.
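The overlap can be worked out by hand. The following plain-Java sketch lists every sliding window that contains a given timestamp, using the 60-second duration and 30-second period from the example above. The class and method names are hypothetical and it does not use the Beam SDK.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of sliding-window assignment. Hypothetical helper, not the Beam API. */
final class SlidingWindowSketch {

    /**
     * Returns the start times (in seconds) of every window that contains the timestamp.
     * A window covers the half-open range [start, start + durationSec).
     */
    static List<Long> windowStarts(long timestampSec, long durationSec, long periodSec) {
        List<Long> starts = new ArrayList<>();
        // The most recent window that has started at or before the timestamp.
        long lastStart = timestampSec - Math.floorMod(timestampSec, periodSec);
        // Walk backwards one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampSec - durationSec; start -= periodSec) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An element at second 45 with 60-second windows that start every 30 seconds.
        System.out.println(windowStarts(45, 60, 30)); // prints [30, 0]
    }
}
```

With these settings every element belongs to two windows (the duration divided by the period), which is what makes this type of windowing suitable for running averages.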
Session windows are quite different from the others. They do not have a fixed length and they are different for every unique data key, such as for every user. A good example is tracking each user’s mouse clicks. People typically interact with a website intensively for a period of time, which you could call a session, and then are idle until their next session. To configure session windows, you specify the maximum amount of time between events that you would expect in the same session. A longer gap than that would represent idle time between sessions. This type of windowing wouldn’t work for our user scores, either.
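As a rough illustration of the gap idea, this plain-Java sketch splits one user's event times into sessions. It assumes all of the events are available at once, and it treats a gap strictly longer than the configured value as idle time. That boundary choice, like the names used here, is an assumption of the sketch rather than a statement about how Beam implements session windows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of session windowing for a single key. Hypothetical helper, not the Beam API. */
final class SessionWindowSketch {

    /** Groups event times (in seconds) into sessions separated by more than gapSec of idle time. */
    static List<List<Long>> sessions(long[] eventTimesSec, long gapSec) {
        long[] sorted = eventTimesSec.clone();
        Arrays.sort(sorted);
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : sorted) {
            // A gap longer than gapSec since the previous event closes the current session.
            if (!current.isEmpty() && t - current.get(current.size() - 1) > gapSec) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Clicks from one user, with a 60-second maximum gap inside a session.
        long[] clicks = {0, 20, 50, 400, 410};
        System.out.println(sessions(clicks, 60)); // prints [[0, 20, 50], [400, 410]]
    }
}
```

In a real pipeline this grouping would be done separately for each key, so two users who are active at the same time still get their own sessions.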
Calendar-based windows divide up the data by a given number of days, months, or years, so they’re quite straightforward. They’re also not what we need for our user scores.
A single global window is pretty self-explanatory. In essence, it’s not really windowing because there’s only one window. If you don’t set a windowing function, then it will default to global windowing. That’s what it did in the UserScore program, which was fine because we needed to add up all of the scores in the data, so we didn’t need multiple windows.
However, if you use a global window with unbounded data, then it can cause problems. If you use a group transform, such as the Sum transform that adds up a user’s score, then it will wait until all of the data has arrived before it runs the transform. Since an unbounded data source keeps sending new data forever, the transform will never run. To prevent this situation from happening, Beam will give you an error when you try to build a pipeline with these characteristics.
There are two solutions to this problem. The first is to set a non-global windowing function. The second is to configure something called a trigger.
A trigger tells the pipeline when data in a window should be aggregated. The default trigger won’t fire until all of the data has arrived. That’s why you need to set a non-default trigger in a global window if the data source is unbounded.
The LeaderBoard program needs to output each user’s total score every 10 minutes. Since a player’s total score should include all of the scores that player has achieved so far, we need to use a global window. To get the pipeline to aggregate the data every 10 minutes, we need to set a trigger.
The code that does that is at the bottom of LeaderBoard. It starts by passing GlobalWindows to the Window class. Then it calls the triggering method and passes Repeatedly.forever, because the trigger needs to keep firing for as long as the stream runs. Inside that, it passes AfterProcessingTime, which is a processing time trigger.
Beam distinguishes between event time and processing time. The event time in our gaming scenario is when an event is generated on the player’s mobile device. The processing time is when it’s received by the pipeline.
Beam provides four types of pre-built triggers. An event time trigger looks at the timestamp on each data element, which says when the event actually occurred. When the HourlyTeamScore program calculated the team scores in each hourly window, it used the event time because it only included scores that were achieved during a given hour rather than when they were received by the pipeline.
A processing time trigger looks at the time when each data element is processed. This is what we need for the user scores because we want to update the leaderboard based on all of the data we have so far. We don’t care when the events occurred because we want to include all of the events in the processing pipeline.
Data-driven triggers operate by examining the data as it arrives, but at the moment, the only type of data-driven trigger that Beam supports is one that fires after a given number of elements have arrived.
Composite triggers simply combine multiple triggers, similar to how composite transforms combine multiple transforms.
OK, now back to the code. So this is a processing time trigger. The pastFirstElementInPane method says that the timer for this trigger starts when the first element is received in a pane. A pane is the portion of a window that contains the data to be aggregated. Then it adds plusDelayOf(TEN_MINUTES), so each pane is 10 minutes long. But since it waits until a piece of data arrives before starting a new pane, there will actually be more than 10 minutes between each firing of the trigger. That’s a bit different from how I described what LeaderBoard does. It doesn’t output the latest user scores exactly every 10 minutes, but if there’s a fairly continuous stream of data, then it’ll be just slightly more than 10 minutes in between updates.
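The timing described above can be simulated with a few lines of plain Java. This sketch assumes the arrival times are given in ascending order, in minutes of processing time, and that an element arriving at or after the scheduled firing time starts the next pane. The names are hypothetical and the code only models the behavior, it does not call Beam.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of "fire N minutes after the first element in a pane". Not the Beam API. */
final class ProcessingTimeTriggerSketch {

    /** Returns the processing times (in minutes) at which the trigger would fire. */
    static List<Long> firingTimes(long[] arrivalMinutes, long delayMinutes) {
        List<Long> firings = new ArrayList<>();
        boolean timerRunning = false;
        long fireAt = 0;
        for (long arrival : arrivalMinutes) {
            // The pending timer expires before this element is seen, so the pane fires first.
            if (timerRunning && arrival >= fireAt) {
                firings.add(fireAt);
                timerRunning = false;
            }
            // The first element of a new pane starts the timer.
            if (!timerRunning) {
                fireAt = arrival + delayMinutes;
                timerRunning = true;
            }
        }
        // The last pane still fires once its timer expires.
        if (timerRunning) {
            firings.add(fireAt);
        }
        return firings;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 3, 9, 12, 15, 25};
        System.out.println(firingTimes(arrivals, 10)); // prints [10, 22, 35]
    }
}
```

The firings are 12 and 13 minutes apart rather than exactly 10, because each new timer only starts when the next element arrives. The denser the stream, the closer the spacing gets to 10 minutes.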
Next, it has to set the window’s accumulation mode. There are two possible modes: accumulating and discarding. In accumulating mode, it saves the data in all of the previous panes and adds them to the current pane. That’s what we need for this program because the user’s score needs to include all of the data so far.
The discarding mode, as you’ve probably guessed, discards the data in all of the previous panes, so the trigger only emits the aggregated data from the current pane.
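The difference between the two modes is easiest to see with numbers. This plain-Java sketch sums the scores in three successive panes under each mode. The pane contents and the names are invented for illustration, and it models the behavior without using Beam.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of accumulating versus discarding panes. Hypothetical helper, not the Beam API. */
final class AccumulationModeSketch {

    /** Returns the sum emitted each time the trigger fires, one entry per pane. */
    static List<Integer> paneSums(int[][] panes, boolean accumulating) {
        List<Integer> emitted = new ArrayList<>();
        int carried = 0;
        for (int[] pane : panes) {
            // Accumulating mode starts from everything emitted so far; discarding starts from zero.
            int sum = accumulating ? carried : 0;
            for (int score : pane) {
                sum += score;
            }
            emitted.add(sum);
            carried = sum;
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[][] panes = {{5, 3}, {4}, {10, 1}};
        System.out.println("accumulating: " + paneSums(panes, true));  // [8, 12, 23]
        System.out.println("discarding:   " + paneSums(panes, false)); // [8, 4, 11]
    }
}
```

In accumulating mode each output is a running total, which is what a leaderboard needs. In discarding mode each output covers only the new data, so a downstream consumer would have to add the outputs together itself.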
The allowed lateness line won’t make sense until we’ve gone through the team score code, so I’ll explain it later.
OK. That wraps up the windowing code for user scores. Now we’ll move on to the team scores. The code for that is right here.
You’ll recall that LeaderBoard calculates the total score for each team every hour. This is very different from what it does with user scores, so it needs to use a different windowing function and a different type of trigger.
It uses fixed windows with a default size of one hour, just like in the HourlyTeamScore program. Then it adds an AfterWatermark trigger, which is an event time trigger.
So what’s a watermark? As you know, there’s a lag between when an event occurs and when the pipeline receives data about that event. For example, suppose a data element has a timestamp of 3:00 and it arrives a little while after 3:00. The time when it arrives is the processing time. The event time is what’s shown on the timestamp, which is 3:00. The difference between the event time and the processing time is known as time skew.
If you’re using a processing time trigger, then this lag time isn’t a concern because the trigger is based on when the data arrived in the pipeline, but if you’re using an event time trigger, then it matters a lot.
For example, if you don’t set a trigger, then the default is to wait until all of the data in a window has arrived before doing an aggregation, such as a sum. But if there’s a lag between when the event occurred and when its data arrives in the pipeline, then how will the trigger know when all of the data has arrived? Well, it has to guess.
As the pipeline starts receiving your data, it looks at the difference between the event time and the processing time for the elements. Then it comes up with an estimate of the normal lag time between the two. A watermark is the pipeline’s estimate of how far event time has progressed, that is, the point in event time before which it expects all of the data to have already arrived. This means a watermark will normally be at an earlier time than the current processing time.
When the watermark passes the end of the window, then the pipeline considers the window to be complete and it fires the trigger to aggregate the data. But remember that this is just a guess. Sometimes data will arrive after the watermark passes the end of the window. This is considered to be late data. The default trigger simply discards late data. If you care about accuracy, then that’s not what you want, so you should configure a non-default trigger.
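Here is a deliberately simplified plain-Java model of that guess. It assumes the watermark is just the current processing time minus a fixed lag that you supply, whereas a real runner derives its watermark from the source in more sophisticated ways. The class name, the method names, and the fixed-lag heuristic are all assumptions made for this sketch.

```java
/** Plain-Java model of a watermark with a fixed assumed lag. Hypothetical, not the Beam API. */
final class WatermarkSketch {
    private final long assumedLagSec;

    WatermarkSketch(long assumedLagSec) {
        this.assumedLagSec = assumedLagSec;
    }

    /** The estimated point in event time before which all data should have arrived. */
    long watermarkAt(long processingTimeSec) {
        return processingTimeSec - assumedLagSec;
    }

    /**
     * An element is late if the watermark has already reached the end of the element's
     * fixed window by the time the element arrives.
     */
    boolean isLate(long eventTimeSec, long processingTimeSec, long windowSizeSec) {
        long windowStart = eventTimeSec - Math.floorMod(eventTimeSec, windowSizeSec);
        long windowEnd = windowStart + windowSizeSec;
        return watermarkAt(processingTimeSec) >= windowEnd;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(60); // assume data is normally 60 seconds behind
        long oneHour = 3600;
        // Event at 3500 arrives at 3590: the watermark is 3530, so the first window is still open.
        System.out.println(wm.isLate(3500, 3590, oneHour));
        // Event at 3550 arrives at 3700: the watermark is 3640, past the window end of 3600.
        System.out.println(wm.isLate(3550, 3700, oneHour));
    }
}
```

The second element is late only because the guess was wrong for it: it took 150 seconds to arrive when the model assumed 60.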
Alright, so to calculate hourly team scores, the LeaderBoard program uses a trigger called AfterWatermark.pastEndOfWindow, which is the default trigger, but then it adds some extra methods to handle late data and other conditions.
The withLateFirings method says that if a new data element arrives after the watermark has passed the end of the window, then the trigger should fire again 10 minutes later. When it fires again, then it waits for another late data element to arrive and fires again 10 minutes after that. Every time it fires, it does a new data aggregation that includes the late data it has received so far.
At some point, it needs to stop accepting late data and finalize the scores for this window. That’s what the withAllowedLateness method does. You pass it the number of minutes after the watermark passes the end of the window when it should stop accepting late data. If you don’t set that option on the command line, then the default is 120 minutes, or 2 hours.
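The effect of allowed lateness can be summarized as a three-way decision. This plain-Java sketch classifies an arriving element by comparing the watermark with the end of the element's window, using the 120-minute default mentioned above. The names and the exact boundary handling are assumptions of the sketch and may differ in detail from what Beam does.

```java
/** Plain-Java model of how allowed lateness affects an arriving element. Not the Beam API. */
final class AllowedLatenessSketch {

    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, DROPPED }

    /** All arguments are in minutes of event time. */
    static Outcome classify(long windowEndMin, long watermarkMin, long allowedLatenessMin) {
        if (watermarkMin < windowEndMin) {
            // The watermark has not reached the end of the window yet.
            return Outcome.ON_TIME;
        }
        if (watermarkMin < windowEndMin + allowedLatenessMin) {
            // The window is past its end, but still within the grace period.
            return Outcome.LATE_BUT_ACCEPTED;
        }
        // The grace period is over, so the element no longer counts.
        return Outcome.DROPPED;
    }

    public static void main(String[] args) {
        long windowEnd = 60;         // a window covering the first hour
        long allowedLateness = 120;  // the default mentioned in the text
        System.out.println(classify(windowEnd, 59, allowedLateness));
        System.out.println(classify(windowEnd, 90, allowedLateness));
        System.out.println(classify(windowEnd, 200, allowedLateness));
    }
}
```

The three calls in main return ON_TIME, LATE_BUT_ACCEPTED, and DROPPED in that order. An accepted late element is what causes a late firing that re-aggregates the window.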
They’ve also added a withEarlyFirings method that causes the trigger to fire before the watermark has reached the end of the window. Why would you configure that? So you could show early results during the hour, allowing people to see how the teams are doing without having to wait until the final results are in. It fires 5 minutes after the first data element arrives in a pane, so if there’s a lot of data, then it will fire essentially every 5 minutes until the watermark reaches the end of the hour.
Finally, it also includes the accumulatingFiredPanes method, just like it did for user scores. However, in this case, it’s not accumulating all of the fired panes from the beginning of the game. Instead, it’s accumulating all of the data in this window, which is only an hour long. That’s because the team scores for each hour do not include the scores from previous hours.
Alright, that’s it for the team scores windowing code. Now we can go back to the last line of the user scores windowing code. It includes the withAllowedLateness method, just like in the team scores code. But that’s kind of weird, isn’t it, because this is a processing time trigger, so why would there be any late data? Well, I have to admit that it took me a while to realize what it’s doing here when I first saw it. You see, since the team score code discards any data that’s more than 2 hours late, the user score code needs to do that too. If it didn’t, then the user scores could be higher than they should be and there’d be a discrepancy with the team scores.
That’s it for streaming concepts. In the next lesson, I’ll show you how to write output to BigQuery tables.
About the Author
Guy launched his first training website in 1995 and he's been helping people learn IT technologies ever since. He has been a sysadmin, instructor, sales engineer, IT manager, and entrepreneur. In his most recent venture, he founded and led a cloud-based training infrastructure company that provided virtual labs for some of the largest software vendors in the world. Guy’s passion is making complex technology easy to understand. His activities outside of work have included riding an elephant and skydiving (although not at the same time).