Thursday, 31 July 2014

Big Data Basics - Part 5 - Introduction to MapReduce

Problem

I have read the previous tips in the Big Data Basics series including the storage aspects (HDFS). I am curious about the computation aspect of Hadoop and want to know what it is all about, how it works, and any other relevant information.

Solution

In this tip we will take a look at the second core component of the Hadoop framework: MapReduce. This component is responsible for computation / data processing.

Introduction

MapReduce is a programming model and software framework that allows us to process data in parallel across multiple computers in a cluster, often running on commodity hardware, in a reliable and fault-tolerant fashion.

Key Concepts

Here are some of the key concepts related to MapReduce.

Job

A Job in the context of Hadoop MapReduce is the unit of work to be performed as requested by the client / user. The information associated with the Job includes the data to be processed (input data), MapReduce logic / program / algorithm, and any other relevant configuration information necessary to execute the Job.

Task

Hadoop MapReduce divides a Job into multiple sub-jobs known as Tasks. These Tasks can be run independently of each other on various nodes across the cluster. There are primarily two types of Tasks - Map Tasks and Reduce Tasks.

JobTracker

Just like the storage (HDFS), the computation (MapReduce) also works in a master-slave / master-worker fashion. A JobTracker node acts as the Master and is responsible for scheduling / executing Tasks on appropriate nodes, coordinating the execution of Tasks, sending the information needed for the execution of each Task, getting the results back after the execution of each Task, re-executing failed Tasks, and monitoring / maintaining the overall progress of the Job. Since a Job consists of multiple Tasks, a Job's progress depends on the status / progress of the Tasks associated with it. There is only one JobTracker node per Hadoop Cluster.

TaskTracker

A TaskTracker node acts as the Slave and is responsible for executing a Task assigned to it by the JobTracker. There is no restriction on the number of TaskTracker nodes that can exist in a Hadoop Cluster. A TaskTracker receives the information necessary for the execution of a Task from the JobTracker, executes the Task, and sends the results back to the JobTracker.
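In Hadoop 1.x (MRv1), client programs and TaskTrackers locate the JobTracker through the mapred.job.tracker property, which is normally set in mapred-site.xml. The small sketch below shows the equivalent programmatic setting; the host name and port are illustrative.

import org.apache.hadoop.conf.Configuration;

public class JobTrackerConfigExample {
    public static void main(String[] args) {
        // In MRv1, mapred.job.tracker holds the host:port of the JobTracker
        // (the default value "local" runs the job in a single local JVM).
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "jobtracker-host:9001");  // illustrative host and port
        System.out.println("JobTracker address: " + conf.get("mapred.job.tracker"));
    }
}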

Map()

A Map Task in MapReduce is performed using the Map() function. This part of MapReduce is responsible for processing one or more chunks of data and producing the output results.

Reduce()

The next stage of the MapReduce programming model is the Reduce() function. This part of MapReduce is responsible for consolidating the results produced by each of the Map() functions / tasks.

Data Locality

MapReduce tries to place the compute as close to the data as possible. First, it tries to put the compute on the same node where the data resides. If that cannot be done (for example, because that node is down or is busy with another computation), it tries to put the compute on the node nearest to the data node(s) containing the data to be processed. This feature of MapReduce is known as "Data Locality".

How MapReduce Works

The following diagram shows the logical data flow of the MapReduce programming model.
[Image: Hadoop - MapReduce - Logical Data Flow]
Let us understand each of the stages depicted in the above diagram.
  • Input: This is the input data / file to be processed.
  • Split: Hadoop splits the incoming data into smaller pieces called "splits".
  • Map: In this step, MapReduce processes each split according to the logic defined in the map() function. Each mapper works on one split at a time. Each mapper is treated as a task, and multiple tasks are executed across different TaskTrackers and coordinated by the JobTracker.
  • Combine: This is an optional step and is used to improve performance by reducing the amount of data transferred across the network. The combiner typically uses the same logic as the reduce step and aggregates the output of the map() function before it is passed on to the subsequent steps.
  • Shuffle & Sort: In this step, the outputs from all the mappers are shuffled, sorted to put them in order, and grouped before being sent to the next step.
  • Reduce: This step aggregates the outputs of the mappers using the reduce() function. The output of the reducer is sent to the next and final step. Each reducer is treated as a task, and multiple tasks are executed across different TaskTrackers and coordinated by the JobTracker.
  • Output: Finally, the output of the reduce step is written to a file in HDFS.
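To make this flow more concrete, below is a small plain-Java simulation of these stages. It is only an illustration of the logical data flow, not actual Hadoop code; the class name and sample input are made up for this sketch, and a real job would use the Hadoop MapReduce API (see the word count example later in this tip).

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the logical MapReduce data flow:
// Input -> Split -> Map -> Shuffle & Sort -> Reduce -> Output.
public class LogicalFlowDemo {
    public static void main(String[] args) {
        // Input: the data to be processed.
        String input = "red green red\nblue red green";

        // Split: break the input into smaller pieces (here, one line per split).
        String[] splits = input.split("\n");

        // Map: process each split independently and emit intermediate (key, value) pairs.
        List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
        for (String split : splits) {
            for (String word : split.split("\\s+")) {
                mapOutput.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }

        // Shuffle & Sort: group all the values by key, in sorted key order.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : mapOutput) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }

        // Reduce: aggregate the values of each key and write the final output.
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int total = 0;
            for (int count : entry.getValue()) {
                total += count;
            }
            System.out.println(entry.getKey() + "\t" + total);
        }
    }
}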

MapReduce Word Count Example

For the purpose of understanding MapReduce, let us consider a simple example. Let us assume that we have a file which contains the following four lines of text.
[Image: Hadoop - MapReduce - Word Count Example - Input File]
In this file, we need to count the number of occurrences of each word. For instance, DW appears twice, BI appears once, SSRS appears twice, and so on. Let us see how this counting operation is performed when this file is input to MapReduce.
Below is a simplified representation of the data flow for Word Count Example.
[Image: Hadoop - MapReduce - Word Count Example - Data Flow]
  • Input: In this step, the sample file is input to MapReduce.
  • Split: In this step, Hadoop splits / divides our sample input file into four parts, each part made up of one line from the input file. Note that, for the purpose of this example, we are treating each line as one split. However, this is not necessarily the case in a real-world scenario, where a split is typically much larger (often an entire HDFS block).
  • Map: In this step, each split is fed to a mapper, that is, the map() function containing the logic on how to process the input data, which in our case is the line of text present in the split. For our scenario, the map() function contains the logic to count the occurrences of each word, and each occurrence is captured / arranged as a (key, value) pair, such as (SQL, 1), (DW, 1), (SQL, 1), and so on.
  • Combine: This is an optional step and is often used to improve performance by reducing the amount of data transferred across the network. It is essentially the same as the reducer (the reduce() function) and acts on the output of each mapper. In our example, the key-value pairs from the first mapper, (SQL, 1), (DW, 1), (SQL, 1), are combined and the output of the corresponding combiner becomes (SQL, 2), (DW, 1).
  • Shuffle and Sort: In this step, the output of all the mappers is collected, shuffled, and sorted, and then arranged to be sent to the reducer.
  • Reduce: In this step, the collective data from various mappers, after being shuffled and sorted, is combined / aggregated and the word counts are produced as (key, value) pairs like (BI, 1), (DW, 2), (SQL, 5), and so on.
  • Output: In this step, the output of the reducer is written to a file on HDFS. The following image is the output of our word count example.
[Image: Hadoop - MapReduce - Word Count Example - Output File]
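For reference, here is what the complete word count program can look like in Java using the Hadoop MapReduce API. This is a minimal sketch closely following the classic Hadoop word count example; the class names are illustrative, and the HDFS input and output paths are supplied as command line arguments (exact API details can vary slightly between Hadoop versions).

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Map: called once per record (here, one line of text per call);
    // emits (word, 1) for every word in the line.
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);        // e.g. (SQL, 1), (DW, 1), ...
            }
        }
    }

    // Reduce: called once per word with all of its counts after the
    // shuffle & sort stage; sums them to produce (word, total).
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);          // e.g. (SQL, 5), (DW, 2), ...
        }
    }

    // Driver: defines the Job - mapper, optional combiner, reducer,
    // and the HDFS input / output paths supplied on the command line.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");   // older releases: new Job(conf, "word count")
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);       // optional Combine stage
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The program would typically be packaged into a jar and submitted with something like: hadoop jar wordcount.jar WordCount /user/sample/input.txt /user/sample/output (the jar name and paths are illustrative).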

Highlights of Hadoop MapReduce

Here are a few highlights of the MapReduce programming model in Hadoop:
  • MapReduce works in a master-slave / master-worker fashion. JobTracker acts as the master and TaskTrackers act as the slaves.
  • MapReduce has two major phases - a Map phase and a Reduce phase. The Map phase processes parts of the input data using mappers, based on the logic defined in the map() function. The Reduce phase aggregates the data using a reducer, based on the logic defined in the reduce() function.
  • Depending upon the problem at hand, we can have one Reduce Task, multiple Reduce Tasks, or no Reduce Tasks at all (see the snippet after this list).
  • MapReduce has built-in fault tolerance and hence can run on commodity hardware.
  • MapReduce takes care of distributing the data across various nodes, assigning the tasks to each of the nodes, getting the results back from each node, re-running the task in case of any node failures, consolidation of results, etc.
  • MapReduce processes the data in the form of (Key, Value) pairs. Hence, we need to fit our business problem into this key-value arrangement.
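As a small illustration of how the number of Reduce Tasks is controlled, it is simply a property of the Job, set via Job.setNumReduceTasks(). The helper class below is a hypothetical sketch; setting the count to 0 produces a map-only job whose mapper output is written directly to HDFS.

import org.apache.hadoop.mapreduce.Job;

// Hypothetical helper: configure how many Reduce Tasks a MapReduce Job runs.
public class ReduceTaskConfig {
    public static void configure(Job job, int numReduceTasks) {
        // 0 = no Reduce Tasks (map-only job; the mapper output goes straight to HDFS),
        // 1 = a single Reduce Task, N = N Reduce Tasks running in parallel.
        job.setNumReduceTasks(numReduceTasks);
    }
}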

Next Steps

  • Explore more about Big Data and Hadoop
  • In the next and subsequent tips, we will look at the other aspects of Hadoop and the Big Data world. So stay tuned!

 

Big Data Basics - Part 4 - Introduction to HDFS

Problem

I have read the previous tips in the Big Data Basics series and I would like to know more about the Hadoop Distributed File System (HDFS), including its key concepts, architecture, and any other relevant information.

Solution

Before we take a look at the architecture of HDFS, let us first go over some of the key concepts.

Introduction

HDFS stands for Hadoop Distributed File System. HDFS is one of the core components of the Hadoop framework and is responsible for the storage aspect. Unlike the usual storage available on our computers, HDFS is a Distributed File System and parts of a single large file can be stored on different nodes across the cluster. HDFS is a distributed, reliable, and scalable file system.

Key Concepts

Here are some of the key concepts related to HDFS.

NameNode

HDFS works in a master-slave / master-worker fashion. All the metadata related to HDFS, including information about DataNodes, the files stored on HDFS, replication, etc., is stored and maintained on the NameNode. The NameNode serves as the master and there is only one NameNode per cluster.

DataNode

DataNode is the slave/worker node and holds the user data in the form of Data Blocks. There can be any number of DataNodes in a Hadoop Cluster.

Data Block

A Data Block can be considered as the standard unit of data / files stored on HDFS. Each incoming file is broken into blocks of 64 MB by default. Any file larger than 64 MB is broken down into 64 MB blocks, and all the blocks which make up a particular file are of the same size (64 MB), except for the last block, which might be smaller depending upon the size of the file.
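The block size is configurable (through the dfs.block.size / dfs.blocksize property, depending on the Hadoop version) and can even be specified per file when the file is created. Below is a small sketch using the HDFS Java API; the path, buffer size, replication factor, and block size values are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockSizeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();    // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);

        // Create a file with an explicit 128 MB block size and 3 replicas.
        // Arguments: path, overwrite, buffer size, replication factor, block size.
        long blockSize = 128L * 1024 * 1024;
        FSDataOutputStream out = fs.create(
                new Path("/user/sample/large-file.txt"), true, 4096, (short) 3, blockSize);
        out.writeUTF("sample content");
        out.close();
    }
}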

Replication

Data blocks are replicated across different nodes in the cluster to ensure a high degree of fault tolerance. Replication enables the use of low cost commodity hardware for the storage of data. The number of replicas to be made / maintained is configurable at the cluster level as well as for each file. Based on the Replication Factor, each data block that makes up a file is replicated that many times across different nodes in the cluster.
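For example, the cluster-wide default comes from the dfs.replication property, and the replication factor of an individual file can be changed through the HDFS Java API, as in the small sketch below (the path and values are illustrative).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReplicationExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Request that an existing file be kept with 5 replicas instead of the cluster default.
        boolean requested = fs.setReplication(new Path("/user/sample/important-file.txt"), (short) 5);
        System.out.println("Replication change requested: " + requested);
    }
}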

Rack Awareness

Data is replicated across different nodes in the cluster to ensure reliability / fault tolerance. Replication of data blocks is done based on the location of the DataNodes, so as to ensure a high degree of fault tolerance. For instance, one or two copies of a data block are stored on the same rack, one copy is stored on a different rack within the same data center, one more copy is stored on a rack in a different data center, and so on.

HDFS Architecture

Below is a high-level architecture of HDFS.
[Image: Architecture of a Multi-Node Hadoop Cluster]

Image Source: http://hadoop.apache.org/docs/stable1/hdfs_design.html
Here are the highlights of this architecture.

Master-Slave/Master-Worker Architecture

HDFS works in a master-slave/master-worker fashion.
  • The NameNode serves as the master and each DataNode serves as a worker/slave.
  • NameNode and each DataNode have built-in web servers.
  • The NameNode is the heart of HDFS and is responsible for various tasks: it holds the file system namespace, controls access to the file system by clients, keeps track of the DataNodes, and keeps track of the replication factor and ensures that it is always maintained.
  • User data is stored on the local file system of the DataNodes. A DataNode is not aware of the files to which the blocks stored on it belong. As a result, if the NameNode goes down, the data in HDFS is unusable, as only the NameNode knows which blocks belong to which file, where each block is located, etc.
  • NameNode can talk to all the live DataNodes and the live DataNodes can talk to each other.
  • There is also a Secondary NameNode, which comes in handy when the primary NameNode goes down. Note that the Secondary NameNode is not a hot standby; it periodically checkpoints the NameNode's metadata, and that checkpoint can be used to bring the cluster back online. This switching of nodes needs to be done manually and there is no automatic failover mechanism in place.
  • NameNode receives heartbeat signals and a Block Report periodically from each of the DataNodes.
    • Heartbeat signals from a DataNode indicate that the DataNode is alive and working fine. If heartbeat signals are not received from a DataNode, that DataNode is marked as dead and no further I/O requests are sent to it.
    • Block Report from each DataNode contains a list of all the blocks that are stored on that DataNode.
    • The Heartbeat signals and Block Reports received from each DataNode help the NameNode identify any potential loss of Blocks / Data and replicate the Blocks to other active DataNodes in the event that one or more DataNodes holding the data blocks go down.

Data Replication

Data is replicated across different DataNodes to ensure a high degree of fault-tolerance.
  • The Replication Factor can be configured at the cluster level (the default is 3) and also at the file level.
  • The need for data replication can arise in various scenarios, such as when the Replication Factor is changed, a DataNode goes down, or Data Blocks get corrupted.
  • When the replication factor is increased (at the cluster level or for a particular file), the NameNode identifies the DataNodes to which the data blocks need to be replicated and initiates the replication.
  • When the replication factor is reduced (at the cluster level or for a particular file), the NameNode identifies the DataNodes from which the data blocks need to be deleted and initiates the deletion.

General

Here are few general highlights about HDFS.
  • HDFS is implemented in Java and any computer which can run Java can host a NameNode/DataNode on it.
  • Designed to be portable across all the major hardware and software platforms.
  • Basic file operations include reading and writing of files, creation, deletion, and replication of files, etc.
  • Provides the necessary interfaces to move the computation to the data, unlike traditional data processing systems where the data moves to the computation.
  • HDFS provides a command line interface called "FS Shell" that is used to interact with HDFS (a few example commands are shown below).
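For instance, here are some commonly used FS Shell commands; the paths and file names are illustrative.

hadoop fs -mkdir /user/sample                      # create a directory on HDFS
hadoop fs -put localfile.txt /user/sample/         # copy a local file into HDFS
hadoop fs -ls /user/sample                         # list the contents of an HDFS directory
hadoop fs -cat /user/sample/localfile.txt          # display the contents of an HDFS file
hadoop fs -setrep -w 2 /user/sample/localfile.txt  # change the replication factor of a file
hadoop fs -rm /user/sample/localfile.txt           # delete a file from HDFS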

When to Use HDFS (HDFS Use Cases)

There are many use cases for HDFS including the following:
  • HDFS can be used for storing the Big Data sets for further processing.
  • HDFS can be used for storing archive data, since it allows storing the data on low cost commodity hardware while ensuring a high degree of fault tolerance.

When Not to Use HDFS

There are certain scenarios in which HDFS may not be a good fit including the following:
  • HDFS is not suitable for storing data related to applications requiring low latency data access.
  • HDFS is not suitable for storing a large number of small files as the metadata for each file needs to be stored on the NameNode and is held in memory.
  • HDFS is not suitable for scenarios requiring multiple/simultaneous writes to the same file.

Next Steps

  • Explore more about Big Data and Hadoop
    • Big Data Basics - Part 1 - Introduction to Big Data
    • Big Data Basics - Part 2 - Overview of Big Data Architecture
    • Big Data Basics - Part 3 - Overview of Hadoop
    • Compare Big Data Platforms vs SQL Server
  • In the next and subsequent tips, we will look at the other aspects of Hadoop and the Big Data world. So stay tuned!


 
