One of the best ways to learn new programming languages and concepts is to build something. Learning the syntax is always just the first step; after that, the question that arises tends to be: what should I build? Finding a project to build can be challenging if you don't already have some problems in mind to solve.
Throughout this course, we're going to learn more about Python 3 by building a data ingestion process. We're going to go from setting up a development VM through to deploying the app to a 16-core cloud VM where we can test it. The application is going to allow us to submit articles to a frontend HTTP endpoint where they'll be enqueued onto a multiprocessing queue. On the backend, a set of worker processes will dequeue the articles, extract named entities from each article, and enqueue the results to be saved. A set of saver processes will dequeue the results and save the records to Cloud Firestore. The frontend and backend will be run together using supervisord, and we'll use setuptools to create a software distribution used to deploy the app.
This course is broken up into sprints to give you a real-world development experience, and guide you through each step of building an application with Python.
The source code for the course is available on GitHub.
If you have any feedback relating to this course, feel free to contact us at email@example.com.
- Configure a local development environment for an app using a VM
- Implement a data processor that can accept text, extract named entities, and return the results
- Implement a multiprocess-aware message queue and use pytest
- Create data models to use as messages to pass on the message queue
- Create the backend for the application
- Create a web endpoint that we can use to enqueue our post models
- Implement a method for running the frontend and backend together
- Run the application using a dataset to see how the system performs under actual server load
This course is intended for software developers or anyone who already has some experience in building simple apps in Python and wants to move on to something more complex.
To get the most out of this course, you should be familiar with Python, ideally Python 3, and have some knowledge of Linux and how to use Git.
Hello, and welcome to sprint five, part two. We now have a rough implementation of our backend application. Let's fire it up and kick the tires. The way we're going to run this code in production is by using setuptools to create a distribution, which will also include an entry point for calling our main function. We're going to create a file called setup.py inside of our root application directory, and I'll paste in this as a starting point. This calls the `setup` function and passes in all of these keyword arguments. Noteworthy arguments are the `packages` section here, which indicates the packages included in our distribution. There's `entry_points`, which allows us to create an executable entry point that runs the specified function. In this case, we'll be able to call `ingestiond` on the command line and have it run our main method. We can specify any dependencies using `install_requires`, and we can specify additional dependencies using `extras_require`. We can run our setup by calling `pip install`; using this square bracket syntax here, we can specify the extras as well. After this completes, we're going to be able to run our backend code by using this entry point. And we get this unexpected error. Okay, well, I didn't expect this. It does mean that the setup has worked and we now have our entry point, so let's debug this. The problem is with the models: it seems that we can't have a class use itself as a type hint, at least not in the way I've defined it. So we're just going to remove this; type hints should solve problems for us, not create them. So let's get rid of it. Running this again, we get a new error. This is great, because things are actually working. However, we're trying to create a database client and we don't have a service account. So we can disable persistence using our no-persist flag; that way it won't try to create that client. We run it again and we get another error. I expected the last error, but I did not expect this one.
So let's debug this. It appears that we have a problem when we're trying to start the saver process, so let's interrupt this process so we can stop it. The error happened in the start processes function, so let's add a breakpoint, and we're going to use the built-in `breakpoint` function for that. We'll run the code. We know the first breakpoint is the worker process, because we start the input processes first and the output processes second, and each one only has one process, so it's pretty easy to reason about here. We can continue to the next breakpoint by typing `c`. Typing `l` is going to list the code around the breakpoint, and we can see that it's about to call `start`. So our saver process was instantiated just fine; however, when we call `start`, it seems to be raising an exception. Let's type `interact` so that we can jump into an interactive Python shell, and let's see what the representation of `p` is by using its repr. And we get an error. I see; behind the scenes, this is attempting to print the representation by calling the dunder `__repr__` method, and it's complaining that it doesn't have a `_closed` attribute. Something's off with our saver. Let's check it out; specifically, let's compare it to our worker, which is being started. I see, I forgot to call `__init__`, so let's fix that. Okay, cool, so now let's run this again; we still have the breakpoint set, so I'll just type `c` to continue, and again. Awesome. Underwhelming, yes; however, this is up and running without error. Okay, I want to stop this, and let's better understand how this works by actually adding something to our input queue; for now I'm only going to add a string. This will be put onto the input queue, and our worker process should get it, process it, and put it onto our output queue. Let's add some logging to print out the message before it gets processed so that we can see what that looks like. Notice in the logs, we can see that the text we entered is reflected back.
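The bug found above can be reproduced in isolation. This sketch uses hypothetical class names, but it shows the same failure mode: a `multiprocessing.Process` subclass whose `__init__` never calls `super().__init__()`, so internal attributes like `_closed` are never set and `repr` blows up.

```python
import multiprocessing

class BrokenSaver(multiprocessing.Process):
    def __init__(self, queue):
        # BUG: super().__init__() never runs, so the Process internals
        # (including the _closed attribute) are never initialized
        self.queue = queue

class FixedSaver(multiprocessing.Process):
    def __init__(self, queue):
        super().__init__()      # the fix: initialize the Process machinery
        self.queue = queue

error = None
try:
    repr(BrokenSaver(queue=None))   # Process.__repr__ reads self._closed
except AttributeError as exc:
    error = str(exc)                # e.g. "... has no attribute '_closed'"

print(error)
fixed_repr = repr(FixedSaver(queue=None))
print(fixed_repr)                   # a normal Process repr, no error
```

This is why the saver could be instantiated "just fine" but failed the moment anything touched the attributes that `super().__init__()` is responsible for creating.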
Let's stop this and log from the saver process. Running it again, we can see the dictionary that we're returning from the data processor's `process` method. So what have we done? We've enqueued a message, processed it, and then passed the processed results to our fake persistence method. This means that data is actually flowing through the system. However, the frontend is going to enqueue posts, not strings. Let's finish implementing our data processor. Here's our current version, and here's our updated version. This method here, `process_message`, accepts a post, and it's going to return a `ProcessedPost`. Pydantic models can be initialized using keyword arguments that match the property names. By default, Pydantic is going to ignore any keywords that are passed in that don't exist on the model. This is just a generic way to take any common properties from the post, such as publication, and any of the keys in the processed dictionary that match, in this case just entities. And then with these double asterisks, we can expand the dictionary into keyword arguments. We need our worker process to use the `process_message` method... that's tough to say, so let's change that. If we run this code again, it's going to raise an error because we're still enqueuing a string, so let's just see it happen. And we get an error. So let's import our post model, and we're going to wrap this string in a `Post`. The string here is going to be our content, and for the publication, we'll just say "me". Now, running this, we can see that the error is gone, which means our worker process is now expecting messages from the input queue to be of type `Post`. Now, when we pass the post message to our data processor via `process_message`, we get a `ProcessedPost` object back. When I first started testing out this application, I had the worker process the post, and then it added a `ProcessedPost` to the output queue. Then the saver process saved all of the entities. Now, it worked well enough.
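The course code uses Pydantic models here; as a dependency-free sketch of the same pattern, this version uses dataclasses with hypothetical field names. Pydantic ignores unknown keyword arguments by default, while dataclasses are stricter, so this sketch filters the processed dictionary down to known fields before expanding it with `**`.

```python
# A dependency-free sketch of the process_message pattern.
# Field names and the "tokens" key are illustrative assumptions.
from dataclasses import dataclass, fields

@dataclass
class Post:                      # hypothetical input model
    content: str
    publication: str

@dataclass
class ProcessedPost:             # hypothetical output model
    publication: str
    entities: dict

def process_message(post: Post) -> ProcessedPost:
    # Stand-in for the NLP output; "tokens" is an extra key the model lacks
    processed = {"entities": {"python": 1}, "tokens": 42}
    known = {f.name for f in fields(ProcessedPost)}
    kwargs = {k: v for k, v in processed.items() if k in known}
    # The double asterisks expand the dict into keyword arguments
    return ProcessedPost(publication=post.publication, **kwargs)

result = process_message(Post(content="hello", publication="me"))
print(result)   # ProcessedPost(publication='me', entities={'python': 1})
```

The filtering step is what Pydantic's default behavior gives you for free: matching keys become fields, unmatched keys are simply dropped.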
However, each post could have dozens of entities, and often they were the same core set of entities, so I was making a lot of network calls just to keep updating the same records over and over. Firestore charges for reads and writes. Now, I wasn't going to break the bank by leaving it this way; however, I did find it wasteful for this use case. My solution was to have the worker aggregate and cache all of our processed posts for each publication. So let's check out that code now. I'm going to replace this existing worker with a modified version. Okay, so in our constructor, we're setting up the properties for cache size and cache count. The size here refers to the number of messages that we process for a given worker before we flush the cache. This call here to `reset_cache` is responsible for initializing our cache. Our cache is just a `defaultdict` with its default type set to `ProcessedPost`. The `cache` method accepts a `ProcessedPost`, and then it's going to add it into the cache. Now, this highlights the value of `defaultdict`: if this was a normal dictionary, we'd have to first check to see if the key has a value before we attempt to add to it. Our dictionary's key here is just a normalized publication name, and the value is going to be a `ProcessedPost`. We're using the plus-equals operator here to add them together. To do that, we actually have to modify our model, which we're going to do shortly. Let's keep going through this. After caching the data, we increment the count by one, and we return whether we've hit the cache size. Inside the processing loop, notice we're caching and checking to see if we need to flush the cache. And right before we leave our process, we call flush one more time, just to catch any remaining data that hasn't been flushed yet. Okay, let's finish the `ProcessedPost` implementation that we started in a previous sprint. So here's our before, and here's our after.
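The caching logic described above can be sketched like this. The attribute and method names (`_cache_size`, `_count`, `reset_cache`, `flush_cache`) are assumptions based on the narration, and a `Counter` stands in for `ProcessedPost` so the sketch stays self-contained.

```python
# A sketch of the worker's cache; names are assumptions, and Counter
# stands in for the ProcessedPost model described in the video.
from collections import Counter, defaultdict

class Worker:
    def __init__(self, cache_size=25):
        self._cache_size = cache_size   # messages to process before flushing
        self.flushed = []               # stand-in for the output queue
        self.reset_cache()

    def reset_cache(self):
        # defaultdict gives every new publication a fresh default value,
        # so cache() never has to check whether the key exists yet
        self._cache = defaultdict(Counter)
        self._count = 0

    def cache(self, publication, entities):
        self._cache[publication.lower()] += Counter(entities)
        self._count += 1
        return self._count >= self._cache_size   # True means "time to flush"

    def flush_cache(self):
        self.flushed.append(dict(self._cache))
        self.reset_cache()

w = Worker(cache_size=2)
for pub, ents in [("Me", {"python": 1}), ("Me", {"python": 2})]:
    if w.cache(pub, ents):      # flush once we've hit the cache size
        w.flush_cache()
print(w.flushed)   # [{'me': Counter({'python': 3})}]
```

Two posts for the same publication collapse into a single cached record, which is exactly what cuts down the repeated Firestore writes.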
Notice this dunder `__add__` method: it increments the article count by one, it overwrites the publication, and it merges the entities. The reason for overwriting the publication here is so that when the `defaultdict` creates a new `ProcessedPost` value, we can use this to set that initial value. When our worker flushes its cache, the `ProcessedPost`'s `transform_for_database` method is called. The database structure is going to be roughly this: there's a collection called publications, which will have a document for each publication. Each publication document has a collection called `ents`, which is short for entities, and it stores a document for each entity. The ID of the entity is hashed using Python's built-in `hash` function, and the document that we store contains properties for word and count. Also, each publication document stores an article count. This will allow us to track how many articles we've processed for a given publication, as well as its entities. This internal method here is going to yield some tuples, and the public version is going to return a list of those tuples. These tuples are handed directly to the persist function as positional arguments. The `flush_cache` method takes the values from the cache, calls `transform_for_database`, and enqueues the results with the queue wrapper's `put_many` method. Let's see the implemented persist method alongside this transform so that it makes a bit more sense. This is how persist currently looks, and if I paste in the new version, we can see this final implementation. Notice the return structure of this transform: the tuple values align with the persist function's arguments, starting after client. So we pass this function the database client, and then a publication name, a collection name, a document ID, and a dictionary. If the collection or document ID are `None`, then this is an article count for a publication.
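A sketch of the `__add__` and `transform_for_database` behavior described above. The field names and tuple layout are assumptions inferred from the narration, and for simplicity this version mutates and returns `self`, which is enough to make the `defaultdict` `+=` pattern work.

```python
# Sketch of ProcessedPost; names and tuple layout are assumptions.
class ProcessedPost:
    def __init__(self, publication="", entities=None):
        self.publication = publication
        self.entities = entities or {}
        self.article_count = 0

    def __add__(self, other):
        self.article_count += 1                 # one more article folded in
        self.publication = other.publication    # overwrite, so defaultdict's
                                                # empty default picks up a name
        for word, count in other.entities.items():
            self.entities[word] = self.entities.get(word, 0) + count
        return self

    def _transform_for_database(self):
        # (publication, collection, document_id, data) tuples;
        # None collection/doc ID marks the publication's article count
        yield self.publication, None, None, {"article_count": self.article_count}
        for word, count in self.entities.items():
            yield (self.publication, "ents", str(hash(word)),
                   {"word": word, "count": count})

    def transform_for_database(self):
        return list(self._transform_for_database())

total = ProcessedPost() + ProcessedPost("me", {"python": 1}) \
                        + ProcessedPost("me", {"python": 2})
rows = total.transform_for_database()
print(rows[0])   # ('me', None, None, {'article_count': 2})
```

Each tuple lines up positionally with the persist function's parameters after the client, so the saver can unpack them straight into the call.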
So we save that by calling `increment_publication`, which will use Firestore's `Increment` class to update the count. If it's an entity, we do roughly the same thing: we increment the count and save by calling `set`, passing in merge to ensure that we're updating rather than overwriting. Okay, let's close out of some of this. And I just noticed we have an indentation problem here, so I'm going to fix that. Okay, let's run the app again to see the caching behavior. Before doing that, I want to change the logger's level to debug. Now let's add two posts to our input queue. By running this with a cache size of two, we can see that the logs tell us that the queue has been flushed, which only happens if we've hit our cache size. All right, this has been a rather feature-intense sprint. The backend is the combination of all of the different components that we've created so far. With this backend up and running, we now have a mechanism for creating our input and output queues. We can also create the corresponding input and output processes. Also, by using setuptools, we were able to create an entry point that allows us to run the backend's main method on the command line. In our next sprint, we're going to focus on creating the frontend so that we can enqueue data over HTTP. So whenever you're ready to keep building, I will see you in the next sprint.
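The persist dispatch described above can be sketched with a fake client standing in for the Firestore client, so the flow is runnable without credentials. The real code uses google-cloud-firestore's `Increment` sentinel and `set(..., merge=True)`; the `FakeClient` methods here are purely hypothetical stand-ins for that behavior.

```python
# A sketch of the persist dispatch; FakeClient is a hypothetical stand-in
# for the Firestore client so no service account is needed.
class FakeClient:
    def __init__(self):
        self.calls = []

    def increment_publication(self, publication, count):
        # stands in for a Firestore Increment on the publication document
        self.calls.append(("increment", publication, count))

    def set_merged(self, publication, collection, doc_id, data):
        # stands in for document.set(data, merge=True)
        self.calls.append(("set_merge", publication, collection, doc_id, data))

def persist(client, publication, collection, document_id, data):
    if collection is None or document_id is None:
        # no collection/doc ID means this is a publication's article count
        client.increment_publication(publication, data["article_count"])
    else:
        # an entity: merge so we update the record rather than overwrite it
        client.set_merged(publication, collection, document_id, data)

client = FakeClient()
persist(client, "me", None, None, {"article_count": 2})
persist(client, "me", "ents", "123", {"word": "python", "count": 3})
print(client.calls[0][0], client.calls[1][0])   # increment set_merge
```

The branch on `None` is the whole trick: a single tuple format carries both publication counters and entity documents through the queue, and persist routes each to the right write.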
Ben Lambert is a software engineer and was previously the lead author for DevOps and Microsoft Azure training content at Cloud Academy. His courses and learning paths covered Cloud Ecosystem technologies such as DC/OS, configuration management tools, and containers. As a software engineer, Ben’s experience includes building highly available web and mobile apps. When he’s not building software, he’s hiking, camping, or creating video games.