Dealing with Time
Most organizations are already gathering and analyzing big data or plan to do so in the near future. One common way to process huge datasets is to use Apache Hadoop or Spark. Google even has a managed service for hosting Hadoop and Spark. It’s called Cloud Dataproc. So why do they also offer a competing service called Cloud Dataflow? Well, Google probably has more experience processing big data than any other organization on the planet and now they’re making their data processing software available to their customers. Not only that, but they’ve also open-sourced the software as Apache Beam.
Cloud Dataflow is a serverless data processing service that runs jobs written using the Apache Beam libraries. When you run a job on Cloud Dataflow, it spins up a cluster of virtual machines, distributes the tasks in your job to the VMs, and dynamically scales the cluster based on how the job is performing. It may even change the order of operations in your processing pipeline to optimize your job.
In this course, you will learn how to write data processing programs using Apache Beam and then run them using Cloud Dataflow. You will also learn how to run both batch and streaming jobs.
This is a hands-on course where you can follow along with the demos using your own Google Cloud account or a trial account.
- Write a data processing program in Java using Apache Beam
- Use different Beam transforms to map and aggregate data
- Use windows, timestamps, and triggers to process streaming data
- Deploy a Beam pipeline both locally and on Cloud Dataflow
- Output data from Cloud Dataflow to Google BigQuery
The GitHub repository is at https://github.com/cloudacademy/beam.
The UserScore program in the last lesson added up all the scores for each player regardless of when those scores were achieved. If we wanted to calculate the scores for a particular day, then we’d have to make sure that the data file we fed in would only contain scores from that day.
Even if we set up an automated job that saved a day’s events at midnight and sent it to the UserScore program, we still couldn’t be sure that it would contain the right data. Why? Because in this mobile gaming system, events don’t get logged immediately after they occur. The information needs to be sent to a game server from the player’s mobile device, which might temporarily lose its connection. This means that today’s file might contain events from yesterday and might be missing events that actually occurred today.
To solve this problem, we can use windowing. The idea is that you divide the data into multiple windows based on time. To make it work, each element needs to have a timestamp so Beam will know which window to put it in.
This concept is demonstrated in the HourlyTeamScore program. It divides the data into hourly windows and calculates the total score per team, rather than per player, during each hour.
HourlyTeamScore is a subclass of UserScore, so it’s relatively short because it only needs to add the windowing features.
Here’s the pipeline. It reads the data and parses it just like UserScore did. Then it uses the Filter transform to filter out any events that occurred outside of the overall time period you want to analyze. In this case, we want to look at data from a 24-hour period and divide it into hourly windows, so we need to filter out any events that are outside of the 24-hour period.
The Filter transform runs each element through this predicate. If the predicate returns true for an element, then that element will be included in the new PCollection. In this case, it makes sure that the event’s timestamp is after the start time of the period. The next Filter only includes elements that have a timestamp before the end of the period.
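To make the two filters concrete, here is a minimal plain-Java sketch of the same idea. It does not use Beam's Filter transform; it applies the two predicates with standard Java streams. The Event class, the withinPeriod method, and the use of UTC times are assumptions made for illustration and are not taken from the course code.

```java
import java.time.Instant;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class PeriodFilterSketch {
    // Hypothetical stand-in for a parsed game event (not the course's class).
    static final class Event {
        final String team;
        final int score;
        final Instant timestamp;

        Event(String team, int score, Instant timestamp) {
            this.team = team;
            this.score = score;
            this.timestamp = timestamp;
        }
    }

    // Keeps only events strictly after start and strictly before stop,
    // mirroring the two consecutive filters described in the lesson.
    static List<Event> withinPeriod(List<Event> events, Instant start, Instant stop) {
        Predicate<Event> afterStart = e -> e.timestamp.isAfter(start);
        Predicate<Event> beforeStop = e -> e.timestamp.isBefore(stop);
        return events.stream()
                .filter(afterStart)
                .filter(beforeStop)
                .collect(Collectors.toList());
    }
}
```

Chaining two small predicates, rather than writing one combined condition, matches the way the pipeline applies one filter for each boundary.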
The next transform might seem unnecessary because it adds a timestamp. Why would we need to do that when the data elements already have timestamps? It’s because Beam’s windowing functions don’t know about the timestamps inside the data. They rely on a standardized timestamp that Beam attaches to each element. This transform takes the existing timestamp from the data and attaches it to the element as a type called Instant.
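The step of attaching a timestamp can be pictured with a short plain-Java sketch. Beam's Java SDK uses the Joda-Time Instant type for element timestamps; the sketch below uses java.time.Instant as a stand-in so that it runs with only the JDK. The Timestamped wrapper, the attach method, and the assumption that the raw field holds milliseconds since the Unix epoch are all hypothetical.

```java
import java.time.Instant;

class TimestampSketch {
    // Hypothetical pairing of a value with the timestamp a framework would see.
    static final class Timestamped<T> {
        final T value;
        final Instant timestamp;

        Timestamped(T value, Instant timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Assumes rawMillis is milliseconds since the Unix epoch, read from the data itself.
    static <T> Timestamped<T> attach(T value, long rawMillis) {
        return new Timestamped<>(value, Instant.ofEpochMilli(rawMillis));
    }
}
```

The point is that the timestamp moves from being a field the framework cannot see to being metadata carried alongside the element.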
Then the next transform, called Window, divides all of the data into windows based on the duration that you give it. If you don’t set the duration on the command line, then it defaults to 60 minutes, which is what we want.
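The arithmetic behind fixed-size windows can be sketched in a few lines of plain Java. This is not Beam's Window transform; it only shows one common way to find the window an element falls into, assuming windows are aligned to the Unix epoch. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

class FixedWindowSketch {
    // Returns the start of the fixed window that contains ts,
    // assuming windows of the given size aligned to the Unix epoch.
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = ts.toEpochMilli();
        // floorMod keeps the result non-negative even for pre-1970 timestamps.
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }
}
```

With a 60-minute window, an event at 16:42:10 would land in the window that starts at 16:00:00, and an event at exactly 17:00:00 would start the next window.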
The next transform is the same as it was in UserScore except that it passes the value “team” to ExtractAndSumScore so that it will calculate scores for teams rather than individual players.
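Summing scores per team within each window amounts to grouping by a combined key of window and team. The plain-Java sketch below illustrates that grouping without Beam; it is not the course's ExtractAndSumScore code. The Event class, the "windowStart|team" key format, and the epoch-aligned windows are assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TeamWindowSumSketch {
    // Hypothetical stand-in for a parsed, timestamped game event.
    static final class Event {
        final String team;
        final int score;
        final Instant timestamp;

        Event(String team, int score, Instant timestamp) {
            this.team = team;
            this.score = score;
            this.timestamp = timestamp;
        }
    }

    // Returns total score keyed by "windowStart|team", using epoch-aligned fixed windows.
    static Map<String, Integer> sumByTeamAndWindow(List<Event> events, Duration size) {
        long sizeMs = size.toMillis();
        Map<String, Integer> totals = new TreeMap<>();
        for (Event e : events) {
            long ms = e.timestamp.toEpochMilli();
            Instant start = Instant.ofEpochMilli(ms - Math.floorMod(ms, sizeMs));
            // merge adds the score to any existing total for this window and team.
            totals.merge(start + "|" + e.team, e.score, Integer::sum);
        }
        return totals;
    }
}
```

Two events from the same team in the same hour add up to one total, while an event from that team in a later hour produces a separate total.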
And finally, this transform is the same as in UserScore except that it uses a new configureOutput method that’s defined here, because it needs to print scores by team and by window. It also sets the last argument to “true”, which tells WriteToText that this is windowed data.
OK, to run it, we need to be in the java8 directory again. Then paste this command. The command to run HourlyTeamScore is the same as it was for UserScore except that we need to add the startMin and stopMin options. These will set the boundaries for the time period we want to analyze. The default data files only contain data for Nov. 16th to 17th, 2015, so we need to select a 24-hour period within those two days. We’ll use 4pm on Nov. 16th to 4pm on Nov. 17th.
I’ll fast forward to when the job is done. OK, the job finished, so let’s have a look at the console.
Check out the autoscaling graph. It shows that Dataflow adjusted the number of workers a couple of times based on how the job was doing.
One thing I didn’t point out for the last job is this custom counter. It shows that there were 328 parse errors. Remember the parse errors we saw in the log? With this custom counter, we don’t have to manually figure out how many parse errors there were in the log.
Here’s where the ParseErrors counter was set in UserScore. You create the counter here...and you increment the counter here whenever a parse error occurs.
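The create-then-increment pattern can be sketched in plain Java. In Beam's Java SDK, a counter of this kind is created with Metrics.counter and incremented with its inc method; the sketch below swaps in an AtomicLong so that it runs with only the JDK. The "team,score" line format and the parseScores method are hypothetical and are not the course's parsing code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class ParseErrorCounterSketch {
    // Parses hypothetical "team,score" lines; malformed lines are skipped
    // and counted in parseErrors instead of failing the whole job.
    static List<Integer> parseScores(List<String> lines, AtomicLong parseErrors) {
        List<Integer> scores = new ArrayList<>();
        for (String line : lines) {
            try {
                String[] parts = line.split(",");
                scores.add(Integer.parseInt(parts[1].trim()));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                // Increment the counter whenever a parse error occurs.
                parseErrors.incrementAndGet();
            }
        }
        return scores;
    }
}
```

Counting errors in one place means the total can be read off directly instead of being reconstructed from log messages.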
OK, now let’s look at the output from this job in Cloud Storage. I need to refresh the screen because I already had this open before the job started. There we go. There are a lot more files this time. That’s because there are 3 files for every hour. Multiply that by 24 hours and you get 72 files.
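The file count is simple arithmetic, shown here as a small plain-Java sketch. The figure of 3 files per window comes from this job's output; the expectedFiles method is hypothetical and assumes the analysis period is an exact multiple of the window size.

```java
import java.time.Duration;
import java.time.Instant;

class OutputFileCountSketch {
    // Number of windows in the period multiplied by the files written per window.
    // Assumes the period is an exact multiple of windowSize.
    static long expectedFiles(Instant start, Instant stop, Duration windowSize, int filesPerWindow) {
        long windows = Duration.between(start, stop).toMillis() / windowSize.toMillis();
        return windows * filesPerWindow;
    }
}
```

For a 24-hour period with 60-minute windows and 3 files per window, this gives 24 × 3 = 72 files.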
Open up one of the files. On each line, it lists the start time for the window, the total score, and the team name. The window start time is the same on every line in this file because each file only contains data for one window.
And that’s it for this lesson.
About the Author
Guy launched his first training website in 1995 and he's been helping people learn IT technologies ever since. He has been a sysadmin, instructor, sales engineer, IT manager, and entrepreneur. In his most recent venture, he founded and led a cloud-based training infrastructure company that provided virtual labs for some of the largest software vendors in the world. Guy’s passion is making complex technology easy to understand. His activities outside of work have included riding an elephant and skydiving (although not at the same time).