One of the best ways to learn new programming languages and concepts is to build something. Learning the syntax is always just the first step. After learning the syntax the question that arises tends to be: what should I build? Finding a project to build can be challenging if you don’t already have some problems in mind to solve.
Throughout this course, we’re going to learn more about Python 3 by building a data ingestion process. We’re going to go from setting up a development VM through to deploying the app to a 16 core, cloud VM where we can test. The application is going to allow us to submit articles to a front-end HTTP endpoint where they’ll be enqueued onto a multiprocessing queue. On the back-end, a set of worker processes will dequeue the articles, extract named entities from the article, and enqueue the results to be saved. A set of saver processes will dequeue the results and save the records to Cloud Firestore. Both the front and back-ends will be run together using supervisord. And we’ll use setuptools to create a software distribution used to deploy the app.
This course is broken up into sprints to give you a real-world development experience, and guide you through each step of building an application with Python.
If you have any feedback relating to this course, feel free to contact us at firstname.lastname@example.org.
- Configure a local development environment for an app using a VM
- Implement a data processor that can accept text, extract named entities, and return the results
- Implement a multi-process aware message queue and use Pytest
- Create data models to use as messages to pass on the message queue
- Create the backend for the application
- Create a web endpoint that we can use to enqueue our post models
- Implement a method for running the frontend and backend together
- Run the application using a dataset to see how the system performs under actual server load
This course is intended for software developers or anyone who already has some experience of building simple apps in Python and wants to move on to something more complex.
To get the most out of this course, you should be familiar with Python, ideally Python 3, and have some knowledge Linux and how to use Git.
Hello, and welcome to Sprint 5! In this sprint, we're going to be putting all of our disparate components together to create the backend for our application. I've created a file here called backend.py, and I've already pasted in our template. As part of this backend, we're going to implement two processes. One will be named to Worker, and the other will be named Saver. The goal here was to separate out the CPU bound work from the IO-bound work. Extracting named entities is a CPU problem and saving the data to the database is an IO problem. We're going to have two message queues. The first is the input queue. The frontend will put Posts onto the queue, and the Worker processes will get the Posts and process them. The Workers are going to cache the processed data. And when we flush the cache, it's going to put the result onto a second output-queue. The Saver processes are responsible for getting data from that output queue and saving it to cloud Firestore. Looking at the imports, notice we have all of the components that we've been creating so far. Let's start in by implementing the Worker process. The worker class is going to be a multiprocessing.Process, which is going to allow us to run our code in separate operating system processes. These processes are started by calling the start method, which will, in turn, call the run method. So we can override the run method to implement our own functionality. Recall that workers get messages from the input queue and write messages to the output queue. So our constructor needs to accept the in and output queues, and they're both of type QueueWrapper. For now, let's ignore everything here that is related to caching. So let's set a property here for the input and output queues. We also need to make sure that we call the init method from our superclass. Okay, now I'm going to paste in the code for this run method because it's a bit more involved. We start by registering a shutdown handler that is going to allow us to shut these processes down more gracefully than simply terminating them in the middle of their work. So whenever we send a SIGTERM to these child processes, they're going to run the shutdown method. Next up, we instantiate a data processor. I'm creating this here in an attempt to reduce memory consumption. The backend module is going to be responsible for creating our worker processes. By instantiating the processor in the init method, our backend processes end up loading the data processor into memory unnecessarily. Recall that Spacy loads a machine learning model for the text that's going to process. So by instantiating this here in the run method, it should ensure that only the worker processes are loading Spacy. Next, we need to actually get messages from the input queue and process them. To do that, we're going to use the built-in iter function. Iter is going to call our QueueWrapper's, get method until it receives a specific value called a sentinel that tells it to stop. In our case, the sentinel is actually the text "STOP." Remember that the get method is a blocking call. It'll block until it returns a message or raises an exception. We pass the message to the processor's process method, and we put the results onto the output queue. This loop is going to stay running until it's told to stop. Once it stops and before we leave the run method, we're going to call exit, and we're going to pass in a status of zero to indicate that everything went well. Let's implement our shutdown method, and we'll log that we're shutting down. And now we're going to enqueue a STOP message. Notice that we're not using the put method of our queue wrapper. Instead, we're calling the put method of the queue that we're wrapping. The reason for this is that our queue wrapper does not allow writes after prevent_writes is called. So we're adding this directly to the underlying queue. Now, when the process is sent a SIGTERM, it's going to enqueue a STOP message, and once that message is dequeued, it's going to stop our processing loop, and the process will exit. The Saver process is similar to the Worker. The saver needs to know about the output queue, the database client, and the persistence function. I like to pass in independencies such as these because it makes testing easier because we can just pass in a real version or a fake one. Let's start our init by asserting that the persistence function is callable. In cases like this, I want my code to fail early if we receive something that we don't expect. If the value passed in is not callable, this raises an error, and the code stops before attempting to do any real work. Okay, let's set up our properties. Okay, great. For our shutdown method, we're going to log that we're shutting down, and we're going to put a STOP message onto the queue rapper's underlying queue. Let's copy the logic in our worker's run method. We want this process to have a signal handler to call shutdown on SIGTERM. We'll copy the loop, and we need to modify it just a bit. The messages we get from the queue are going to be passed to our persistence function. The first parameter of that function is going to be the database client. If I open up the persistence file, notice that persist accepts a client, and then some additional arguments. At the moment, this message is going to be a dictionary returned by the processors.process method. We'll be changing that in a bit to use a Tuple that matches the arguments required for the persist method. Using an asterisk here is going to expand that Tuple so that each value is passed to persist as a positional argument. Okay, we now have worker and saver classes. Now we need a way to start them up. This start_processes function accepts a number of processes to start, the Process class, that it should start in our case, it's going to be a Worker or a Saver. And it takes a list of arguments that is going to be passed into the process. When it's instantiated. I'm just going to paste this logic here. We start by logging that we're starting some number of processes. Then we use the list comprehension here to create the instances of our processes. And we pass in the arguments. Again, the asterisk here is going to expand the list of arguments so that they become positional arguments for the process. So Procs becomes a reference to our instantiated processes. However, we still need to start them by calling the start method. When we call start on a process, that process is going to call the run method for us. Finally, we return the list of processes so that we have a reference to them for when we want to shut them down later. This shutdown function accepts a queue wrapper and a list of processes. The first step in shutting down our application is to call prevent_writes on the queue so that we're no longer writing new messages. Next, we send each of the processes, a SIGTERM, which tells them to call their shutdown method. Then we join the processes which will wait until they exit. In order to call our shutdown code, we're going to use the atexit module. When I'm only using the module in a limited capacity, I like to include it right next to where it's used. I do this because it indicates that if this function goes away, that import can too. This function here accepts a list of queue wrappers and it list containing lists of processes. This will become clear once you see it used, for now, just know that when exiting under normal circumstances, atexit is going to run this function. Okay, we now have the ability to create and start our saver and worker processes. And we can also shut them down. These are the final components needed to implement the main function for our backend code. So let's start assembling. There's going to be a lot of code, so I'm going to paste it in sections, and we'll review it. First up, we need a way to configure the application. We're going to use argparse for that. The naming I chose for these flags is based on the notion of having an input and output queue. So we have input processes and output processes, which correspond to our Workers and Savers. We need to know how many of each to create. For CPU bound work, we really don't want to create more worker processes than CPUs. For IO-bound work, that's not really a concern. Recall that the frontend is going to be putting posts directly onto our queue. It communicates over the network, and it needs a network port. We're defaulting here to port 50_000, though we can override it. We have a flag that indicates if we want to save to the database or not, and this makes local development and testing much easier. We can set a cache size for our worker processes. We haven't implemented that. So we're going to circle back on that later. Notice these no qa comments. These prevent the linter from automatically wrapping these lines based on the pep-8 line size. I use this whenever the now arbitrary 80 character length doesn't add value. I don't like to follow, even well-meaning, guidelines that don't add value. And for these lines, wrapping them for the sake of wrapping them doesn't feel all that valuable. We import argparse, we create a parser, and we add our arguments. Calling parse_args is going to parse the values. We get our arguments, and we set them as variables by specifying the types we can ensure that we're actually getting the types we expect. All of these are going to be integers, except for the boolean here that determines if we persist to the database or not. Using the action of store_true is going to set this to true by default. So anytime the flag is provided, it's going to make it false. If we don't actually want to persist to the database, we can use the persist_no_op function that accepts whatever arguments we pass into it, though it does nothing. Otherwise, we call get_client to get a new firestore client, and we use the persist function. Next, we need to create our input and output queue rappers. We also need to make sure the input queue is accessible to the frontend. For that, we're going to use the Queue Manager. First, we registered the queue that we want to share with the register function that we created in a previous sprint. Now that the queue manager has registered the queue that it wants to share, we need to create and start a manager by calling our create_queue_ manager function and specifying the network port. And then we also need to make sure that we're starting it up by calling the start method. This will allow the manager to accept connections. With the input and output queue is created, we can create the processes next. We do that by calling our start_processes method. We specify the number of each that we want to create the process class and its arguments. The worker requires the input and output queues as well as cache size. The saver requires the output queue, a database client, and the "persist" function. Using the asterisk here will expand this persistent tuple so that it uses positional arguments. Next, we need to register our shutdown handlers and pass in two lists. The first list contains the queues, and the second list contains our processes. If I split the screen, we can get a better understanding of what we're doing here. This zip function here is going to zip these lists together so that the elements with the same index will be paired together. So, the input queue is going to be paired with the input processes; and the output queue will be paired with the output processes. So this shutdown_gracefully function is going to pair up the elements in the list and then pass those pairs to our shutdown function. The final bit of code here is our ShutdownWatcher. This is going to ensure that our codebase stays running until it's signaled to stop. All right, we still more work to do in this sprint. However, we are running a bit long here. So this is a good stopping point. Let's stop here. And we're going to resume sprint 5 in the next lesson. So whenever you're ready to fire this up and actually see this code in action, I will see you in the next lesson.
Ben Lambert is a software engineer and was previously the lead author for DevOps and Microsoft Azure training content at Cloud Academy. His courses and learning paths covered Cloud Ecosystem technologies such as DC/OS, configuration management tools, and containers. As a software engineer, Ben’s experience includes building highly available web and mobile apps. When he’s not building software, he’s hiking, camping, or creating video games.