One of the best ways to learn new programming languages and concepts is to build something. Learning the syntax is always just the first step. After learning the syntax the question that arises tends to be: what should I build? Finding a project to build can be challenging if you don’t already have some problems in mind to solve.
Throughout this course, we’re going to learn more about Python 3 by building a data ingestion process. We’re going to go from setting up a development VM through to deploying the app to a 16-core cloud VM where we can test it. The application is going to allow us to submit articles to a front-end HTTP endpoint, where they’ll be enqueued onto a multiprocessing queue. On the back end, a set of worker processes will dequeue the articles, extract named entities from each article, and enqueue the results to be saved. A set of saver processes will dequeue the results and save the records to Cloud Firestore. Both the front end and back end will be run together using supervisord, and we’ll use setuptools to create a software distribution used to deploy the app.
This course is broken up into sprints to give you a real-world development experience, and guide you through each step of building an application with Python.
The source code for the course is available on GitHub.
If you have any feedback relating to this course, feel free to contact us at email@example.com.
- Configure a local development environment for an app using a VM
- Implement a data processor that can accept text, extract named entities, and return the results
- Implement a multi-process aware message queue and use Pytest
- Create data models to use as messages to pass on the message queue
- Create the backend for the application
- Create a web endpoint that we can use to enqueue our post models
- Implement a method for running the frontend and backend together
- Run the application using a dataset to see how the system performs under actual server load
This course is intended for software developers or anyone who already has some experience in building simple apps in Python and wants to move on to something more complex.
To get the most out of this course, you should be familiar with Python, ideally Python 3, and have some knowledge of Linux and how to use Git.
Hello, and welcome to Sprint Three. In this sprint, we're going to implement a multi-process aware message queue and we'll use Pytest to verify that it works correctly. Let's start right in by creating a file named messageq.py. And I'll free up some screen space here and paste in our template. Okay, great. Let's pause here and talk about message passing and why we need it.
CPUs these days are incredibly fast. However, they can only really do one thing at a time. And the OS hides that a bit by scheduling which process can use the CPU at any given moment. Skipping over all of that complexity, the end result is that the OS gives each of the running processes some amount of CPU time. Tasks that require a lot of CPU are referred to as CPU bound because the CPU is going to be the bottleneck. Tasks that require a lot of I/O effort are referred to as I/O bound because the input and/or output are the bottlenecks.
The Python interpreter has a component called the global interpreter lock, often referred to as the GIL. This is what the Python documentation has to say about the GIL: the GIL is the mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. It simplifies the CPython implementation by making the object model implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.
What this means for us is that CPU bound Python code is effectively single-threaded within a process, which means that in order to utilize multiple CPUs in our app, we need to invest a bit more effort. Here's what that looks like. When solving an I/O bound problem in Python, we can use threads, which are just OS threads. We can also use the asyncio module, which is based on an event loop. When solving CPU bound problems in Python, we use the multiprocessing module. This enables us to have our CPU bound code run on multiple CPUs.
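To make the CPU bound case concrete, here is a minimal sketch that spreads a CPU bound function across processes with the multiprocessing module. The `count_primes` function and the input sizes are hypothetical, chosen only for illustration; they are not part of the course code.

```python
import multiprocessing


def count_primes(limit: int) -> int:
    """CPU bound work: count the primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def main() -> None:
    limits = [20_000, 30_000, 40_000, 50_000]  # arbitrary illustrative sizes
    # Each pool worker is a separate process with its own interpreter,
    # so the calls can run on different CPUs at the same time.
    with multiprocessing.Pool() as pool:
        results = pool.map(count_primes, limits)
    for limit, total in zip(limits, results):
        print(f"primes below {limit}: {total}")


if __name__ == "__main__":
    main()
```

Running the same work on threads instead would not speed it up in CPython, because only one thread can execute Python bytecode at a time.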
Each process has its own interpreter, and therefore it has its own GIL. I don't recall where I heard it, but the saying goes something like this: when you solve a problem with multiprocessing, you now have two problems. In my experience, there is a bit of truth in that. With a single process application, we pass data around with relative ease. Get some data from somewhere and then pass it to some function for processing. When we have multiple processes, shared memory is possible, though it's not recommended. The preferred solution is to use message passing.
Python provides different mechanisms for passing messages. The one that we're going to use is the multiprocessing Queue. This is a process aware mirror of the Queue class, which resides in the queue module. Being process aware means it allows us to send messages between processes. The value here is we can create a single input queue and have different processes get data from that queue.
Now, one of my considerations when I was designing this app was that I wanted to be able to shut down gracefully. What that means in this context is that when stopping the app, I want to try to make sure that we've processed everything that's been enqueued before the application shuts down. The way I chose to solve that was to include a mechanism that can prevent writes to the queue. Then the processes that are getting data from the queue can continue on until the queue is completely drained. Our definition for drained here is going to be this: drained means the queue is no longer writeable and it is empty.
Having such specific rules, we want to be able to test and verify that things work as intended. So before we implement this wrapper, let's add our unit tests. Let's create a file called messageq_test.py. And I'm going to paste in the completed tests. Notice at the top, we import Pytest. Pytest is not part of the standard library, so we need to install it.
And I chose Pytest because it's minimal, but it's very effective. It uses the builtin assert keyword to define tests, which makes for one fewer thing that we have to learn in the development process. It reduces that cognitive load. Let's run it. And notice it's collecting a lot of items here. I'm actually going to cancel this. Pytest can discover our tests based on naming conventions. Because we're in the home directory, where our virtual environment resides, it's going through and identifying tests from our virtual environment. Let's change directories to where our code resides, which in this case is the /vagrant directory. Vagrant automatically mounts the directory containing the Vagrantfile at /vagrant. If we run this again, it should find our tests. We haven't implemented the wrapper at all, so it's going to fail.
This is not the error I expected. So let's debug this. Ah, I see what happened. I intended for these Python files to reside under the ingest directory. So let's just move those. And we'll test this one more time. And notice we have four failed tests. This is expected. We'll review these tests in a bit. For now, just focus on the fact that we have a mechanism that will tell us when we've successfully implemented the functionality.
Let's start by setting a "q" property. It's going to be set to either a queue that is passed into the constructor, or a multiprocessing Queue that we create. This is just a nice shorthand way to say, if the value passed in exists, use it, otherwise create it. In order to drain the queue, we need a mechanism to signal that the queue is no longer accepting writes. If we were to try and use a Boolean flag here, it wouldn't work the way you might expect. The queue wrapper is going to be created by a parent process. Then it's going to be pickled and then passed to our worker processes. Pickling is a Python specific object serialization format, which makes it easy to save the state of an object.
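The Boolean flag problem can be seen without starting any processes. In this small sketch, a round trip through pickle stands in for the hand-off to a worker; `FlagHolder` is a hypothetical stand-in class, not the course's wrapper.

```python
import pickle


class FlagHolder:
    """Hypothetical object that tracks writeability with a plain bool."""

    def __init__(self) -> None:
        self.writeable = True


parent_copy = FlagHolder()

# Serialize and rebuild the object, as happens when it is sent to a worker.
worker_copy = pickle.loads(pickle.dumps(parent_copy))

# The parent flips its flag, but the rebuilt object is a separate instance.
parent_copy.writeable = False

print(parent_copy.writeable)  # False
print(worker_copy.writeable)  # True
```

The rebuilt object keeps the value it had at the moment it was serialized, so a later change in the parent never reaches it.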
So it's pickled and then passed to our worker processes, which means the properties specified on the class are going to be copies of the version that was created by the parent. If we create an instance of this class and pass it to two processes, we have three instances of QueueWrapper. Since the Queue is multi-process aware, those copies are going to use a proxy that Python creates for us automatically to interact with the Queue. For properties that are not multi-process aware, those values don't sync up automatically.
Here's a more complete example. Imagine we add a Boolean property to QueueWrapper and we set the value to True. When we create an instance of QueueWrapper and we pass it off to our worker processes, each process is handed a pickled copy of QueueWrapper. We now have our three copies of QueueWrapper: the original resides in the parent process, and we have one for each of the two worker processes. Again, we can use the Queue from each because the multiprocessing Queue is designed to be used by different processes. The Boolean properties, being primitive typed, are all independent. Changing one is not going to impact the others. So we need a mechanism that is multi-process aware to determine if the queue is writeable.
The way I chose to solve this problem is to use a multiprocessing Event. This is a multiprocessing version of the threading.Event. The idea is that an event is basically a Boolean value in that it's either set or unset. When an event is set, any process that is listening for that event is notified. This is similar to our multiprocessing Queue in that, because it was designed to be multi-process aware, we can create it in the parent process and pass it to our worker processes. And when the event is set, every process listening knows about it.
And I forgot to add the name property up here. I like to add these in the order that they appear in the method signature. By the way, we can safely ignore this connect method.
This wasn't used in the end. So now we have our multiprocessing Queue and Event. The get method returns the next message from the underlying queue. If the queue is drained, meaning it's not writeable and it's empty, we're just going to return STOP. This tells whatever code is calling get that this queue is no longer usable. By default, the get method for the multiprocessing Queue is a blocking call. So it's going to wait until it can take something off of the queue. Because it's blocking until there's something on the queue, it could get interrupted. An example of an interruption might be if the queue was closed for some reason. So in that case, let's also return STOP.
For our put method, we only want to put messages onto the queue if the queue is writeable. I like to use a debug level logger for cases like this. I don't wanna record every put in the production logs. However, in development, it can be nice to get that additional info by setting the log level to debug. This put_many method is just a convenience method that passes each item of a list into put.
The way that we'll prevent writes is by calling set on our prevent_writes event. Let's use a format string to log this with the queue name, and finally we'll call the set method. Also, I just noticed I wanted the prevent_writes event to be internal only. So I just want to prepend an underscore to this property's name. Awesome. Now, when prevent_writes is called, it sets our event, and any processes which are using this QueueWrapper will know that it is no longer writeable.
The logic for is_writeable is simply to check that the event is not set. is_drained is going to be true if the queue is not writeable and it is empty. And I just noticed, I didn't add type hints here for the return types. These property names are pretty descriptive, though I wanna use type hints here just to show that they're Boolean values. empty will return the result of the underlying queue's empty method. So this QueueWrapper class is going to be our message queue.
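Pulling the narration together, a wrapper along these lines might look like the following. This is a reconstruction from the spoken description, not the course's actual file: the value of `STOP`, the log messages, the constructor signature, and the choice to catch a broad `Exception` in `get` are all assumptions.

```python
import logging
import multiprocessing
from typing import Any, Iterable, Optional

logger = logging.getLogger(__name__)

# Assumed sentinel value; the narration only says that get "returns STOP".
STOP = "STOP"


class QueueWrapper:
    """Thin wrapper over a multiprocessing Queue that can be closed to writes."""

    def __init__(self, name: str, q: Optional[Any] = None) -> None:
        self.name = name
        # Use the queue that was passed in, otherwise create one.
        self.q = q or multiprocessing.Queue()
        # An Event is shared between processes; a plain bool would be copied.
        self._prevent_writes = multiprocessing.Event()

    def get(self) -> Any:
        if self.is_drained:
            return STOP
        try:
            return self.q.get()  # blocks until a message is available
        except Exception:
            # Assumed handling: any interruption means the queue is unusable.
            return STOP

    def put(self, obj: Any) -> None:
        if self.is_writeable:
            logger.debug("put on queue: %s", self.name)
            self.q.put(obj)

    def put_many(self, objs: Iterable[Any]) -> None:
        for obj in objs:
            self.put(obj)

    def prevent_writes(self) -> None:
        logger.info(f"preventing writes to queue: {self.name}")
        self._prevent_writes.set()

    @property
    def is_writeable(self) -> bool:
        return not self._prevent_writes.is_set()

    @property
    def is_drained(self) -> bool:
        return not self.is_writeable and self.empty

    @property
    def empty(self) -> bool:
        return self.q.empty()


if __name__ == "__main__":
    wrapper = QueueWrapper("demo")
    wrapper.put_many(["first", "second"])
    print(wrapper.get(), wrapper.get())
    wrapper.prevent_writes()
    wrapper.put("ignored")  # dropped because the queue is no longer writeable
    print(wrapper.get())
```

Accepting an optional queue in the constructor also makes the class easy to test, since a test can pass in a stand-in object instead of a real multiprocessing Queue.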
It's a thin wrapper over a multiprocessing Queue that uses a multiprocessing Event to indicate if the queue is writeable. Next, we need to implement a multiprocessing Manager by using the BaseManager. Multiprocessing Managers can allow other processes to call code inside of our process. The way we're going to use this is that we're going to allow the front end to put a message directly onto the queue. QueueManager subclasses BaseManager, and we don't need to add or override any of the methods.
The way a manager exposes functionality to remote processes is that we register functions that we want to share. This register_manager function accepts a name and an optional QueueWrapper. If the QueueWrapper exists, we're going to call the manager's register method, passing in the name and a lambda function that returns the QueueWrapper. This is going to allow any process that has access to call any public methods on that queue wrapper, as well as interact with its properties. If the queue isn't passed in, we're simply going to register the given name.
The reason for this switch is that the back end is going to create a QueueWrapper and register it so that it's accessible to the front end. So the back end needs to provide a queue. The front end is going to connect to the QueueWrapper that's created by the back end. So the front end doesn't need to pass in a queue wrapper since it's the one consuming it. However, both the front and back ends need to register the name, so this allows us to just use the same function for both.
Our create_queue_manager function requires a port number to bind the manager to. In my testing with Ubuntu, I found that if I use localhost for the address, the interactions are extremely slow, something like one per second. So instead I'm going to use 127.0.0.1. As an added layer of security, by binding to 127.0.0.1, we can limit connections to only processes which are running on the same host.
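The manager wiring can be sketched roughly as follows. The function names come from the narration, but the parameter names, the default `authkey` value, and the port in the usage note are placeholders assumed for illustration, and the course code may differ.

```python
from multiprocessing.managers import BaseManager
from typing import Any, Optional


class QueueManager(BaseManager):
    """Nothing added or overridden; registration does the work."""


def register_manager(name: str, queue: Optional[Any] = None) -> None:
    if queue is not None:
        # Back end: expose the existing wrapper under the given name.
        QueueManager.register(name, callable=lambda: queue)
    else:
        # Front end: only declare the name so a proxy can be requested.
        QueueManager.register(name)


def create_queue_manager(port: int, authkey: bytes = b"change-me") -> QueueManager:
    # Bind to 127.0.0.1 so only processes on this host can connect.
    # The authkey default here is a placeholder assumption.
    return QueueManager(address=("127.0.0.1", port), authkey=authkey)
```

Under these assumptions, the back end would call `register_manager("inputq", wrapper)` before serving the manager, and the front end would call `register_manager("inputq")`, then `create_queue_manager(50000)`, then the manager's `connect()` method, and finally `manager.inputq()` to obtain a proxy. The name "inputq" and the port number are made up for this example.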
And by using the authkey, we can add another layer of protection that requires connections to provide this key. So these functions here are going to allow us to share a QueueWrapper with our front end. With all of this done, let's run the tests and see if everything is working as intended. Running Pytest finds our four tests, and they all pass. Let's use the verbose flag to see the test names. I find that a little bit more helpful. This just makes it a little clearer to see exactly what's wrong.
Now, let's go see what the tests are actually doing. I'm using this queue_wrapper fixture to create a new QueueWrapper for each function. Notice that the test functions accept a queue_wrapper. Pytest passes this into the test automatically by calling the fixture with the matching name. So each of these tests starts with a new QueueWrapper. These tests are supposed to prove that the logic for our queue wrapper works the way we expect.
This test starts by assuming that a new queue wrapper has zero items and is therefore empty. And after putting something on the queue, it should no longer be empty. This test puts two messages onto the queue, then calls get twice to take those messages off of the queue. It asserts that the values match, and then it asserts that the queue should be empty. To test that get returns STOP if it encounters an exception, I'm using MagicMock to make it so that when get is called, it's going to raise an exception. This makes it easy to test that error handling works the way we expect. Testing the draining logic starts with the assertion that the queue is writeable and it's empty. Then it calls prevent_writes and asserts that the queue is no longer writeable and that it is drained.
By the way, this teardown code here at the top is only there to suppress an error that's related to multiprocessing and Pytest. So we can safely ignore this. So we've created a drainable message queue called QueueWrapper.
And we also created a Manager that enables us to share that QueueWrapper with remote callers that are running on the same host. And finally, we use some pre-written unit tests to validate that our wrapper works. With this done, we've created one of the key components that we need for this application. That's gonna wrap up this sprint. In the next sprint, we're going to create our data models, as well as a shutdown watcher, and we'll stub out our persistence code. So whenever you're ready to dive back in, I will see you in the next sprint.
Ben Lambert is a software engineer and was previously the lead author for DevOps and Microsoft Azure training content at Cloud Academy. His courses and learning paths covered Cloud Ecosystem technologies such as DC/OS, configuration management tools, and containers. As a software engineer, Ben’s experience includes building highly available web and mobile apps. When he’s not building software, he’s hiking, camping, or creating video games.