Processing IoT Hub data streams with Azure Stream Analytics

Once your data is in IoT Hub, you need to be able to access it, ideally with several different processors. Using separate processors lets you split up responsibilities, so each processor has its own job.

Azure provides a few options for processing events and sending feedback to devices. In this course we'll introduce you to Stream Analytics, Azure Functions and raw processing with C#.

If you want to follow along, or just try this out for yourself, you can find the code here.

Processing Azure IoT Hub Events and Data: What You'll Learn

Lecture: What you'll learn
Course Intro: What to expect from this course
Understanding Streaming Data Processing for IoT Hub
Writing an Event Processor for IoT Hub with C#
Handling Device-To-Cloud data with Azure Functions
Processing IoT Hub data streams with Azure Stream Analytics
Sending feedback to devices with Azure Functions: Reviewing the code for the device
Final Thoughts: Wrapping up the course



Welcome back!

In this lesson I'll show you how to process IoT Hub data streams with Azure Stream Analytics.

In case you’re not familiar with it, Stream Analytics is a cost-effective event processing engine. It’s built on the same notion of an "event stream" as IoT Hub.

It can be used to transform, enrich, and correlate events that come from different sources, including IoT Hub. And instead of C#, you use a SQL-like language to query the data.

If you’re already familiar with SQL, you’ll find the learning curve isn’t as steep, because the syntax is close to standard SQL, with some additional elements for time slicing.

What’s different about Stream Analytics queries is that a traditional SQL database assumes the data doesn’t change while you’re querying it. That’s not true with Stream Analytics, because the query runs over a stream of data: a potentially unlimited set of events.

This introduces some new concepts. In particular, you need to think about arrival time, which is when the event reaches the input source (here, when IoT Hub receives it).
With time in mind, you can define subsets of the stream and process them with windowing functions, which group events into different types of windows. For example, tumbling windows don’t overlap, while hopping windows can.
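To make the windowing idea concrete, here is a small Python sketch (an analogy, not Stream Analytics code) that assigns timestamped events to windows. It assumes integer-second timestamps and treats each window as the interval (end - size, end], with window ends aligned to multiples of the hop; both are simplifying assumptions for illustration. A tumbling window is the case where the hop equals the window size, and a hopping window with a smaller hop produces overlapping windows.

```python
from typing import Dict, List, Tuple

# (timestamp in whole seconds, data value); integer timestamps are an assumption.
Event = Tuple[int, float]


def hopping_windows(events: List[Event], size: int, hop: int) -> Dict[int, List[float]]:
    """Group event values by the end time of every window that contains them.

    Each window covers the interval (end - size, end], and window ends fall
    on multiples of `hop`. With hop == size this is a tumbling window (each
    event lands in exactly one window); with hop < size the windows overlap.
    """
    windows: Dict[int, List[float]] = {}
    for ts, value in events:
        # Smallest multiple of hop that is >= ts: the earliest window end that could contain ts.
        end = -(-ts // hop) * hop
        # Keep adding the event to later windows while their start is still before ts.
        while end - size < ts:
            windows.setdefault(end, []).append(value)
            end += hop
    return dict(sorted(windows.items()))


events = [(1, 20.0), (4, 22.0), (7, 30.0), (12, 25.0)]
# Tumbling, 10 s: {10: [20.0, 22.0, 30.0], 20: [25.0]}
print(hopping_windows(events, size=10, hop=10))
# Hopping, 10 s every 5 s: {5: [20.0, 22.0], 10: [20.0, 22.0, 30.0], 15: [30.0, 25.0], 20: [25.0]}
print(hopping_windows(events, size=10, hop=5))
```

With a 10-second window and a 5-second hop, every event lands in two windows, which is why overlapping windows can report the same event more than once.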

Let’s check out an example in the portal.

So, I’ll click on "Stream Analytics Jobs", and click "Add."
Let’s name this "processiot." I’ll use the East US region, and click create.

Okay, so the job is created. In the center of the screen we have inputs, query, and outputs.
Let’s begin with a simple logging job that saves telemetry to a DocumentDB database.

So let's start with inputs. You can see there aren’t any inputs yet, though we could add several.

I’ll click on "Add" and type "events" as the name.

The source type can be "reference data" or "data stream".

We’ll use "data stream", and you can choose between "event hub", "blob storage", and "IoT hub". I’ll select IoT hub. We have the default "messaging" endpoint, which is fine. For the shared access policy I’ll use the processor policy, and I’ll also use the processor consumer group.

For the event serialization format, you can choose between Avro (often used with HDInsight), CSV, or JSON. We’ll use JSON, and UTF-8 is fine for the encoding.
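For reference, here is a minimal Python sketch of what the JSON/UTF-8 setting means for a single message: the bytes are decoded as UTF-8 and then parsed as JSON. The payload is hypothetical. Its property names match the ones the logging query selects later in this lesson, but the values and the ISO 8601 DateTime format are assumptions.

```python
import json
from datetime import datetime

# Hypothetical message body: property names from the lesson's query,
# values and the ISO 8601 DateTime format assumed for illustration.
SAMPLE = '{"DeviceId": "device-01", "index": 3, "Data": 23.5, "DateTime": "2017-05-04T10:15:30"}'


def parse_event(payload: bytes) -> dict:
    """Decode a UTF-8 encoded JSON message into a dict of properties."""
    event = json.loads(payload.decode("utf-8"))
    # Turn the timestamp string into a datetime so it can be compared and windowed.
    event["DateTime"] = datetime.fromisoformat(event["DateTime"])
    return event


event = parse_event(SAMPLE.encode("utf-8"))
print(event["DeviceId"], event["Data"], event["DateTime"].isoformat())
```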

Okay. Let’s create this.

Now we need an output. So let’s click on "Add" and name the alias "logging"...

The sink is going to be “DocumentDB.” We’ll use the DocumentDB account in the current subscription, the “demo” database, and the “events” collection. And let’s create this.

With that done, we’re ready to create a simple query.

Notice here we have a query template for a basic select.

We want to select FROM the events input, which is our only input. And we want to select INTO the "logging" output, which is our DocumentDB collection.


We could use an asterisk (SELECT *), but it's better practice to list the properties explicitly.

So let’s specify DeviceId, and “index”, which we need to wrap in square brackets because the name collides with a reserved keyword. Then there’s [Data] and [DateTime]... Great!

In short, this query reads every event and saves it to DocumentDB. So let’s save this.
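Based on the steps above, the logging query amounts to SELECT DeviceId, [index], [Data], [DateTime] INTO logging FROM events. As a rough Python analogy (not how Stream Analytics executes it), an explicit SELECT list keeps only the named properties, so metadata that Stream Analytics attaches to IoT Hub input, such as EventEnqueuedUtcTime, doesn't end up in the stored documents. The sample event and the select_into helper are hypothetical.

```python
from typing import Dict, Iterable, List, Sequence

# The properties named in the lesson's SELECT list.
SELECTED = ("DeviceId", "index", "Data", "DateTime")


def select_into(events: Iterable[Dict], columns: Sequence[str] = SELECTED) -> List[Dict]:
    """Project each event onto the listed columns, like an explicit SELECT list."""
    return [{column: event[column] for column in columns} for event in events]


# Hypothetical input event: the telemetry fields plus an example metadata
# property of the kind Stream Analytics adds to IoT Hub input.
incoming = [{
    "DeviceId": "device-01",
    "index": 0,
    "Data": 21.7,
    "DateTime": "2017-05-04T10:15:30",
    "EventEnqueuedUtcTime": "2017-05-04T10:15:31Z",
}]

documents = select_into(incoming)
print(documents)
```

A SELECT * would have copied the metadata property into every document as well.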

Okay, let’s close out of here and click Start. When you click Start, you can choose when the job should start.

I’ll select “Now” and click start. And the job is starting.

Now that the job has started, let’s switch to the command line to run the simulated device.

We just need a few events, so I’ll let this run for a moment...and let’s stop it. So five events should have been sent.

If I go back into the portal, over to DocumentDB, open the Query Explorer, and run this query, you can see that we have our five documents.

So, this is similar to the other lessons where we’re just logging the data. However, just like with the other options, you could also add properties dynamically based on some condition.
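As an illustration of adding a property based on a condition, the hypothetical sketch below attaches a Level property to each event. In a Stream Analytics query this kind of conditional property is usually written with a CASE expression in the SELECT list; the Level name, its values, and the threshold of 24 are assumptions chosen for the example.

```python
from typing import Dict

# Hypothetical threshold and property name, for illustration only.
HIGH_THRESHOLD = 24.0


def enrich(event: Dict) -> Dict:
    """Return a copy of the event with a derived Level property.

    Mirrors the idea of: CASE WHEN Data > 24 THEN 'high' ELSE 'normal' END AS Level
    """
    level = "high" if event["Data"] > HIGH_THRESHOLD else "normal"
    # Build a new dict so the original event is left untouched.
    return {**event, "Level": level}


print(enrich({"DeviceId": "device-01", "Data": 25.0}))
print(enrich({"DeviceId": "device-01", "Data": 22.0}))
```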

Let’s try out something more interesting. Let’s look for patterns in the data and if we find a particular pattern let’s send feedback to the device.

Let’s say that if the average data value is greater than 24, computed over at least 6 events, then we’ll send a command to the device.
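The rule itself is a predicate over the values in one window. Here's a minimal Python sketch of it; the function name is hypothetical, and it reads "greater than 24" as strictly greater and "at least 6 events" as a count of six or more.

```python
from statistics import mean
from typing import Sequence

MIN_EVENTS = 6      # "based on at least 6 events"
AVG_THRESHOLD = 24  # "average data value is greater than 24"


def should_switch_on(values: Sequence[float]) -> bool:
    """True when a window holds enough events and their average exceeds the threshold."""
    # The count check comes first, so an empty or sparse window never triggers a command.
    return len(values) >= MIN_EVENTS and mean(values) > AVG_THRESHOLD


print(should_switch_on([25, 26, 24, 25, 23, 27]))  # 6 events, average 25
print(should_switch_on([30, 30, 30]))               # too few events
```

Requiring a minimum count keeps a window with only one or two high readings from sending a command.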

First, let’s create a new output for our feedback endpoint. So in outputs, I’ll click Add...and I’ll name this “feedback.”

It’s going to be an Event Hub sink, and the defaults are fine, except that I’ll change the policy to the processor policy. Okay, let’s create this. Now I’m going to fast-forward to when the query is finished...

Okay, I have the completed query here. If you’re familiar with common table expressions in SQL, this will look familiar: we’re using the WITH keyword to define named intermediate result sets, a bit like temporary tables, that the later steps can query.

The first step is just the raw data from the events, including a timestamp. Then there’s the windowing step, which breaks the results up into overlapping 10-second time slices.

The next step uses the previous step’s data to either return “SWITCH-ON” in the command or leave it empty, depending on whether our WHERE clause is met.
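To show how these steps chain together, here is a Python sketch of the same pipeline: raw events, 10-second windows that advance every 5 seconds, then a command per window that is either "SWITCH-ON" or empty. It's an analogy rather than the lesson's actual query. The integer-second timestamps, the (end - size, end] window boundaries, and the function name are assumptions, and the 5-second hop is inferred from the roughly five-second command interval in the demo.

```python
from statistics import mean
from typing import List, Tuple

WINDOW_SIZE = 10  # seconds, as in the lesson
HOP = 5           # seconds; assumed from the ~5-second command interval in the demo
MIN_EVENTS = 6
AVG_THRESHOLD = 24


def window_commands(events: List[Tuple[int, float]]) -> List[Tuple[int, str]]:
    """Return (window_end, command) for every window that contains events.

    `events` are (timestamp_seconds, data_value) pairs with non-negative
    timestamps. Each window covers (end - WINDOW_SIZE, end], and window ends
    fall on multiples of HOP, so consecutive windows overlap.
    """
    if not events:
        return []
    newest = max(ts for ts, _ in events)
    results: List[Tuple[int, str]] = []
    # Windows ending at or after newest + WINDOW_SIZE cannot contain any event.
    for end in range(0, newest + WINDOW_SIZE, HOP):
        # Step 2: collect the values that fall inside this window.
        values = [v for ts, v in events if end - WINDOW_SIZE < ts <= end]
        if not values:
            continue
        # Step 3: the WHERE-style condition decides between SWITCH-ON and empty.
        triggered = len(values) >= MIN_EVENTS and mean(values) > AVG_THRESHOLD
        results.append((end, "SWITCH-ON" if triggered else ""))
    return results


# Step 1 (raw events): one reading per second, with a short spike at t = 8..12.
events = [(t, 30.0 if 8 <= t <= 12 else 22.0) for t in range(1, 21)]
for end, command in window_commands(events):
    print(end, repr(command))
```

With this sample data, the windows ending at 10 and 15 (averages 24.4 and 26) both report SWITCH-ON, while the window ending at 20 averages 23.6 and the windows ending at 5 and 25 hold only five events each. One short spike therefore produces the command twice, the same effect the demo shows later.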

Then at the bottom there are two more SELECT statements, which is kind of cool: because each one selects INTO its own output, we can run multiple queries in a single job.

We’re running one to save the data to DocumentDB, and the other to use the feedback endpoint of IoT Hub to send a command.
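Because each SELECT ... INTO statement names its own output, one job can fan the same data out to several sinks. The Python sketch below imitates that routing with two in-memory outputs named after the lesson's aliases, logging and feedback. The per-window rows, their WindowEnd and Avg fields, and the run_job helper are hypothetical; this illustrates the idea and is not how Stream Analytics executes queries.

```python
from typing import Callable, Dict, Iterable, List

Row = Dict[str, object]
Query = Callable[[Row], Row]  # hypothetical: one projection per output alias


def run_job(rows: Iterable[Row], queries: Dict[str, Query]) -> Dict[str, List[Row]]:
    """Send every input row through every query, collecting results per output alias."""
    outputs: Dict[str, List[Row]] = {alias: [] for alias in queries}
    for row in rows:
        for alias, query in queries.items():
            outputs[alias].append(query(row))
    return outputs


# Hypothetical per-window rows from an earlier step of the query.
windows = [
    {"WindowEnd": 10, "Avg": 24.4, "Command": "SWITCH-ON"},
    {"WindowEnd": 20, "Avg": 23.6, "Command": ""},
]

outputs = run_job(windows, {
    "logging": lambda row: dict(row),                      # keep every property
    "feedback": lambda row: {"Command": row["Command"]},   # only the command
})
print(outputs)
```

The feedback output also receives the empty commands, which matches the demo, where empty commands arrive until the average crosses the threshold.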

Let’s save this, and head back to the overview tab to start it up.

Now let’s switch to the command line to start up a feedback listener and a device simulator.

Let’s fire up the feedback listener first... It starts listening for commands, and at the moment there aren’t any because we don’t have any events.

So let’s start the device simulator… and in just a moment we’ll have some events.

Okay, great, now we’re starting to receive events from Stream Analytics.

The processor has started processing the events, and the commands are empty. About every five seconds, a command is sent.

Looking at the window here, you can see that about eight events fall inside each time window on average, and they all feed into the average calculation.

To see it trigger a command I’ll increase the data value for a moment...and then decrease.

Okay, in a moment we should get a SWITCH-ON command...and it appears multiple times because the time windows overlap. And now, as the average drops, the SWITCH-ON command goes away.

So Stream Analytics is a great way to find patterns in data using a SQL-like syntax. It also integrates nicely with other Azure services.

So far in the course I’ve shown you how to send feedback to devices by sending events to the “feedback” endpoint. However, you may have noticed a problem with that: it doesn’t target a specific device. In the next lesson I’ll show you how to send feedback to specific devices with Azure Functions.

About the Author

Marco Parenzan is a Research Lead for Microsoft Azure at Cloud Academy. He has been awarded Microsoft MVP for Microsoft Azure three times. He speaks at major community events in Italy about Azure and .NET development, and he is a community lead for 1nn0va, an official Microsoft community in Pordenone, Italy. He wrote a book on Azure in 2016. He loves IoT and retrogaming.