Google Professional Data Engineer – Appendix: Hadoop Ecosystem part 2
- MapReduce
Let’s now move on to the next building block of Hadoop, and that is MapReduce, the parallel programming paradigm which really is at the heart of it all. While understanding MapReduce, I’d like you to try and answer this question: why is there such a strong need for some kind of SQL interface on top of MapReduce? Both Hive and BigQuery are exactly this. They are both SQL interfaces on top of MapReduce-like activities. And the question for you is, why are such interfaces so important? We’ll get back to this towards the end of this video. For now, let’s understand MapReduce and its role in processing huge amounts of data.
Recall that any distributed programming paradigm requires running processes on multiple machines, so a distributed system has to know how to divvy up a computation job, send it to those different machines, have each of those machines perform its computation, and then return the results. This requires a programming paradigm, just like multithreading requires a concurrency paradigm. And MapReduce is exactly such a programming paradigm. Just as multithreading offers a way to split up operations such that they can execute on different threads in a safe manner, MapReduce is a way to split up work and take advantage of the parallelism inherent in a distributed system.
Let’s say that we have millions of records of some raw data and we’d like to carry out some computation on this. Any calculation in MapReduce is going to be broken into two steps: a map phase and a reduce phase. You as a programmer just need to express your operations in terms of map and reduce operations. It’s just as if you were writing multithreaded code, where all you would need to do is express the body of what each thread needs to accomplish. Then each of those map and reduce steps will be distributed across the thousands or even millions of machines in your cluster, and their execution will happen in parallel.
So the programmer defines the map and the reduce functions, and Hadoop will do all of the rest behind the scenes. You don’t need to bother yourself with the nitty-gritty of divvying up all of the data, sending it to different nodes, calculating the intermediate results, or any of that. All you need to care about are the map and the reduce operations. Let’s start with the map operation. This is an operation that’s going to be performed in parallel. Remember this: it’s going to be performed on thousands or maybe millions of nodes in parallel. Each operation will only work with a small subset of the data, maybe just one record, and the output of that map operation is going to be a set of key-value pairs.
The reduce operation will then go ahead and, in some sense, combine all of the results of the map step according to your logic. So for instance, you’ll have a whole bunch of mapper outputs. These will then be reduced, hence the term reduce, into a final output. So again, a multithreaded programmer only needs to care about the body of the operation that’s going to run in each thread, and the logic which combines the results of different threads. In the same way, as a MapReduce programmer you only need to care about the map operation, which is a step that can be performed in parallel across many nodes, and the reduce operation, which will then combine all of the mapper outputs. Let’s take the prototypical example of counting word frequencies.
Let’s see how this would be done in a MapReduce world. Let’s say you have a large text file which consists of lines. The question is, how would you write a MapReduce application which would return a frequency table? This frequency table would have two columns. The first column is the word, the second column is the word’s frequency in that document. Nothing very complex. This is trivial to write in a non-MapReduce way. But as we shall see, it’s kind of mind-bending to see how MapReduce can be applied to this problem. Now, before we get to the nitty-gritty of the MapReduce, we should remember that the file here is huge, potentially on the order of petabytes in size, and also that it’s distributed across all of the machines in our cluster.
Each of those machines is going to hold only a partition of the data. Keep this in mind while coming up with the MapReduce solution. Effectively, what’s going to happen is that each partition will be assigned to a different mapper process, and those mapper processes will be running in parallel on many of the machines in our cluster. Those mapper processes are going to run the map function, within which the rows will be processed in serial order. So any one mapper will only get a small subset of rows, and the output of that mapper function will then be a word frequency table for only that subset of rows. Here, for instance, the mapper process gets in only two lines, and then the output consists merely of the word frequencies, in the form of key-value pairs, for those two lines.
Now, the same process is playing out on every one of the other mapper nodes and as a result, at the end of the map step, we are going to end up with a very large number of key-value pairs distributed across the mapper processes. These key-value pairs will include repetitions. The word twinkle, for instance, will appear in many of these key-value pair sets, and therefore all of this needs to be passed to a reducer which is going to aggregate it. The reducer is logically going to take in all of these key-value pairs and sum up all of the values for the same key. This is crucially important. The reducer is going to group data, in a sense, by the keys in the output of the mappers.
Here, because there are two mapper outputs which contain the word twinkle, the reducer is going to sum up all of the frequencies across all of these key-value pairs where the key is equal to twinkle, and output a word frequency of four for that particular word. So, just like writing multithreaded code requires you to think in terms of threads and in combining the operations of those threads, writing MapReduce code requires you to think in the form of this abstraction on screen now, where key-value pairs are going to be passed in between mapper and reducer functions. Really, as a MapReduce programmer, you’ve got to be asking yourself two questions. The first question is, what key-value pairs should be emitted from the map step? That’s the most important first question that you’ve got to answer.
And the second question is, how should values with the same key be combined in the reduce step? That’s the second really important question. Let’s walk this through with our word count example. The input into the mapper function is going to be just the text in the original file, and the output of each mapper function is going to be a set of key-value pairs. Those key-value pairs will have words and frequencies, but only for a subset of the data. And that frequency is just going to be the count one for each time that the word is encountered. So we’ve answered the first of our two questions. We’ve decided that the output of our map phase is going to consist of key-value pairs.
The keys will be the words, and the value will just be one each time the word is encountered. Now, the next question is, what should our reducer do? How should it aggregate all of these values which correspond to the same key? And the answer is, it’s simply going to sum them all. So the reducer is going to apply the sum function to all of the values with the same key. This will cause the final output to be exactly the word frequency table that we wanted. And this really is the magic of MapReduce. By answering these two questions, we have decomposed this operation, and we can potentially decompose any operation, into something which can be easily parallelized and run on a distributed cluster.
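To make those two questions concrete, here is a minimal single-machine sketch of the word count in Python. It is not Hadoop code. The function names (map_words, shuffle, reduce_counts, word_count) and the sample lines are made up for illustration, and the shuffle function only stands in for the grouping by key that the framework would do between the map and the reduce steps.

```python
from collections import defaultdict

def map_words(line):
    """Map step: emit a (word, 1) pair for every word in one line."""
    return [(word.lower(), 1) for word in line.split()]

def shuffle(pairs):
    """Group values by key, standing in for what the framework does
    between the map and reduce steps."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def reduce_counts(word, counts):
    """Reduce step: sum all of the values that share the same key."""
    return word, sum(counts)

def word_count(partitions):
    # Each partition stands in for the rows handled by one mapper process.
    # Here they run one after another; on a cluster they would run in parallel.
    mapped = []
    for partition in partitions:
        for line in partition:
            mapped.extend(map_words(line))
    return dict(reduce_counts(w, c) for w, c in shuffle(mapped).items())

# Hypothetical input, split into two partitions of two lines each.
partitions = [
    ["twinkle twinkle little star", "how I wonder what you are"],
    ["up above the world so high", "twinkle twinkle little star"],
]
result = word_count(partitions)
```

With this assumed input, the key twinkle is emitted four times across the two partitions, so the reducer sums four ones for it, which mirrors the frequency of four described above.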
Now, raw MapReduce operations are almost always implemented in Java, and a part of that is because the Hadoop MapReduce framework was also implemented in Java. So it’s just a lot easier to hook into that MapReduce framework if you are writing code in Java. It’s a little more complicated if you are doing this in Python. A Java MapReduce job is going to require a mapper class, which is where the map logic is implemented, a reducer class, and lastly, a driver program like a main, which is going to actually set up the MapReduce job. So there’s clearly quite a bit of boilerplate going on when we implement MapReduce in Java. And really, writing MapReduce code in Java is not for the faint of heart.
The mapper class requires you to set up a generic class with four type parameters. The exact same goes for the reducer class. Once again, we’ve got to encapsulate this within a generic class with four type parameters. What’s more, the output types of the mapper must match the input types of the reducer. And so that requires the mapper and the reducer type parameters to be in sync. And all of this is coordinated by a driver, which requires a job object which is typically configured in the main class. So clearly there just are a lot of moving parts in a Java implementation of MapReduce. And this also really answers the question which we had begun this video with.
It tells us why there is such a strong need, why there is so much demand, for a SQL interface on top of MapReduce. This is why Hive and BigQuery are so popular. It’s because there is a very large number of data analysts and business analysts who understand SQL, but who really have no experience of writing any Java code, let alone the kind of formidable Java code that’s required to implement MapReduce. A SQL interface on top of MapReduce will solve this problem. It will allow these folks to be self-sufficient while using Hadoop. And this explains the great popularity of both Hive and BigQuery.
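To give a feel for the moving parts just described, here is a rough Python sketch of the same shape: a mapper with four type parameters, a reducer with four type parameters, and a driver that wires them together. This is only an illustration and is not the Hadoop Java API. The class names, the run_job function and the sample records are all assumptions made for this sketch. Notice that the mapper’s output types (KMid, VMid) are the reducer’s input types, which is the constraint that has to be kept in sync.

```python
from typing import Dict, Generic, Iterable, List, Tuple, TypeVar

KIn = TypeVar("KIn")
VIn = TypeVar("VIn")
KMid = TypeVar("KMid")
VMid = TypeVar("VMid")
KOut = TypeVar("KOut")
VOut = TypeVar("VOut")

class Mapper(Generic[KIn, VIn, KMid, VMid]):
    """Four type parameters: input key, input value, output key, output value."""
    def map(self, key: KIn, value: VIn) -> Iterable[Tuple[KMid, VMid]]:
        raise NotImplementedError

class Reducer(Generic[KMid, VMid, KOut, VOut]):
    """The reducer's input types must match the mapper's output types."""
    def reduce(self, key: KMid, values: List[VMid]) -> Iterable[Tuple[KOut, VOut]]:
        raise NotImplementedError

class WordMapper(Mapper[int, str, str, int]):
    def map(self, key, value):
        # key is a line offset (unused here), value is the line of text.
        for word in value.split():
            yield word, 1

class SumReducer(Reducer[str, int, str, int]):
    def reduce(self, key, values):
        yield key, sum(values)

def run_job(mapper, reducer, records):
    """Driver: runs the map step, groups by key, then runs the reduce step."""
    grouped: Dict = {}
    for key, value in records:
        for mid_key, mid_value in mapper.map(key, value):
            grouped.setdefault(mid_key, []).append(mid_value)
    output = {}
    for mid_key, mid_values in grouped.items():
        for out_key, out_value in reducer.reduce(mid_key, mid_values):
            output[out_key] = out_value
    return output

# Hypothetical input records: (line offset, line text).
records = [(0, "a b a"), (1, "b c")]
counts = run_job(WordMapper(), SumReducer(), records)
```

Even in this stripped-down form there are three pieces to keep consistent, which is the boilerplate that a SQL interface hides from an analyst.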
- Yarn
Next, let’s move on to discussing the third building block within Hadoop, and that is YARN. While discussing YARN, I’d like you to keep in mind this question: why is managed Hadoop such a great convenience? This is relevant because Dataproc, which is the Google Cloud Platform offering corresponding to Hadoop, is really just a managed Hadoop service. And in case you’re wondering what managed Hadoop is, well, think of it as a version of Hadoop in which scaling a cluster up and down is managed for you. YARN, which stands for Yet Another Resource Negotiator, is at the heart of most of the coordination functions within Hadoop.
It coordinates tasks running on the cluster and assigns new nodes if old nodes go down, so in cases of failure. This requires YARN to have two distinct subcomponents: a resource manager and a node manager. The resource manager runs on the master node. The node manager software runs on each individual cluster node or data node. The interaction between the resource manager and the node managers is extremely important. The resource manager will schedule tasks across the nodes, very much like a conventional operating system, and then the node manager on each node is responsible for executing that task.
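As a rough mental model of that division of labor, here is a toy Python sketch in which a resource manager hands each task to the first node manager that has room. It is not YARN’s real interface or placement logic. The class names, the idea of a fixed number of slots per node, and the task names are all assumptions made for this sketch.

```python
from dataclasses import dataclass, field

@dataclass
class NodeManager:
    name: str
    capacity: int  # hypothetical number of tasks this node can run at once
    running: list = field(default_factory=list)

    def has_free_capacity(self):
        return len(self.running) < self.capacity

    def launch(self, task):
        # The node manager is responsible for actually running the task.
        self.running.append(task)

class ResourceManager:
    def __init__(self, nodes):
        self.nodes = nodes

    def schedule(self, task):
        """Assign the task to the first node with room; None if all are busy."""
        for node in self.nodes:
            if node.has_free_capacity():
                node.launch(task)
                return node.name
        return None

# node-1 is already busy; node-2 has two free slots.
nodes = [NodeManager("node-1", 1, ["existing-task"]), NodeManager("node-2", 2)]
rm = ResourceManager(nodes)
placements = [rm.schedule(task) for task in ["map-0", "map-1", "reduce-0"]]
```

In this toy run the first two tasks land on node-2 and the third has to wait, because no node manager has free capacity left.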
So let’s imagine that a new job comes into a resource manager, and that resource manager is already keeping tabs on a number of node managers, each running on individual node instances. The nodes colored red correspond to those which are already running tasks. And all of this is before the new MapReduce job is submitted via the Hadoop API into the resource manager. Then YARN is going to need to find a node manager which has free capacity, and then farm out the task to that node manager and get it run. If you recall the Google Container Engine, and if you recall the relative roles of Kubernetes and the kubelets running on individual Container Engine instances, this is very similar.
And indeed this similarity is not coincidental. It’s because all processes on a node are really running within a container. Conceptually, each container is contained within a node manager. That node manager is responsible for abstracting the computation task from everything else on that machine. The container is the logical unit for all resources needed by the process. Now, a container is going to execute a specific application, and one node manager can have multiple containers. Once again, we can see that the similarity to a pod and its containers is even greater. The resource manager needs to start off the application master within the container.
This ought to be enough to get the computation process up and running. If additional resources are required, the application master will make that request, and YARN is going to have to try and satisfy it if possible. So we can see that the flow of information is in both directions. Node managers will also request mapper and reducer containers which have to be run on that particular data node, as well as the required resources. These node manager instances which are highlighted in yellow are the ones which are in the process of being assigned new jobs. These nodes will need to notify the application master. And as with replication, this assignment will happen with certain constraints.
So for instance, Hadoop will try and minimize bandwidth use by assigning processes to nodes which are located close to the data that’s got to be processed. Now, if CPU or memory or some other resources are not available, then that node is just going to wait. Clearly we can see that this process of scheduling jobs and assigning resources is a complex one. And so YARN has several different scheduling algorithms that can be specified. The simplest of these is a FIFO scheduler. Here, jobs will be assigned resources in the order in which they come in.
As with any FIFO scheduling algorithm, this causes short jobs or high priority jobs to suffer, because they might have to stand in queue behind a very low priority and long running job. As with economy, business class and first class tickets, an obvious solution is to create different queues for different priority levels. This is known as capacity scheduling, and YARN supports this as well. And a third scheduling algorithm is called fair scheduling. Here, jobs are basically assigned an equal share of all resources. As a Hadoop cluster administrator, it’s your responsibility to set up the scheduling policies that work best for your mix of workloads.
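To see why a short job suffers under FIFO, here is a toy Python comparison of FIFO and fair sharing. This is a deliberately simplified model and not YARN’s actual scheduler code. It assumes the whole cluster behaves like a single resource that does one unit of work per unit of time, and the job names and sizes are made up.

```python
def fifo_completion(jobs):
    """FIFO: each job gets the whole cluster, in arrival order."""
    clock = 0
    done = {}
    for name, work in jobs:
        clock += work
        done[name] = clock
    return done

def fair_completion(jobs):
    """Fair sharing: all unfinished jobs get an equal share of the cluster."""
    remaining = dict(jobs)
    clock = 0.0
    done = {}
    while remaining:
        share = 1.0 / len(remaining)
        # Advance time until the job with the least remaining work finishes.
        name, work = min(remaining.items(), key=lambda item: item[1])
        elapsed = work / share
        clock += elapsed
        for other in remaining:
            remaining[other] -= elapsed * share
        del remaining[name]
        done[name] = clock
    return done

# Hypothetical workload: a long job arrives just before a short one.
jobs = [("long_report", 100), ("short_query", 5)]
fifo = fifo_completion(jobs)
fair = fair_completion(jobs)
```

In this model the short query finishes at time 105 under FIFO but at time 10 under fair sharing, while the long report finishes at about the same time either way. That trade-off is the kind of thing an administrator weighs when choosing a policy.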
The complexity of YARN answers our question about the popularity of managed Hadoop, which, after all, is what Dataproc really is. Dataproc on the Google Cloud Platform is a way to use Spark or Hadoop clusters without an administrator or special software. It’s relatively easy for you to interact with clusters and to scale clusters up and down, and also to simply turn a cluster off if you’re not using it. This way, you don’t spend money on an idle cluster. And you also don’t need to worry about losing data because, as we already discussed, this data is going to reside in Cloud Storage rather than HDFS, and therefore it does not require a name node or any other cluster operating services to remain up.
- Hive
We’ve already introduced Hive briefly as a SQL-like interface which runs on top of MapReduce in Hadoop. Let’s now introduce it and understand it more formally. While doing so, I’d like you, as usual, to try and answer this question: why is Hive best suited to relatively high latency applications? In other words, why is it okay to use Hive for applications like OLAP, that is, analytical processing or business intelligence, but not so much for real-time applications? And I should also tell you here that this is a restriction which is rather specific to Hive. BigQuery on the Google Cloud Platform actually has pretty good performance, and you can use it in almost real-time processing as well.
Okay, so please ponder over that question while we introduce Hive and also discuss where it sits in relation to Hadoop. We’ve seen in some detail now how Hadoop includes three important components: the distributed file system HDFS, the parallel programming paradigm MapReduce, and the resource allocator or resource negotiator YARN. Now, writing MapReduce applications is awesome. It’s a great abstraction, but it’s also not for the faint of heart. And that’s where Hive comes in. Hive runs on top of Hadoop, and it offers a SQL-like way for analysts to perform analytical or business intelligence processing.
Crucially, Hive stores its data in HDFS. BigQuery, which is GCP’s equivalent, does not store data in HDFS, by the way. But in any case, coming back to this point, recall that HDFS has data stored as files, text or binary. These files are partitioned across machines in the cluster, and those partitions are also replicated for fault tolerance. Recall now that HDFS is optimized for batch processing, specifically for parallelized batch processing making use of MapReduce. And Hive also runs all of its processes in the form of MapReduce jobs under the hood. We’ve discussed MapReduce.
We saw how this is a programming paradigm in which we’ve got to decompose all operations into mapper and reducer phases. The mapper phase will output key-value pairs. The reducer phase needs to specify what is to be done with all of the values corresponding to a particular key. Once again, MapReduce is very much batch processing focused, and what’s more, these batch processing jobs need to be specified using the Hadoop MapReduce library, which is most conveniently used in Java. So the question is, do we have to write MapReduce code to work with Hive? And the answer, of course, is no. We’ve already discussed this.
Hive is a great SQL-like interface, which allows analysts, even folks who do not know Java programming, to carry out pretty heavy duty analytical and business intelligence processing. And this is made possible using something known as HiveQL, which is a SQL-like query language. It’s modeled on SQL, but of course it’s got a bunch of fine print around it. It’s similar enough that it’s going to be familiar to analysts and engineers. The query constructs in HiveQL can be pretty simple, but it also includes much more complex features for heavy duty queries. We will discuss the OLAP, or the analytical processing, horsepower of HiveQL in just a little bit: features like partitioning, bucketing, windowing and so on.
Now, because Hive relies on files in HDFS, and because Hive expresses all of its operations in terms of MapReduce, HiveQL really is a wrapper around the MapReduce paradigm and on top of Hadoop as a whole. Hive is going to translate queries which are written in HiveQL into MapReduce jobs, submit those jobs to the Hadoop MapReduce API, and run them in batch processing mode on Hadoop. Hadoop will then return the results back to Hive, and Hive, in turn, will send them back to the HiveQL user, the analyst. In this way, Hive has abstracted away the details of all of the underlying MapReduce operations. You can work with it almost exactly as you would with a traditional RDBMS.
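To get an intuition for that translation, here is a small Python sketch of how a grouped aggregate could be expressed as a map step and a reduce step. It is only a conceptual illustration; the plans Hive actually generates are far more involved. The table sales(region, amount), its rows, and the function names are all made up for this sketch.

```python
from collections import defaultdict

# Hypothetical rows of a table sales(region, amount).
rows = [("east", 10), ("west", 5), ("east", 7)]

# Conceptually equivalent query:
#   SELECT region, SUM(amount) FROM sales GROUP BY region

def map_row(row):
    """Map step: the GROUP BY column becomes the key."""
    region, amount = row
    return region, amount

def reduce_group(region, amounts):
    """Reduce step: the aggregate function is applied per key."""
    return region, sum(amounts)

# Group the mapper outputs by key, as the framework would between the steps.
grouped = defaultdict(list)
for row in rows:
    key, value = map_row(row)
    grouped[key].append(value)

totals = dict(reduce_group(key, values) for key, values in grouped.items())
```

The analyst only writes the one-line query in the comment; the key selection and the per-key aggregation are the parts that would run as a batch job on the cluster.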
Hive users can now see data as if it were stored in tables, when in reality that data is stored in distributed files in HDFS. And this is made possible by adding yet another layer of indirection. This is the Hive metastore. The metastore is a bridge between HDFS and Hive. It exposes HDFS files in the form of tables. This metastore, which is the bridge between Hive and Hadoop, is usually a relational database itself. It stores metadata for all Hive tables. This metadata will map files and directories in HDFS to tables and vice versa. In this way, we can specify table definitions and schemas for tables as we would in an RDBMS. And the Hive metastore can really just be any database with a JDBC driver.
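As a loose illustration of what kind of metadata this is, here is a Python sketch that uses an in-memory SQLite database as a stand-in for the metastore’s relational database. The table names tbls and cols, their columns, the words table and the HDFS path are all made up for this sketch; this is not the real Hive metastore schema.

```python
import sqlite3

# In-memory SQLite stands in for the metastore's relational database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tbls (name TEXT PRIMARY KEY, location TEXT)")
conn.execute("CREATE TABLE cols (tbl TEXT, position INTEGER, name TEXT, type TEXT)")

# Hypothetical table definition: 'words' maps to a directory in HDFS.
conn.execute("INSERT INTO tbls VALUES (?, ?)", ("words", "hdfs:///warehouse/words"))
conn.executemany(
    "INSERT INTO cols VALUES (?, ?, ?, ?)",
    [("words", 0, "word", "string"), ("words", 1, "frequency", "int")],
)

def describe(table):
    """Return the storage location and the ordered schema recorded for a table."""
    (location,) = conn.execute(
        "SELECT location FROM tbls WHERE name = ?", (table,)
    ).fetchone()
    schema = conn.execute(
        "SELECT name, type FROM cols WHERE tbl = ? ORDER BY position", (table,)
    ).fetchall()
    return location, schema

location, schema = describe("words")
```

The point of the sketch is that the table itself holds no data. It only records where the files live and how their contents should be interpreted as columns.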
JDBC is important because the MapReduce code in the original Hadoop implementation is in Java, so this makes it easy for the metastore to interface with Hadoop and with HDFS. The typical default choice of database for the metastore is something known as Derby. Development environments typically use this because it’s built in. It’s an embedded data store which comes wrapped up with Hive. Derby is usually what you’d use in development. You should know that, in this embedded setup, the Hive metastore is going to run in the same Java process as Hive itself, and there is only ever going to be one Hive session connected to this metastore database.
Clearly, this setup might be a little too fragile for you to use in a production environment. And so in production it’s actually pretty common to have multiple sessions connect to a local metastore, and also to just have a remote metastore, so that there are separate processes for Hive and the metastore. It’s worth understanding some of these internal operational aspects of tools such as Hive, so that you get a sense of why some of these bits are not going to be needed when you switch to the cloud. And it is these internal representation details which answer our original question. Hive is best suited to high latency applications because under the hood it’s all Hadoop and MapReduce.
It’s using HDFS, which is inherently a batch processing system and cannot really be used for any real-time processing. In contrast, BigQuery, which is the GCP SQL-like interface on top of Google Cloud Storage, actually uses a different internal representation. It actually uses a columnar data store, not that different conceptually from Bigtable or HBase. And that’s why BigQuery is a reasonable choice if you are looking for low latency, almost real-time applications with a SQL interface on top of a distributed file system. Try to remember as much as possible of the internal representations of these different big data technologies. You’ll find it surprisingly handy both in passing the certification tests and in making the right design choices for your cloud.