One of the best ways to learn new programming languages and concepts is to build something. Learning the syntax is always just the first step. After learning the syntax the question that arises tends to be: what should I build? Finding a project to build can be challenging if you don’t already have some problems in mind to solve.
Throughout this course, we’re going to learn more about Python 3 by building a data ingestion process. We’re going to go from setting up a development VM through to deploying the app to a 16 core, cloud VM where we can test. The application is going to allow us to submit articles to a front-end HTTP endpoint where they’ll be enqueued onto a multiprocessing queue. On the back-end, a set of worker processes will dequeue the articles, extract named entities from the article, and enqueue the results to be saved. A set of saver processes will dequeue the results and save the records to Cloud Firestore. Both the front and back-ends will be run together using supervisord. And we’ll use setuptools to create a software distribution used to deploy the app.
This course is broken up into sprints to give you a real-world development experience, and guide you through each step of building an application with Python.
The source code for the course is available on GitHub.
If you have any feedback relating to this course, feel free to contact us at email@example.com.
- Configure a local development environment for an app using a VM
- Implement a data processor that can accept text, extract named entities, and return the results
- Implement a multi-process aware message queue and use Pytest
- Create data models to use as messages to pass on the message queue
- Create the backend for the application
- Create a web endpoint that we can use to enqueue our post models
- Implement a method for running the frontend and backend together
- Run the application using a dataset to see how the system performs under actual server load
This course is intended for software developers or anyone who already has some experience of building simple apps in Python and wants to move on to something more complex.
To get the most out of this course, you should be familiar with Python, ideally Python 3, and have some knowledge Linux and how to use Git.
Hello, and welcome to Sprint Six. Our backend process exposes the input queue to processes running on the same host that also know the auth key. Having the front end separated out, allows us to implement them however best works for the given use case. Our goal for this sprint is to create a web endpoint that we can use to enqueue our post models. And we'll protect it by using an API key that is sent in the request header. Let's start by creating a file called frontend.py. And as always I'll paste in our template. Our front end is going to use a library called FastAPI. So let's install that with pip. FastAPI is an async web framework with really good documentation and really good developer tooling. The purpose of our front end here is to accept HTTP requests from authorized clients and ingest articles that are posted to the /post/enqueue end point. Recall that we used the queue manager to make our input queue accessible over the network. And in the backend we've bound the queue manager to 127.0.0.1 on port 50_000. To consume it in the front end, we need to connect to it. And that's what this connector class here is going to do. Now, this is an easy thing to do conceptually, we create a manager, connect to it and interact with the queue that we've registered in the backend. When I was building out this app, I called connect for every HTTP request. Now it worked well enough with no server load as you'd expect. However, once I started sending a lot of requests, I quickly ended up using all of our network ports. So I looked for a simple way to check, to see if the manager was connected. And I didn't find anything that jumped out. Now, like all code, I was on a deadline for this app. So this connector is how I solve the problem of being able to reuse the same connection inside of a given front end process. We're going to build that out shortly. For now, let's keep reading through the code. This check API header function is going to be used to authenticate the request. And our last function here named create post is responsible for all of the heavy lifting. This decorator tells fast API to listen for any post requests on this path. And by default it returns a 201 status if everything went well. Fast API uses Pydantic for data validation. So by using type hints in the function signature, fast API knows how to convert HTTP requests into these data types. Recall that our post model is a Pydantic based model. So fast API knows how to use it right out of the box. So here we're specifying that the request body is going to contain a post model, which consists of a content and publication property. Also, we define, we want a queue rapper named queue and we create a default for it. Notice this depends class. This allows us to specify a callable that fast API will create for us when it receives a request to the endpoint For the queue argument, it's wrapping this global iqueue reference, which is an instance of the connector. This authenticated Boolean argument depends on the check auth header function. Let's start by implementing it. Fast API is going to call check auth header. It's going to see that the keyword argument named API key header is a security object, which is similar to depends. It references this API key header object here. And this is part of fast API, which allows us to extract a value named access token from the header, by setting the auto error here to false, we can ensure that this is not gonna raise an error if the value doesn't exist in the header. I'm just gonna paste this code in and we can review it. This is pretty basic. If the key is this hard coded value, we return true. Now I debated on whether to return anything at all. We're raising an error here if the header value, isn't hard coded. So I'm not sure returning anything is all that valuable, but for now, that's how it's going to stay. So if the header doesn't have a token with this value, we raise an HTTP exception and this will stop our requests before our method is even called. Alright, let's implement the connector and we're going to make this class callable. This is just like using Spacey in the data processor. On init, we're going to register the iqueue. Then we need to create a manager and tell it which port to connect on. I'm going to hardcode this here. Now, this is configurable in the backend. Though, I'm using the defaults already in the backend set to 50_000. So I'm just going to hardcode here and let's set this iqueue property to none. The dunder call method is what allows us to make this callable. If the queue is initialized, we just return it. Otherwise we try and initialize it by calling iqueue on the manager. Now, if all went well, we can simply return it. So what could go wrong if the front end attempts to establish a connection to the queue manager before the backend finishes starting up, then it's going to result in an assertion error. If that happens, we attempt to reconnect to the manager. If it's refused, we're just gonna raise an exception and allow an HTTP exception to be returned. Now, if we do establish a connection, we return the results of calling self. And this is just going to run this method again. And now hopefully with this connected, this code here in the try block will actually succeed. For other exceptions we're just going to raise them and we can deal with them as HTTP errors. Honestly, this is rough code. It's not really resilient. For example, it's not going to reconnect if it loses the connection with the backend, this is the type of code where dragons live. It highlights the old adage. If I had more time, I'd write less code. So now let's implement our create post function. We'll attempt to put post onto the queue. And if it fails, we're just gonna raise a 500 error. Fast API uses Python's async functionality. So to run this, we're going to use a library called uvicorn. And while it's installing, I'll add it as a dependency to the setup.py file. Okay, awesome. Let's run this with uvicorn and see how it works. We'll use port 8,000 because vagrant is already set up to pass through on 8,000 and will bind to 0.0.0.0. Next, we need to specify the package, module and app. So we have ingest.frontend:app, and here it is up and running. Now, following this link, there's not much to see here. However, if we browse to /docs, we're presented with a built in tool for interacting with the API. Notice that it already knows about our post model and our end point. Let's test this out. Let's make sure our front end and backend are running and then try it. I have two terminals here on the left. We have the front end and on the right we have the backend. Starting the backend first. Okay. And the front end. Great. So we can use the built in docs page to test our API. First, we need to authenticate. So let's go copy the API key from our code, and we're going to paste it here and authorize. Now clicking try here is going to allow us to edit this default. And when I submit this, we should receive a 201 status. If everything worked. And here it is. In the terminal, we can see that the front end received a request and the backend processed and flushed that message. Let's add some gratuitous logging to the saver and display that message. And let's restart the backend. If we head over to the browser and we try and submit a new post, notice that we get an error. That's because we didn't restart the front end. And the connection to the queue was cut when we restarted the backend. This is what I was mentioning when I was talking about how that connector is really not that resilient. So let's just restart both and we'll try again. And now if we check out the logs, notice all of these lines, these are the results of calling transform for database. Let's stop this again. And we're going to add some logging to our flush cash method and let's restart both processes. Okay. Spacey loaded. So we're ready to test again, and we're going to submit the same value as before. Notice here we have our process to post, calling transform for database is going to turn it into all of these tuples, which get put on the output queue. I like to see the shape of the data as it moves through the application. It helps me to better understand how a system functions. So let's do a little bit more, let's stop these. And I want to run them again with a cache size of five. If we submit just one request and we check out the logs, notice that we don't have anything logged. That's because the cache size is set to five. So if we run it four more times, and for our fifth time let's just change the content a bit, spice things up and we'll submit it. Now, back in the logs, we can see the cache has aggregated five posts together. And when we write the records to the database, we end up with one record for each entity that was extracted and their aggregated values. Alright, with this completed, we now have the raw building blocks to get this into a production like environment for testing. However, we still have some set up to do. We still need a way to keep our processes running. We also need a mechanism that allows the front end and backend to be started as a complete system. And that's what we're gonna tackle in the next sprint. So whenever you're ready to keep going, I will see you in the next sprint.
Ben Lambert is a software engineer and was previously the lead author for DevOps and Microsoft Azure training content at Cloud Academy. His courses and learning paths covered Cloud Ecosystem technologies such as DC/OS, configuration management tools, and containers. As a software engineer, Ben’s experience includes building highly available web and mobile apps. When he’s not building software, he’s hiking, camping, or creating video games.