Google Professional Data Engineer – Appendix: Hadoop Ecosystem part 4
- Windowing in Hive
We’ve explored partitioning and bucketing as ways to divvy up table data into more manageable chunks. We’ve also seen two types of join optimizations: holding the smaller table in memory, and map-only joins. Let’s now turn our attention to the third bit of OLAP functionality offered by Hive, which is windowing functions. Windowing functions can be thought of as syntactic sugar. There is almost nothing that windowing functions can do for you which traditional queries cannot. But the real value and importance of windowing functions in Hive arises because they allow complex operations to be expressed within a single MapReduce job. This allows one pass over the data, and it reduces the need for intermediate calculations or intermediate tables.
Intermediate tables are not all that expensive in a traditional RDBMS, because all of the data is in the same database server. But an intermediate calculation, or multiple reads or writes of the same data, ends up becoming prohibitively expensive in a distributed environment. So when you think about windowing functions in the context of the distributed nature of HDFS and the petabyte-sized files which Hive MapReduce jobs work on, windowing functions are really not syntactic sugar. They are almost a must-have, because without them it would be really difficult to compute complicated queries in any realistic length of time. Consider, for instance, a query like this one.
Let’s say that you wish to find the top-selling N products in some subset of data, say in last week’s sales. Here you would like to define a window, which is one week, and an operation on that window, which consists of ranking products by sales. Now, if you were running this query with just 100 products and 1,000 sales, you could just read all of these rows into memory, maybe in a nice language like Python, sort them, and just pick off the top N products. But the difficulty of doing this when there are millions of sales and maybe thousands or hundreds of thousands of products is what gives rise to the need for windowing functions. There is just no way to efficiently accomplish this programmatically.
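Here is a minimal HiveQL sketch of what such a query might look like. The sales table and its product_id, amount, and sale_date columns are hypothetical, and the date arithmetic is only schematic:

```sql
-- Rank products by their total sales over the last week, then keep the top 10.
SELECT product_id, weekly_sales, sales_rank
FROM (
  SELECT product_id,
         weekly_sales,
         RANK() OVER (ORDER BY weekly_sales DESC) AS sales_rank
  FROM (
    SELECT product_id, SUM(amount) AS weekly_sales
    FROM sales
    WHERE sale_date >= date_sub(current_date, 7)   -- "last week's sales"
    GROUP BY product_id
  ) totals
) ranked
WHERE sales_rank <= 10;
```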
And that’s why the onus for operations like these has to shift from a program, or from a client SDK call, right into the OLAP software like Hive. It’s the same deal with another really common kind of operation. Let’s say you’d like to know, for a given supplier, in what percentile of all of your suppliers did this particular one fall? Once again, you have a window, which is one quarter of data, and you would like to perform some operation on that window. You’d like to calculate percentiles based on the revenue that the different suppliers generated for you. And once again, the crucial common link between calculating percentiles and calculating ranks is that this is only possible if all of the data has been examined.
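As a sketch, and again assuming a hypothetical table, this time supplier_revenue with supplier_id, quarter, and total_revenue columns, the percentile calculation might look like this in HiveQL:

```sql
-- For each quarter, where does each supplier fall as a percentile of all suppliers?
SELECT supplier_id,
       quarter,
       total_revenue,
       PERCENT_RANK() OVER (PARTITION BY quarter
                            ORDER BY total_revenue) AS revenue_percentile
FROM supplier_revenue;
```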
And once you are going to look at all of the data in a big data set, which is maybe on the order of petabytes in size, you want to be really sure that you access that data only once and not on multiple occasions. One no longer has the luxury of fetching all of the data and then programmatically combining it in, say, a Python program. What we really need is the ability to carry out all of these operations in one single query, which will only read all of the underlying file blocks once, if possible. Another similar operation would involve calculating a running total of revenue. Let’s say you have a grocery store and you would like to add a new column with a cumulative total of the revenue for all orders so far.
Imagine what this would require in a system which has data distributed over a whole cluster. To begin with, we would need to sort this data by the order ID column. Good luck even trying that with an in-memory solution. Once we have all of the sorted data, we then need to somehow come up with a running total. And that running total is going to rely on a window. That window is going to be the set of all orders up to and including the present one. This is easy enough to do in something like Microsoft Excel, but once again, imagine doing this when you have sharded data lying on maybe a million different machines.
What we need here is the ability to maintain a running total which increments by the revenue of each order, and that is carried out over a window. That window, in turn, needs to be defined on the basis of the order ID, because clearly here we are going to calculate the sum over all rows from the beginning of time up to the current row. So once again, we have a window. That window extends from the start of time until the current row. And the operation we would like to perform within that window is the sum of all of the revenues there. Hive can accomplish all of this and even more. It will even allow you to calculate per-day revenue totals. So here, the running total is going to reset at the start of each day.
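A minimal HiveQL sketch of the running total, assuming a hypothetical orders table with order_id and revenue columns:

```sql
-- Cumulative revenue over all orders so far: the window runs from the very
-- first row up to and including the current row, ordered by order ID.
SELECT order_id,
       revenue,
       SUM(revenue) OVER (ORDER BY order_id
                          ROWS BETWEEN UNBOUNDED PRECEDING
                                   AND CURRENT ROW) AS running_total
FROM orders;
```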
Effectively, what Hive is going to do is divide the data on the basis of the day. Each time a new day starts, that running total will be reset to zero. The net effect of all of this is going to be, once again, to divvy up the data into blocks and apply a function to the data within those blocks. Here, the only difference is that those blocks do not start at the very start of time. Rather, they reset each time the day changes. This really gets at the power of tools like Hive or BigQuery for big data processing, particularly in OLAP or analytical use cases. You can also define moving averages. For instance, you might say: for the last three orders, give me the average revenue, that is, the average order size.
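The per-day version only changes the window definition: partition by the day, and the running total resets each time the partition changes. Again, the orders table and its order_date column are hypothetical:

```sql
-- Same running total, but partitioned by day, so it resets to zero each day.
SELECT order_id,
       order_date,
       revenue,
       SUM(revenue) OVER (PARTITION BY order_date
                          ORDER BY order_id
                          ROWS BETWEEN UNBOUNDED PRECEDING
                                   AND CURRENT ROW) AS daily_running_total
FROM orders;
```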
This will carry out an operation on a window which includes the three previous rows plus the current row. The operation that needs to be carried out within that window is an average over the revenue column. Likewise, as one other example of a really powerful query that one can execute in Hive, one can even calculate the percentage of the total revenue per day. The objective in talking about all of these is not to describe Hive’s syntax. Rather, it is to give you a sense of operations which can be easily implemented in Hive using a single query, but which would be really difficult to accomplish in any other way given a data set which is petabytes in size and which is distributed over thousands of machines in a cluster.
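Continuing with the same hypothetical orders table, the moving average might be sketched like this:

```sql
-- Moving average of revenue over the three previous orders plus the current one.
SELECT order_id,
       revenue,
       AVG(revenue) OVER (ORDER BY order_id
                          ROWS BETWEEN 3 PRECEDING
                                   AND CURRENT ROW) AS moving_avg_revenue
FROM orders;
```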
So all of these bits of functionality, windowing, bucketing, partitioning, all of these are extremely important in an OLAP application like Hive. We will come back to the idea of windowing functions when we talk about streaming applications in just a little bit. For now, let’s observe how cool it is that we can calculate the percentage of total per day in terms of a windowing operation. All that we really need to do is to alter the denominator so that we are performing the sum over the revenue column, and that sum is computed within a partition of the data by day. Let’s turn back to the question we had posed at the beginning of this pair of videos.
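In HiveQL, and still assuming the hypothetical orders table, that sketch might look as follows:

```sql
-- Each order's share of that day's total revenue. The denominator is a SUM
-- computed over a window partitioned by day. (revenue is assumed to be a
-- DOUBLE or DECIMAL, so the division is not integer division.)
SELECT order_id,
       order_date,
       revenue,
       revenue / SUM(revenue) OVER (PARTITION BY order_date) AS pct_of_daily_total
FROM orders;
```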
Why are windowing and partitioning so important in Hive, when traditional relational database management systems don’t emphasize them a lot? Hopefully we’ve grasped the answer. It’s because when one has a really large data set, which is maybe petabytes in size, and when this is distributed over a large number of machines, there is no alternative but to do this in the database itself. With any other solution, for instance if you as the end user started partitioning your data manually, or started trying to calculate windows programmatically, which is easy enough to code up, mind you, you would just be fighting out-of-memory errors all the time. This is why these operations are crucial in OLAP, whereas they are just nice-to-haves at best in OLTP.
- Pig
We’ve now understood quite thoroughly the capabilities that Hive brings with it. Let’s understand its close ally, something that’s used very, very closely with Hive, and that is Pig. Hive, just like Pig and Spark, will be available by default on every instance of a Dataproc cluster on the Google Cloud Platform. As usual, here is a question that I’d like you to keep in mind as we discuss Pig. In a typical ETL use case, would a data storage technology like Hive, or BigQuery on the Google Cloud Platform, be a source or a sink? I hope the question is clear: in a typical ETL use case, would Hive or BigQuery be a source or a sink? And, of course, ETL is an acronym for Extract, Transform, and Load. Let’s go ahead and understand the role of Pig in the Hadoop ecosystem, and hopefully it will also give us the clarity needed to answer this question.
Let’s introduce Pig, picking up right from where we left off. We’ve spoken about Hive, how it runs on top of Hadoop, and how all of its queries are effectively MapReduce jobs under the hood. So what Hive is really doing is mapping MapReduce into SQL. If the primary purpose of Hive is to carry out OLAP, that is, analytical processing of huge data sets, an obvious question that arises next is: how did those huge data sets, those files which are petabytes in size, land up in a nice, easy-to-consume form on HDFS in the first place? Virtually anyone who has spent any length of time parsing or munging data can attest to the fact that most data has no schema.
It is completely unstructured and incomplete. It’s a pain in the you-know-what to clean, and it’s got a bunch of consistency problems. These issues of an unknown schema, incomplete or missing data, and inconsistent or wrong data are exactly the use cases for Pig. Pig can be thought of as a transformation language: a high-level scripting language which helps clean data with unknown or inconsistent characteristics and load it into a relatively clean and easy-to-use format on HDFS. Pig is a pretty powerful tool in the Hadoop ecosystem. Just like Hive and Spark, it is available by default on every instance of a GCP Dataproc cluster. The big advantage of Pig is that it works very well with unstructured and incomplete data.
And specifically, it can work really well with unstructured and incomplete data on HDFS. Use Pig in order to get data into your data warehouse, for instance Hive or BigQuery, or other business intelligence and data warehousing applications. So the way this typical process works is: you take really raw data, which could be incomplete or inconsistent, you clean it up using a tool like Pig, you load it into a data warehouse, and then you use a query language like HiveQL in order to perform some high-end and relatively sophisticated analysis. The first part of this process is known as ETL. And the second, well, that’s just analysis. ETL, which is an acronym for Extract, Transform, and Load, is really where Pig comes into its own. Pig helps ETL data into a data warehouse such as BigQuery or Hive.
So remember that the culmination of the ETL process is the load step, and that load step refers to loading this data into a data warehouse. That really is what Pig does. Pig extracts, transforms, and loads data. It does so by pulling unstructured, inconsistent, and incomplete data from a bunch of sources. It cleans that data by applying transformations which you specify, and then places it, or loads it, into another database, specifically a data warehouse, where it is available for easy analysis. This gets us back to the whole problem of data cleaning. In the real world, data is pretty nasty. Consider, for instance, log entries like this one. Log files are ginormous and they are full of a whole bunch of crud. There is absolutely no structure for even the weakest form of schema enforcement.
Forget about schema on write, such as traditional databases have. Even Hive, with its schema on read, would find it basically impossible to check a schema against data like this. But maybe I just exaggerated a little bit, because there is actually a little bit of structure even in log entries. Consider, for instance, that they start with something like an IP address, which is followed by a date and a time enclosed within square brackets. After that comes the log entry itself, for instance a GET command, maybe a request type; this corresponds to an HTTP GET request, for instance. And all of this stuff now needs to be structured and put into a semi-structured or relational form with clear rows and columns. Just to be clear, this is not a relational database. This is still a file on HDFS. But at least now that file is going to have clearly structured data.
There are going to be records, and each record is going to consist of different columns. This is a transformation which we can accomplish using Pig, specifically using a scripting language called Pig Latin. Pig Latin can be described as a procedural data flow language to extract, transform, and load data. I should point out here that that choice of words, data flow, is an interesting one, because indeed the equivalent of Pig on the Google Cloud Platform is Cloud Dataflow. We are going to examine Dataflow in quite a bit of detail; it kind of acts as a substitute for Pig as well as Spark, but you get the idea. Coming back to Pig Latin: Pig Latin is procedural because it uses a series of very well-defined steps to perform operations. Notice that it does not support either if statements or for loops.
So advanced control transfer mechanisms are not provided in Pig. It’s a data flow language because really it’s expressing a set of transformations that have to be applied to the data. And we will get back to this idea of a directed acyclic graph, or DAG, denoting operations on a set of data items. The data items will form the edges and the operations will form the nodes. This is something that we will return to not just with Dataflow, but also with TensorFlow. Now, here’s maybe the most interesting part of Pig Latin: it can do all of this in parallel. Data from one or more sources can be stored or processed in parallel. So if the Pig Latin language is pretty straightforward and has nothing complex going on, what’s going on under the hood? The execution of those Pig Latin scripts is still pretty complicated, because it’s abstracting away from you all of this parallel processing.
So that really is what Pig is all about. It cleans data, precomputes common aggregates, does a bunch of stuff with data, cleans it up, and then stores it in a data warehouse where it can be used for OLAP purposes. Coming back to the question that we began with: in a typical ETL use case, is a data warehouse like Hive or BigQuery a source or a sink? Hopefully now we are quite clear on this. A data warehouse like Hive or BigQuery is the sink. It is the end destination of data in an ETL pipeline, because the data is going to be extracted from some unstructured underlying source, like a set of log files, transformed using something like Pig or Dataflow, and then loaded into a data warehouse like BigQuery or Hive.
- More Pig
The Hadoop ecosystem and the Google Cloud Platform both have three distinct languages: a data flow language, a SQL-like language which does not run SQL under the hood, and lastly, SQL itself. In the Hadoop ecosystem, these three languages correspond to Pig Latin, HiveQL, and SQL. On the Google Cloud Platform, they correspond to Dataflow, BigQuery, and Cloud SQL. My question to you is: why really do we need each of these three elements? Isn’t it overkill? Could we get away with fewer than three? We’ve already spent a bunch of time discussing the differences between HiveQL and SQL, and between Hive and a traditional RDBMS. So let’s now repeat the same exercise. This time, let’s compare Pig and SQL, and then Pig and Hive.
We are not going to spend a lot of time talking about Pig syntax, but even so, it’s instructive to compare a simple bit of Pig Latin with a simple bit of SQL. What jumps out straight away is that Pig is procedural, while SQL is declarative. If you are not quite sure what the difference between those two terms is, think of it this way: the Pig syntax is explicitly telling Pig how certain operations have to be carried out. Check out the use of FOREACH, for instance. This is not a very sharp distinction, but there is a philosophical difference underlying these two. SQL syntax does not tell the SQL interpreter, or whatever database engine is executing the SQL query, how things have to be done, only what result is needed. And this gets to the real difference.
Pig is a data flow language; it relies on data transformations. SQL is a query language; it’s focused on retrieving results. Pig syntax looks a little bit like syntax in Python or some other high-level programming language, because it specifies exactly how the data has to be modified. SQL, on the other hand, abstracts that away completely, because there is only a finite set of query constructs, and all of those are known to the underlying SQL server. The use cases of Pig and SQL are also very different. The objective of Pig is to clean up data, process it, and store it in a queryable format. The objective of SQL is then to query that queryable data for analysis, in order to generate insights of some sort.
So, in a nutshell, Pig is a data cleaning technology which works best with inconsistent or incomplete schemas. SQL is for extracting insights, and it really only works with extremely well-formed, extremely structured data. Notice how nothing in what we’ve said so far about Pig makes it specific to Hadoop. Remember that there are three core building blocks in Hadoop: a file system, that’s HDFS; a programming paradigm, that’s MapReduce; and a resource allocator, that is YARN. Pig sits on top of Hadoop in much the same way that Hive does. Pig runs on top of this Hadoop distributed computing framework. It accesses data which is in HDFS, and it also stores intermediate results in HDFS and writes its final output there.
It uses the MapReduce programming paradigm to decompose operations into parallelizable units. This gives it nontrivial, built-in implementations of the standard data operations which are very efficient; they have been optimized for running on Hadoop. In a sense, Pig is really optimizing operations before they get to MapReduce. So in that sense, Pig has a special relationship with Hadoop, and indeed Pig is a genuine part of the Hadoop ecosystem. But there is nothing that prevents Pig from having a further level of indirection between itself and Hadoop. In fact, it can run on top of a couple of other technologies, notably Apache Tez and Apache Spark. Tez is a framework which improves MapReduce.
It has much the same purpose as the Hadoop-specific optimizations within Pig, that is, to make MapReduce-style operations faster before they get to the Hadoop world in the first place. And Spark, which we are going to discuss in just a moment, is a distributed computing technology with a whole bunch of advantages. Spark is extremely popular these days, and it serves as an abstraction on top of MapReduce. You can think of Spark as a programmer’s tool. It helps folks write parallel code, which is translated into map and reduce style operations under the hood, but free of the tyranny of all of the MapReduce boilerplate, and also easy to use from Python and not tied to Java.
So, like Hive, Pig runs on top of Hadoop and decomposes its operations into MapReduce. But unlike Hive, it is also possible for Pig to run on top of other abstractions, such as Tez or Spark, which lie between itself and Hadoop. That’s as far as the differences between the positions of Pig and Hive in the Hadoop ecosystem go. But even their use cases are fundamentally different. Pig is used for ETL operations, to get the data into Hive. Hive is then used to process that data and generate insights or reports. Pig can work with data from a whole bunch of sources. It is omnivorous, so it can be used by developers to collect and aggregate data into the warehouse. Hive is not used by developers. It is typically used by analysts, folks who are most at home with a language like SQL, which they use for business intelligence applications. And this really is why it makes sense to have both Pig Latin and Hive.
Pig Latin is going to be used by developers. It’s procedural. It’s a data flow language optimized for specifying data transformations and for use by developers and engineers. Hive, on the other hand, is optimized for usage by business analyst types. These are folks who are at home with SQL, like we just discussed a moment ago, and that’s why HiveQL is their language of choice. Hopefully this has made clear the requirement for three different languages: a data flow language, SQL itself working with relational databases, and a SQL-like language, such as HiveQL or BigQuery’s, which can work with an underlying distributed file system. This is why it is not overkill to have all three languages in a big data stack, and this is why both the Hadoop ecosystem and the Google Cloud Platform have a place for each of these three technologies.