In this course, we’ll explain the concept of slowly changing dimensions and the different approaches to dealing with them. Then we’ll show you how to create a data flow in Azure Synapse Pipelines to update slowly changing dimensions. Even if you’re not particularly interested in how to handle slowly changing dimensions, the demos in this course will give you a good idea of how to create data flows in either Azure Synapse Pipelines or Azure Data Factory since they both use the same interface.
Learning Objectives
- Understand various methods for handling slowly changing dimensions in databases
- Create a data flow in Azure Synapse Pipelines that updates slowly changing dimensions in a database table
Intended Audience
- Anyone who would like to learn about slowly changing dimensions in databases
- Anyone who needs to know how to create data flows in Azure Synapse Analytics Pipelines and Azure Data Factory
Prerequisites
- Some knowledge of Azure Synapse Analytics, specifically dedicated SQL pools and Synapse Pipelines (take Introduction to Azure Synapse Analytics if you’re not familiar with this Microsoft service)
- Experience with SQL (helpful but not required)
Resources
The GitHub repo for this course can be found here.
Before we start, I just want to say again that this course is intended for people who already have some experience with Azure Synapse Analytics, so I’m not going to show you how to set up a Synapse workspace or a dedicated SQL pool. I’ve already created those for this example.
Now, to set up the SQL pool as your data source, go into Synapse Studio and click on “Data”. If your SQL pool doesn’t show up here, then click “Refresh”. Then right-click on the SQL pool and under “New SQL script”, select “Empty script”. Now go to the GitHub repository for this course and open the create_tables script.
This creates a table called “CustomerSource” and fills it with data from this CSV file. CustomerSource is where we’ll put new data we want to add to the production table. It contains columns like CustomerID, FirstName, LastName, and CompanyName.
It also creates a table called “CustomerSink”, which is the destination or production table where we want to put the new data. I called it that because a sink is a place where you write data. You’d probably call the table something else if you were doing this in a real production database, but I named it CustomerSink to make it easier to remember where it fits in this example.
It contains the same columns as the source table, which is what you’d expect, but it also has three extra columns: InsertedDate, ModifiedDate, and HashKey. I’ll explain why it has those when we build the data flow.
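To give you a concrete picture before we run it, here's a rough sketch of the two tables the script creates. The actual create_tables script in the GitHub repo has the full column list, the real data types, and the CSV load, so treat this as an abridged illustration rather than the script itself.

```sql
-- Illustrative sketch only; the real create_tables script in the repo
-- has the complete column list, actual data types, and the CSV load.
CREATE TABLE dbo.CustomerSource
(
    CustomerID   INT           NOT NULL,
    FirstName    NVARCHAR(50)  NOT NULL,
    LastName     NVARCHAR(50)  NOT NULL,
    CompanyName  NVARCHAR(100) NULL
    -- ...the remaining source columns...
);

CREATE TABLE dbo.CustomerSink
(
    CustomerID   INT           NOT NULL,
    FirstName    NVARCHAR(50)  NOT NULL,
    LastName     NVARCHAR(50)  NOT NULL,
    CompanyName  NVARCHAR(100) NULL,
    -- ...the remaining source columns...
    InsertedDate DATETIME      NULL,  -- when the row was first added
    ModifiedDate DATETIME      NULL,  -- when the row was last changed
    HashKey      VARCHAR(128)  NULL   -- hash of the row's values, used to detect changes
);
```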
Now copy the script, go back to Synapse Studio, and paste the code here. Then run the script. It should only take a few seconds. Okay, it’s done. If we click the arrow and then click Tables, we see that those two tables do exist now.
Next, we’re going to create a data flow. Go to “Develop”, then click the add button and select “Data flow”. Change the name to “UpdateCustomer”. Let’s hide the Properties pane.
I’m going to enable “Data flow debug”, which makes it easy to track down problems in your data flow. This’ll start a Spark cluster that we can use for testing. We need to specify how long it should wait before it shuts down due to inactivity. I’ll set it to 2 hours. It takes a while to start the cluster, so I’ll fast-forward.
Okay, it’s ready. Now click this to tell it where to get the data from. Let’s call it “CustomerSource”, which is the name of the table we’re getting the data from. The source type is “Dataset”. Then we need to create a new dataset, which is essentially a pointer to the actual data. You can see that there are lots of different places you can get data from. We’re going to get the data from Synapse Analytics, so select that and click “Continue”.
Call the dataset “CustomerSource” as well. For the linked service, select the one that was automatically created by the workspace. For the table name, click the refresh button. First, it needs to know the name of the database where the table resides. I’ll type “sqlpool1” because that’s the name of the dedicated SQL pool I created.
Now the dropdown menu has the two tables we created earlier. Select “CustomerSource”. And click OK. It’s bringing this up again for some reason, but it’s already filled in, so just click OK. All right, we did all of that just to fill in this Dataset field here.
Let’s test it and make sure it works. It says the connection failed. If we click on “More”, it says that no value was provided for DBName. I’ve seen this happen before. Even though we filled in the DBName, it didn’t stick for some reason, so we’ll have to set it again. Click “Open”, then fill in the DBName. Now if we go back to the data flow and test the connection again, it works this time. Great.
You’ll notice that this “Allow schema drift” option is checked. Schema drift is when the dataset’s schema changes after you create the data flow. For example, if you add a column to the source table, then the schema has changed. When you enable “Allow schema drift”, it will read in all of the column names every time this data flow is executed and allow all of the new or changed columns to pass through the data flow.
All right, now because we’re in debug mode, we can go to the “Data preview” tab and see if it pulls up the data. When you click “Refresh”, it comes back with a sample of rows. So it seems to be working.
Okay, now we need to click “Add Source” again so we can tell it about the other table we created, which was called “CustomerSink”. We’ll go through the same process as we did for the first table. Call it “CustomerSink”, and click new for the Dataset. Select “Synapse Analytics” again. Call the dataset “CustomerSink” as well. Select the same linked service as before. Click refresh. The database name is already filled in, so click OK. Now in the table name dropdown, we can select “CustomerSink”. Click OK. Click OK. And let’s test this connection. The same DBName problem happened again, so fill it in once more. And now it’s working.
This time the table’s empty, but that’s actually what we expected because we didn’t put any data in this table when we created it. We’re going to copy data into this table from CustomerSource.
One thing to notice, though, is that it has 13 columns, whereas the source table only has 10, because the source doesn’t have the InsertedDate, ModifiedDate, and HashKey columns. So this looks correct.
Now that we’ve added the two tables, we can add the transformations we need. Click the plus button next to the first table. This gives us lots of choices for transformations. We need to create a new column, so we should select “Derived Column”. Call the output stream “CreateHashKey”. The incoming stream is already set to “CustomerSource”, so we don’t need to change that. For the name of the column we’re creating, use “HashKey”. Then in the Expression field, copy and paste this.
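The exact expression is in the course’s GitHub repo, but the idea is to hash together the columns whose changes we care about, using the data flow expression language. A typical version might look something like this (the column list here is just an assumption for illustration):

```
sha2(256, CustomerID, FirstName, LastName, CompanyName)
```

Any row where one of those values changes will produce a different hash, which is how we’ll detect changed rows in the next step.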
Let’s have a look at the Data preview tab and see if it’s working. If we scroll to the right, you can see that it added a HashKey column and filled it with hash values.
All right, now we need to find every row in the source table that doesn’t have an identical row in the destination table. We’ll do that by comparing the hash keys that we created in this step with the hash keys in the destination table. If an identical hash key doesn’t exist in the destination table, then either the entire row is new or at least one of the values in the row has changed. Of course, the first time we run this data flow, every row will be new because the destination table doesn’t have any data in it yet. But after the first run, this step will also find rows that have changed.
Click the plus sign, and this time, select “Exists”. Let’s name it “HashDoesNotExist”. We need to say which two streams of data we’re comparing. The left stream is the data coming from the CreateHashKey step. The right stream should be the destination table, which is called CustomerSink. We need to change the type to “Doesn’t Exist” because we’re looking for hash values that don’t exist in CustomerSink. Down here, we have to specify what we’re comparing. We’re comparing HashKey in the source stream to HashKey in the destination table. It looks like I need more room to be able to select it. There we go.
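By the way, if you’d rather use the Exists transformation’s custom expression option instead of the column dropdowns, the equivalent condition would look roughly like this (using the stream names we chose above):

```
CreateHashKey@HashKey == CustomerSink@HashKey
```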
Notice that there are now two lines coming into this transformation, one from the previous step and one from CustomerSink because we’re comparing data in these two streams.
The Data preview shows us exactly the same data as before because none of the hash keys exist in the destination table since it’s empty at the moment.
Next, we need to add two columns from the CustomerSink table: InsertedDate and ModifiedDate. These columns aren’t in the source table, so we need to add them before we write new data to the destination table. We’re going to use the Lookup transformation to do that. It’s similar to an outer join, so it will keep all of the data from the source stream and add all of the columns from the lookup stream.
All right, click the plus sign and select “Lookup”. We’ll call it “LookupCustomerID”. Set the lookup stream to CustomerSink. For the lookup conditions, we need to get it to match the rows based on the CustomerID. That is, rows with the same CustomerID should be joined together. So set both sides to CustomerID.
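If it helps to think of this in SQL terms, what we’ve just configured behaves roughly like a left outer join. This is only an analogy, not what the data flow literally executes, and the stream name below is just a placeholder:

```sql
-- Conceptual equivalent of the Lookup step (illustration only):
-- keep every row from the incoming stream and attach the matching
-- CustomerSink row's columns, if there is one.
SELECT src.*, snk.*
FROM   incoming_stream AS src          -- the output of the HashDoesNotExist step
LEFT OUTER JOIN CustomerSink AS snk
       ON src.CustomerID = snk.CustomerID;
```

That snk.* is also why we end up with duplicate columns, which we’ll clean up in the mapping later.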
Let’s go to Data preview. It has duplicate columns, which is what we’re expecting. If we scroll to the right, you’ll see that we now have InsertedDate and ModifiedDate as well.
Now we need to fill those two columns in. Click the plus sign and select “Derived Column”. Let’s call this step “SetDates”. In the column dropdown, select “InsertedDate”. Notice that this is different from when we used Derived Column to create the hash key. In that case, we were creating a new column. In this case, we’re going to modify an existing column. Now paste this expression. It says that if this column is null, then set it to the current timestamp. Otherwise, leave it with the existing timestamp. That’s because this field should be set to the date and time when this customer was first added, so if there was already a timestamp there, it was added earlier, and we shouldn’t change the timestamp.
Then click “Add” and select the ModifiedDate column. Then paste in this expression. It’s much simpler because we always want to set the ModifiedDate field to the last time it was updated, which is the current timestamp. Now when we look in Data preview, those columns have been filled in.
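For reference, the two expressions follow this general pattern in the data flow expression language (the exact column references may differ slightly in your flow, depending on how the duplicate lookup columns are named):

```
InsertedDate:   iif(isNull(InsertedDate), currentTimestamp(), InsertedDate)
ModifiedDate:   currentTimestamp()
```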
Next, we’re going to write the new data to the destination table. But first, we have to set a policy as to how that should be done. Specifically, we’re going to tell it to allow upserts.
Click the plus sign and select “Alter Row”. Let’s call it “AllowUpserts”. Now, in this dropdown, select “Upsert if”. It’s possible to only allow upserts under certain conditions, which is why you need to enter an expression here. But we’re going to allow upserts in all cases, so we can just type “true()” here.
All right, it’s finally time to write the new data to the destination table. This time, select “Sink”. Let’s just call it “Sink”. For the dataset, select CustomerSink because that’s the destination table. Then click the Settings tab. Check “Allow upsert”, and uncheck the other ones. It might seem odd that we have to check “Allow upsert” here when we just added an entire step before this one to allow upserts, but you actually have to do it in both places, or you’ll get an error.
For the key column, select CustomerID. This is what it uses to match incoming rows to rows in the destination table. Uncheck the “Enable staging” option. This can improve performance when loading data into a dedicated SQL pool, but it would require some extra setup, so let’s disable it.
Now go to the Mapping tab. Remember when we added all those duplicate columns in the Lookup step and I said we’d filter them out later on? Well, now it’s time to do that. First, disable automapping because we’re going to map the columns manually. Now we need to specify which column in the source stream should be written to which column in the destination table. For example, for CustomerID, there are two possibilities: the one from the source table and the one from the destination table. Of course, it wouldn’t make sense to use the one from the destination table because then we wouldn’t be changing anything. So pick the source CustomerID. Then do the same thing for all of the other columns in the source table.
InsertedDate and ModifiedDate are different because they don’t exist in the source table. We added them in the Lookup step, and then we modified them. It has already selected them for the mapping, so we don’t need to change that.
The HashKey is a bit different because it’s a Derived Column. So you’ll notice that this refers to the step where it was created, which is “CreateHashKey”.
If we hover over the transformation, it says there are 13 columns, whereas the previous step says 24 columns. That’s good because it means that the mapping got rid of the duplicate columns. If we look at Data preview, we can see that it did, in fact, discard the duplicate columns.
Okay, our data flow is finally finished. In the next demo, I’ll show you how to run this data flow.
Guy launched his first training website in 1995 and he's been helping people learn IT technologies ever since. He has been a sysadmin, instructor, sales engineer, IT manager, and entrepreneur. In his most recent venture, he founded and led a cloud-based training infrastructure company that provided virtual labs for some of the largest software vendors in the world. Guy’s passion is making complex technology easy to understand. His activities outside of work have included riding an elephant and skydiving (although not at the same time).