DP-203 Data Engineering on Microsoft Azure – Design and Develop Data Processing – Azure Event Hubs and Stream Analytics part 3
- Lab – Reading data from a JSON file – Setup
Now, before we can actually connect our Stream Analytics job onto our DB Hub, I want to show you how you can also process data that's in a blob in your Azure Data Lake Gen2 storage account. Now, in order to change our query, the first thing that we need to do is to stop our job. So let me click on Stop and let me click on Yes. Now, in my Data Lake 2000 account, I'll go on to containers. Let me just create a new container. I'll hit on Create. And here I'm going to upload a file that I have on my local system. So this is a PT1H JSON file. Don't worry, I'll also ensure this file is added as a resource onto this chapter. So I'll hit on Upload. If I go on to the JSON file here, if I click on Edit, I can see all of this JSON content in place. Now, what exactly is this content? Well, it's the same metric information from one of my other Azure SQL databases that was streamed onto Azure Blob storage.
So this is the same type of data that we are also going to get in our DB Hub. So when I was doing my research to try to understand how we can process this metric-related information, obviously the first, most important part is to understand what data is going to be streamed by this diagnostic setting. We've enabled the setting, but what is the data? What are the columns? What are the values that are going to be sent based on this diagnostic setting? Obviously, as a data engineer, it is always very important to understand your data. It's only once you understand your data that you can analyze your data. So for that, what I had done is I had ensured to also stream my data onto an Azure storage account. So you can stream your data onto multiple destinations.
For now, I am not doing this, right? So I already have the JSON file in place. So I want to show you now how we can process that JSON file in Azure Stream Analytics next. Another important point is when you are looking at archiving onto a storage account: even though you can see your Data Lake Gen2 storage accounts in place over here as well, as per the Microsoft documentation, when it comes to the diagnostic setting, Azure Data Lake is not supported as a destination. You have to choose a normal Azure Storage account, just for your information. So for now, let me just keep this as it is. I am not making any changes. So I have this file in place. Now I'll go back onto my Stream Analytics job. And here, let me define an input. So I'll add another input stream. This time I'll choose Blob storage. I'll give an input alias name. Here, I'll select my storage account.
Here, I'll choose my container. I'll choose to use a connection string. Here again, no partition key. The count of input partitions, I'm leaving it as one. Let me go ahead and click on Save. Once this is done, I'll go on to my query. Now, here, let me just hide this. Let me choose Data Lake 2000. So when I choose Data Lake 2000, it is now showing me the input preview data. So let me again adjust the zoom. So now we can see the input preview data when it comes to the blob, our JSON-based file. So we can see values such as the count, the total, the minimum, the maximum, what is the resource ID, what is the time, what is the metric name. So as I said, the diagnostic information that is sent from the Azure SQL database will send all the different metrics at different points in time.
And then based on each metric, it will give you a metric value of the average, the minimum, the maximum, and the total. And again, we have other aspects as well. So here, since this is now the blob that we're reading from the Azure Data Lake storage account, you can see that we have additional attributes of the blob name and the blob last modified UTC time, right? So for now, let's mark an end to this chapter. And in the next chapter, let's complete our implementation. Let's take this data and write it onto Azure Synapse.
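For reference, a single metric entry in that PT1H JSON file looks roughly like the sketch below. The field names follow what we saw in the input preview (count, total, minimum, maximum, average, resource ID, time, metric name); the actual values are just illustrative.

```json
{
  "count": 12,
  "total": 4.5,
  "minimum": 0.0,
  "maximum": 1.2,
  "average": 0.375,
  "resourceId": "/SUBSCRIPTIONS/.../DATABASES/APPDB",
  "time": "2023-05-01T10:00:00.0000000Z",
  "metricName": "cpu_percent"
}
```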
- Lab – Reading data from a JSON file – Implementation
So in the previous chapter, we had looked at reading data in Azure Blob. Now in Azure Synapse, let me create a table called dblog that will take all of my data. So here I am mapping the columns based on the data that is arriving in Azure Stream Analytics, based on the data in my JSON-based file. So let me take this. I'll go on to a window where I'm logged in as the SQL admin user. Let me run this query to create the table. That's done. So just to make this table faster, I'm using the round robin distribution, and I'm also creating this as a heap table. Now here I have my select query. So let me take this. Let me go on to my query. I'll just replace this here. Let me also change what the FROM is. So we're taking this now from our input of Data Lake 2000, right? That's over here. Let me just save the query. I need to now add an output.
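As a rough sketch, assuming columns that mirror the fields we saw in the input preview, the script for the dblog table would look something like this (the data types here are my own choices; adjust them to your data):

```sql
-- Hypothetical sketch of the dblog table in the dedicated SQL pool.
-- Round robin distribution and a heap keep the table simple and fast to load.
CREATE TABLE dblog
(
    [count]     INT,
    total       FLOAT,
    minimum     FLOAT,
    maximum     FLOAT,
    average     FLOAT,
    resourceid  VARCHAR(500),
    [time]      DATETIME,
    metricname  VARCHAR(100)
)
WITH
(
    DISTRIBUTION = ROUND_ROBIN,
    HEAP
);
```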
So let me add an output here. I'll add a different output. Now, again, Azure Synapse. Here I'll give the same table name of dblog. Here again, the connection string, the username, the password. Here my table is dblog. I'll just confirm, so it's dblog. I'll click on Save. This is also done. I'll go on to my query again. Let me just hide this. And here I want to put my data now into dblog. So here we can define what input we want to take from and what output we want to write to. I'll save my query. Now let me delete the file that I have in my container, because I want to show this now as a streaming process. So when an object comes here in my container, Azure Stream Analytics should take that blob, take the content and then move it onto a table.
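The Stream Analytics query itself stays simple at this stage. Assuming the input alias is [datalake2000] and the output alias is [dblog] (use whatever alias names you gave your own input and output), it is roughly:

```sql
-- Sketch of the query: the blob's fields map one-to-one onto the table columns.
SELECT
    [count],
    total,
    minimum,
    maximum,
    average,
    resourceId,
    [time],
    metricName
INTO
    [dblog]
FROM
    [datalake2000]
```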
That's the idea of taking everything in real time. Before that, let me start my job. So I'll go on to the Overview. Let me start my job. Here you can see the different options. So remember, I am starting my job right now. I could also start this at a custom time or when the job was last stopped. So let me go ahead and start the job right now. Again, it will take a couple of minutes. Now, if you're asking the question as to whether we can have multiple queries defined, wherein we can take data and put it into different destinations, that is also possible. That's something we are also going to see in a particular video in this section. Once our job has started, let me go ahead and upload the same file. So I'm uploading the same JSON-based file.
Now if I go onto SQL Server Management Studio, let's select the data from the table, and you can see all of the data. So within an instant, Azure Stream Analytics has taken that data from the blob and sent it on to our table. And again, remember, Stream Analytics is a managed service, so it automatically builds the compute infrastructure which will read the data from your input, wherever it is, be it Blob storage or an Azure Event Hub, and then send it on to a destination. Remember, in the end you still need compute machines to perform all of this. This is not all magic, right? You need compute infrastructure that will take the data from the input, process the data accordingly and then send it on to the output, right? So this is all happening in real time.
- Lab – Reading data from the Event Hub – Setup
So we've already seen how we can take the metric data that is sent by diagnostics. As an example, I've shown a blob, right, a JSON-based file. Now we want to take the actual streaming events from the Event Hub. So firstly, I'll stop my Stream Analytics job. Now we are going to be reusing the same table. So here, let me delete whatever is there in this table, so we don't have any rows in place. Now, once our Stream Analytics job is in the stopped state, let me go on to the inputs and let me add a new stream input of the Event Hub. Here, I'll name this as DB Hub. Now, this time for the event hub, I'll choose DB Hub. Here again, I'll choose connection string. I'll leave the policy as it is, everything else as it is, and I'll click on Save. Now, if I actually go on to my Event Hub namespace, if I go on to my Event Hubs, and if I go onto DB Hub, if I scroll down.
So I can see that requests are coming in, messages are coming in. And that's because now our SQL database is sending the diagnostic information onto Azure Event Hubs. Now here, let me go on to the query, I'll just hide this. Now, here I'll choose DB Hub. So now it should take the input data from DB Hub. Now here I can see that I have one record in place. So again, let me adjust the zoom. Now here you'll notice a couple of things. So first, I only have one record, but in our Event Hub we do have many records, many events coming in. And here you can see we are not getting the proper split of data. So I'm not getting count as an individual column, total as an individual column. Instead, here I only have four columns in place.
I have my normal attributes, wherein I have the partition ID, the event processed UTC time, and the event enqueued UTC time. But when it comes to the actual event, I only have one column, and that's known as records. If I switch over now onto the raw view here, we can see how our JSON input is coming in. So firstly, we can see that our JSON objects are embedded in an enclosing array. That's the first point. Next, all of our information is now embedded within a JSON object, in an array with the name of records. And if we scroll down, we can see that all of our JSON objects, the metrics which are coming in, are actually part of the records property, which is an array in the root JSON object. So in our blob, what you had seen earlier on, it was a different structure; we had the same columns, right, of count, total, minimum, maximum.
But here in the Azure Event Hub, there is a bit of complication when it comes to the structure, because, see, in Azure Event Hubs everything is sent as a batch, right? To increase the throughput of the information, things are being sent as a batch of records. That's why we now have an array that represents our data. So when you're actually building your SQL query, you have to take all of this into account. So my query here will not work, because the only column that we have is records, and if I just do a SELECT records INTO dblog it will fail, because our table expects these columns of data to be in place. So this is a change that we need to perform. So again, this input preview is very useful.
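To make this concrete, the body of each Event Hub message looks roughly like this: a root JSON object with a records property, which is an array holding the individual metric objects (only one element is shown here, and the values are illustrative):

```json
{
  "records": [
    {
      "count": 12,
      "total": 4.5,
      "minimum": 0.0,
      "maximum": 1.2,
      "average": 0.375,
      "resourceId": "/SUBSCRIPTIONS/.../DATABASES/APPDB",
      "time": "2023-05-01T10:01:00.0000000Z",
      "metricName": "cpu_percent"
    }
  ]
}
```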
It helps you understand what the structure of your data is, and especially when it comes to JSON, this is very important. Now next, let me go back onto the table view. Now here, if I select the time range, let me select a time in the past. So what I'll do is that I'll choose it as around 11:00, and here I'll say please give me the data for the last one hour, and I'll hit on Sample. So this input preview is actually taking a sample of your data so that you can see what your data looks like. So now here you can see other records which are in place. So if you select the time range, then you can see the data in your input source in a particular range of time.
- Lab – Reading data from the Event Hub – Implementation
So in the last chapter, we have now connected an input onto DB Hub. This is the Event Hub which is taking the metric information from our Azure SQL database via the diagnostic setting. Now, let's go ahead and change our query. So firstly, the INTO dblog, that's fine; I need to change the FROM onto DB Hub. Now for our query, right, let me first take what's in the SELECT statement. So I'll copy it here. Next I'll take this, and here let me put the alias of D, because that's how it's mentioned over here. Just for formatting's sake, you can keep it here. And now let me explain what I'm trying to do here. Now, I said since everything is in an array, I can now use the GetArrayElements function that is available in Azure Stream Analytics. So here I have a reference.
You can take this if I go on to a new tab. So I always reference the Microsoft documentation when it comes to understanding what is available in a particular service. So with the CROSS APPLY operator, you can actually parse arrays and nested objects in JSON. And that is what I am trying to do over here. So for DB Hub, right, I'm giving an alias of D so that I can reference this later on. Here I am saying please get all of the array elements, so that's D.records; remember, everything is coming into DB Hub as records. So I'm referencing that particular column as D.records. And then I am getting each individual record, right, and putting it under an alias of Records. And then in my SELECT query, I am now referencing that Records alias, which we have mentioned down here.
So I mentioned Records down here. If I go back on top, for all of the records, I am now using another property that is available, that is ArrayValue. This is again based on Azure Stream Analytics. And now I am getting the individual properties: the count, the total, the minimum, the maximum, et cetera. And then I am giving each one a name, right? As count, as total. So in case your destination data store has different column names, you can actually alias each column over here, because, see, this mapping needs to be correct as per the columns that you have in your destination data store. And our destination data store is the table in a dedicated SQL pool. And even though these particular names are the same, there is still a mapping happening here.
I'm just showing you how you can ensure that you place an alias on a column that is being referenced from your input data source. And then here I'm using the CAST function, which is again available as part of the Stream Analytics job, to ensure that whatever I'm getting is cast as a datetime. Now, you will want to test this particular query, because obviously you don't know if it will work; even for me, it took time just to understand how I could take my array of values. So I am taking a very complex example here when it comes to the structure of the JSON data.
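Putting that together, the reshaped query looks roughly like the sketch below. The input alias [dbhub] and output alias [dblog] are the ones I am using; swap in your own names.

```sql
-- GetArrayElements unpacks the records array; each element is exposed via
-- ArrayValue, and CAST converts the time string into a datetime.
SELECT
    Records.ArrayValue.count       AS [count],
    Records.ArrayValue.total       AS total,
    Records.ArrayValue.minimum     AS minimum,
    Records.ArrayValue.maximum     AS maximum,
    Records.ArrayValue.average     AS average,
    Records.ArrayValue.resourceId  AS resourceid,
    CAST(Records.ArrayValue.time AS datetime) AS [time],
    Records.ArrayValue.metricName  AS metricname
INTO
    [dblog]
FROM
    [dbhub] AS D
CROSS APPLY GetArrayElements(D.records) AS Records
```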
So since we already have our input in place, right, this is our sample input, we can now test our query. So I can test the query here, and we will get our results in the test results pane, and here you can see all of the information. So now this query is dissecting that entire JSON input. And now we are getting the count, the total, minimum, maximum, et cetera. We are getting all of the metric names; everything is coming in as desired. And then we can save our query, so we know it is working as expected. And then we can go on to the Overview and we can start our job. Now, you can start the job right now, or we could also start the job at a particular point in time.
So I'll start the job one hour beforehand, because we already have data in our Event Hub. So it will now take the previous data that we have, and then it will continuously take the data from the Event Hub. Let me click on Start. I hope you've understood what we have done so far. It's all based on the query. So when it comes to Stream Analytics, right, it's very easy to define your input, define your output. It's the query that's very important. And obviously it's very important to understand what the format of the data being streamed into the Stream Analytics job is. The Stream Analytics job will do everything for you. It's quite an easy service, right? You just define your input, your query and your output.
But again, there are some aspects that you also need to consider, right? So let's wait till the job is started. Once the job is started, if I go onto SQL Server Management Studio, let's do a SELECT *. We can now see our data in place. If we do a COUNT(*), just to see the number of records, we can see we have 767 records. Let's wait for around five to ten minutes and then come back. Now, after some time, when I come back, if I do a SELECT COUNT(*), I can see the number of rows is increasing. That means our Stream Analytics job is now continuously taking our data from the Azure Event Hub and pushing it onto a table in our dedicated SQL pool.
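The checks being run in SQL Server Management Studio are just the usual ones against the destination table:

```sql
-- Quick verification in SSMS against the dedicated SQL pool
SELECT * FROM dblog;

-- Run this again after a few minutes to see the row count grow
SELECT COUNT(*) FROM dblog;
```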
- Lab – Timing windows
Now, in this chapter I want to go through the windowing functions which are available in Stream Analytics. So remember, time is the most important factor when it comes to Stream Analytics jobs. And normally, in real-time scenarios, you might want to perform computations in certain window frames, so you can actually make use of windowing functions for this particular purpose. Now, there are different windowing functions in place. You have the tumbling window, the hopping window, the sliding window, the session window and the snapshot window. So for example, if you look at the tumbling window, so I'll scroll down, the entire purpose of the tumbling window is to perform aggregation on your data within a particular time frame.
So each time frame duration actually constitutes one window. Similarly, you might want to perform the aggregation of something over periods of time, especially when it comes to real-time data. Sometimes you may not want all of the information; you want to perform some sort of aggregation on that data within a particular time frame. So when you're looking at the tumbling window, here you can actually use the tumbling window function and say please aggregate my data over a time frame of 10 seconds. So here each window is over a ten-second duration, and then you can use your aggregation functions as required. So there are different types of windows that are available.
So for example, you also have the hopping window, wherein, as a use case scenario, it's saying that over a period of 10 seconds, every 5 seconds, give me the number of, let's say, tweets that are coming in. So in the hopping window, there can be an overlapping of events. So depending upon your requirement, you can use the different windowing functions that are available. Now, I'll give an example of the tumbling window function. So here I have the code in place. So now, first, in order to capture that summary information that will actually come from my SQL query in the Stream Analytics job, I am creating a new table. Here I have the metric name, the count and the timestamp.
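A rough sketch of that summary table, assuming the three columns just described (the table name here is illustrative):

```sql
-- Hypothetical sketch of the summary table for the windowed aggregation.
CREATE TABLE summarytable
(
    metricname  VARCHAR(100),
    [count]     INT,
    [timestamp] DATETIME
)
WITH
(
    DISTRIBUTION = ROUND_ROBIN,
    HEAP
);
```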
So let me first create this table in SQL Server Management Studio. I am logged in as the SQL admin user. Let me run this command. So currently I don't have any data in place. So this is done. Now next, what am I doing? I am now selecting certain aspects. I'm making use of my same Event Hub, so DB Hub. So remember, over there, all of our information about the metrics of the database is coming as records in the Event Hub. Here again, I'm making use of the CROSS APPLY operator to get all of the array elements of the records. And then I am only selecting the metric name and the time, that's it, from DB Hub. Now, you will notice here I am using a WITH clause. So you can actually break your queries into different stages, because, see, you could have complications in your queries, you could have complex queries in place.
So instead of actually writing one complete query, you can break this into separate stages. So here I am breaking this into a stage known as StagingLogs. And I'm saying please take StagingLogs as this particular SELECT query. It's like actually creating a view based on a SELECT query. So I'll copy this first. I'll go onto my Stream Analytics job. So I have stopped the job. Here I'll go on to the query. I'll just hide this for the moment. Now, here I'll go on top first. So it's giving me an error because I'm using DB Hub in two places. So what I'll do is let me remove all of this, so I only have my first query in place. Now it's giving me all these squiggly lines because we have not completed our query, right? So first we have defined a stage.
Now next I'm saying let's make use of that particular stage, or that particular view. I'll take the metric name, I'll take a count of the number of metrics, and I'll also take the maximum time from all of those metrics, into my summary table. So we have to now create an output from StagingLogs, right? So this is like having a view in place, right? So FROM StagingLogs, GROUP BY StagingLogs.metricname. So now we are grouping all of our results based on the metric name. And here I'm using my tumbling window. So what I'm saying is that when I'm getting all of my metric data, over every period of 10 seconds, right, this is the duration, group all of my metrics first of all by the metric name. So you might get CPU percent, et cetera, all of the different metrics.
And then get the count, right? The count of all the CPU metrics that came in within that ten-second window frame. So let's say it's three. And then also get the max timestamp, and then insert that into the summary table. So this is being done over every ten-second window. So now we are trying to take a summary of our metrics and then place it in a summary table. So firstly, let me save this query, and let me define an output. Here I'll click on Add, choose Azure Synapse, I'll give the name, choose the authentication mode, choose the username, put the password, and choose my table. So my table is the summary table. I'll click on Save just to ensure we have the output in place, right? This is done. Now I'll go back onto my query and let's complete it. So I'll scroll down and take this. So I'll go back here and I'll enter this.
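The complete query I end up with looks roughly like the sketch below (again, [dbhub] and [summarytable] are my alias names; use your own):

```sql
-- StagingLogs flattens the records array into metric name + time;
-- the final SELECT counts metrics per name over 10-second tumbling windows.
WITH StagingLogs AS
(
    SELECT
        Records.ArrayValue.metricName AS metricname,
        CAST(Records.ArrayValue.time AS datetime) AS [time]
    FROM
        [dbhub] AS D
    CROSS APPLY GetArrayElements(D.records) AS Records
)
SELECT
    StagingLogs.metricname,
    COUNT(*)                AS [count],
    MAX(StagingLogs.[time]) AS [timestamp]
INTO
    [summarytable]
FROM
    StagingLogs
GROUP BY
    StagingLogs.metricname,
    TumblingWindow(second, 10)
```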
So now you can see all of these squiggly lines have gone away, right? So now we are selecting, remember, only a summary of the metrics over a particular time window. Here let me save the query. Let me now choose DB Hub just to test our query out. So it's always good to test it out before you actually run it. So here we can see we're getting some data. Let me test the query. So here you can see we are getting results. So we have the metric name, we have the count, and then we have the maximum timestamp. So normally, from what I've seen in the raw data, there are around three entries for each metric given in the records array, and that's what's coming over here. Now, I won't run this job. Let's keep this as it is. We know that this query is currently working. Let's move on to the next chapter.