Exploring big data technologies with Raspberry Pi.
Big data is data which can’t be efficiently stored and processed using traditional methods.
The three V’s (volume, velocity and variety) are commonly used to explain what makes this data so hard to deal with:
So what is Hadoop?
Hadoop is a collection of open source software that makes it easier to store and process big data.
The main components of the Hadoop framework are HDFS (distributed storage), MapReduce (distributed processing) and YARN (cluster resource management), each of which is covered below.
There are a few reasons why storing data on a single machine can be problematic: the data may simply not fit, reading and writing it is bottlenecked by a single machine’s disk and network, and that machine is a single point of failure.
HDFS is a file system where the files are split into blocks and spread over a cluster of computers (nodes).
There are two key types of nodes in the Hadoop cluster that facilitate this file distribution: the NameNode, which stores the file system metadata (including which blocks make up each file and where they are stored), and the DataNodes, which store the blocks themselves.
If a DataNode in the cluster goes down, is the data it stores still available? Yes! Hadoop is fault tolerant. It achieves this by replicating each block on different DataNodes. The number of copies made is configurable and referred to as the replication factor.
The NameNode will know if a DataNode goes down as the DataNode sends the NameNode a periodic heartbeat, which it will obviously not receive in the event of a crash.
Let’s pause and go through the process of reading/writing data to HDFS.
The first thing that happens when a Hadoop client wants to write a file to HDFS is it splits the file into blocks. For example, with a 128MB block size a 400MB file will be broken up as follows:
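Block 1: 128MB
Block 2: 128MB
Block 3: 128MB
Block 4: 16MB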
The following process will then be followed for each block:
A few extra things to note about this process:
The read process is even simpler:
Replication facilitates recovery from DataNode failures, making HDFS fault tolerant. But what about NameNode failures? With only one NameNode, any downtime would make the cluster unavailable.
HDFS also has a solution for this, making it highly available (HA).
The basic idea is to have a standby NameNode which is ready to take over from the active NameNode in the event of a failure. For this to work, the active NameNode records all changes made to the file system in an edit log. This edit log is stored on a group of journal nodes.
The Standby NameNode reads all changes to this edit log to keep its fsimage synchronised with the active NameNode. It also receives heartbeats from DataNodes to keep an up to date image of the cluster. In the event of a NameNode crash, the standby NameNode can then continue where the active NameNode left off.
ZooKeeper is used to track when the active NameNode goes down.
Note that this approach to making HDFS highly available isn’t enabled by default; the cluster needs to be configured for it.
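To give a flavour, a minimal hdfs-site.xml sketch for HA might include properties like the following (the nameservice, NameNode and JournalNode names here are made up, and a real setup also needs the NameNode RPC/HTTP addresses, a client failover proxy provider and, for automatic failover, a ZooKeeper quorum in core-site.xml):

<configuration>
<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://jn1:8485;jn2:8485;jn3:8485/mycluster</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
</configuration>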
When not running in high availability mode, it’s still important to persist the NameNode’s fsimage / edit logs somewhere other than the NameNode. This can be achieved by backing these files up on a remote file system.
A Secondary NameNode is also run when not in HA mode. The Secondary NameNode reads the NameNode’s edit log periodically and merges it with its local fsimage. That way when the NameNode restarts it can recreate the fsimage by starting with the checkpointed fsimage and then applying a small list of edits, rather than applying all of the edits.
HDFS provides distributed data storage. But what happens if we want to process that data? If only one machine were to do the processing, all relevant data would have to be moved from the DataNodes to that machine. This would be a huge bottleneck.
A better idea is to move the processing to the data. This is the approach taken by MapReduce.
MapReduce is a programming model which processes data in parallel on a cluster.
It consists of two key stages: a map stage, which processes input records and emits intermediate key/value pairs, and a reduce stage, which aggregates the values emitted for each key.
Let’s run through an example. Say we have a file which is stored in two blocks on HDFS. Block 1 contains the text
HDFS is a
distributed file
system
The second block contains the text
I am a file
in HDFS
Obviously this example is not realistic as each of the blocks has less than 50 bytes of data, but it’s easier to illustrate the process with small blocks.
We want to count the number of occurrences of each word in this file. So our goal is to produce
a | 2
am | 1
distributed | 1
file | 2
HDFS | 2
I | 1
in | 1
is | 1
system | 1
To do this we define a map and reduce task with the following pseudocode:
map(line):
    for word in line:
        output (word, 1)

reduce(word, counts):
    output (word, counts.sum())
Two map tasks will run in parallel - one for each block. As you can see, the map task doesn’t receive all the text in the block at once. Rather, the block is split up into records by the RecordReader, which is determined by the InputFormat. For text files (InputFormat is TextInputFormat), the RecordReader splits the data up into lines. So the map function will be called once per line.
The reduce function will be called once per word output by the map stage. However, before it is called, all map outputs with the same word are combined. So rather than being called twice for the input (HDFS,1), it will be called once for the input (HDFS,[1,1]).
The entire process looks something like this:
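In terms of the key/value pairs flowing through each stage (assuming two reducers, as in the note below):

map output for block 1: (HDFS,1) (is,1) (a,1) (distributed,1) (file,1) (system,1)
map output for block 2: (I,1) (am,1) (a,1) (file,1) (in,1) (HDFS,1)
reducer1 input after the shuffle: (a,[1,1]) (am,[1]) (distributed,[1]) (system,[1])
reducer2 input after the shuffle: (HDFS,[1,1]) (I,[1]) (file,[1,1]) (in,[1]) (is,[1])
reducer1 output: (a,2) (am,1) (distributed,1) (system,1)
reducer2 output: (HDFS,2) (I,1) (file,2) (in,1) (is,1)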
A few things to note:
If two reducers are used, the intermediate keys are partitioned between them, with [a,am,distributed,system] going to reducer1 and the others to reducer2.

Now let’s take a look at the real Java code that would be used to run this job:
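The gist downloaded later in this post contains a complete version; a sketch along the lines of the standard Hadoop WordCount example looks like this:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Called once per record (a line, for TextInputFormat); emits (word, 1) for each word.
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Called once per unique word with all of its counts; emits (word, sum).
  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));    // input directory
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory (must not already exist)
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}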
Some notable differences with the pseudocode:
In Hadoop 1.x the MapReduce framework included components that were responsible for providing cluster resources to run MapReduce jobs on. If you come across the terms JobTracker and TaskTracker when reading other tutorials, that’s them!
The issue with this coupling is that it prevents other distributed applications from running on a Hadoop cluster. YARN (Yet Another Resource Negotiator) was made available in Hadoop 2.x to solve this problem. It provides APIs for accessing cluster resources that can be used by any distributed execution framework, not just MapReduce 🙂.
YARN consists of two main components: the ResourceManager, which arbitrates resources across the cluster, and the NodeManagers, which run on every worker node and launch and monitor containers.
The process of running a YARN application is shown below:
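Roughly: the client submits the application to the ResourceManager; the ResourceManager allocates a container on a NodeManager and launches the ApplicationMaster in it; the ApplicationMaster requests further containers from the ResourceManager as it needs them; and the NodeManagers launch those containers to run the application’s tasks, with the ApplicationMaster monitoring them until the application completes.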
The ApplicationMaster contains all the framework-specific logic. If this were a MapReduce application, it would be a MapReduce ApplicationMaster (MRAppMaster), which would request resources from the ResourceManager to run map and reduce tasks in.
Hadoop runs in three different modes: standalone (local) mode, pseudo-distributed mode and fully distributed mode.
The first two modes will be covered in this section.
Before you can run Hadoop you need to install it. I installed it on the Raspberry Pi with the following steps:
curl -O http://binary_tarball_url
The URL can be found from the Hadoop Releases Page.
tar xvzf hadoop-x.y.z.tar.gz
sudo mv hadoop-x.y.z /opt/hadoop
sudo chown <current-user>:<current-group> /opt/hadoop -R
I’ll use pi:pi
as the user/group as that’s the default on the Raspberry Pi.
Note it’s best practice to create dedicated users for hadoop daemons and place them all in a Hadoop group. In that case we’d be changing ownership to a specific hadoop group/user. I’ve skipped that step to minimise the admin necessary to get the cluster up and running.
export HADOOP_HOME=/opt/hadoop
export JAVA_HOME=path_to_jdk
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
For me JAVA_HOME=/usr/lib/jvm/java-8-openjdk-armhf.
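If you’re not sure where the JDK lives, the following usually reveals it:

dirname $(dirname $(readlink -f $(which java)))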
ssh-keygen -t rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
Run ssh localhost
to check you aren’t prompted for a password.
Hadoop comes with mapreduce examples that can be used to verify everything is working properly. Run the wordcount example using the following steps.
mkdir -p ~/hadoop-test/input
echo "one\ntwo\none" > ~/hadoop-test/input/input.txt
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-x.y.z.jar wordcount ~/hadoop-test/input ~/hadoop-test/output
cat ~/hadoop-test/output/part-r-00000
The last command should print
one 2
two 1
To set up pseudo-distributed mode the configuration files need to be edited. They are located at $HADOOP_HOME/etc/hadoop. Here’s a basic description of those that need editing:
The configurations necessary to run in pseudo-distributed mode are:
hadoop-env.sh: set the JAVA_HOME variable in this file.

core-site.xml:

<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
hdfs-site.xml:

<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///opt/hadoop_tmp/hdfs/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///opt/hadoop_tmp/hdfs/datanode</value>
</property>
</configuration>
mapred-site.xml:

<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.application.classpath</name>
<value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
</property>
</configuration>
yarn-site.xml:

<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
</configuration>
dfs.namenode.name.dir and dfs.datanode.data.dir are the locations of the NameNode’s fsimage and the DataNode’s data blocks on the local file system. Before we run Hadoop, we need to create those directories first.
sudo mkdir -p /opt/hadoop_tmp/hdfs/namenode
sudo mkdir -p /opt/hadoop_tmp/hdfs/datanode
sudo chown <current-user>:<current-group> -R /opt/hadoop_tmp
Run hdfs namenode -format. This creates a fresh fsimage at dfs.namenode.name.dir. The Hadoop cluster is now ready to be started.
There are a number of scripts in $HADOOP_HOME/sbin
that can be used to start Hadoop daemons. If you’ve followed all the instructions the scripts in this directory are already on the path. Run start-all.sh
to start the entire cluster. stop-all.sh
will stop it.
Verify the daemons are up by running jps. You should see the NodeManager, ResourceManager, NameNode, SecondaryNameNode and DataNode. If the NameNode isn’t listed, you probably didn’t run hdfs namenode -format.
You may notice a couple of warnings when you start the daemons.
Hadoop uses native code (machine code) rather than bytecode to increase the performance of some components. So if the native code downloaded wasn’t compiled on your system (maybe it was compiled on a 32 bit system and you’re on a 64 bit system) there’ll be some warnings. Three ways of dealing with them:
Add
export HADOOP_OPTS="-XX:-PrintWarnings"
to hadoop-env.sh, and add
export HADOOP_HOME_WARN_SUPPRESS=1
export HADOOP_ROOT_LOGGER="WARN,DRFA"
to the bash profile.
The file system commands are very similar to Linux. The general structure of a command is
hadoop fs -<command> <args>
For example
hadoop fs -ls .
hadoop fs -rm -r temp-dir
hadoop fs -cat output/test.txt
To run the same MapReduce job as in standalone mode, the input file needs to be on HDFS. To do that, first create a directory for your user.
hadoop fs -mkdir -p /user/<username>
The username can be found by running whoami. This directory is your home directory on HDFS, so you don’t need to prefix all paths with /user/<username>.
Now put the input file on hdfs and check the operation was successful:
hadoop fs -mkdir input
echo "one\ntwo\none" > ~/hadoop-test/input/input.txt
hadoop fs -copyFromLocal ~/hadoop-test/input/input.txt input/input.txt
hadoop fs -ls input
If so, run the job:
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-x.y.z.jar wordcount input output
hadoop fs -cat output/part-r-00000
The same output as in standalone mode should have been printed.
Custom MapReduce jobs can be run too. To demonstrate this let’s compile and run a wordcount program from scratch.
mkdir -p ~/hadoop-test/custom && cd ~/hadoop-test/custom
curl -O https://gist.githubusercontent.com/jdoldissen/ad6b12ced708e6ce698cf4a6b0d5128c/raw/ff61660212698a8c81260a960108854bdee95e6e/WordCount.java
javac -classpath $(hadoop classpath) WordCount.java
jar cvf WordCount.jar *.class
hadoop fs -rm -r output # Hadoop complains if the output directory is already there
hadoop jar WordCount.jar WordCount input output # Assuming you created the input file in the previous example
hadoop fs -cat output/part-r-00000
Once again the same output should be produced.
The master daemons come with web interfaces. These can be useful for checking the logs of a job, the health of the cluster, the file system state and a lot more.
The addresses of the interfaces are:
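Assuming the Hadoop 3.x defaults, the NameNode UI is served at http://<namenode-host>:9870 and the ResourceManager UI at http://<resourcemanager-host>:8088 (both on localhost in pseudo-distributed mode).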
There are a number of useful services the web interfaces provide. For the NameNode’s:
For the Resource Manager’s:
A cluster of Raspberry Pis is used to run Hadoop in this section. See this page for more info on the Pis.
Nodes in the cluster connect via ssh. As we’ll be relying on IP addresses to do this it’s best to set static IP addresses for each node.
To set a static IP address on the Raspberry Pi see this stack exchange answer.
Once set log onto your router and reserve the addresses. This prevents the DHCP server which runs on the router from giving the addresses out to other machines.
It’ll be less confusing if each node has a unique hostname rather than the default raspberrypi. This is because you will likely have terminal panes open for each Pi and you want to be able to tell them apart, as shown in the image below:
To update the hostname open /etc/hostname
and change raspberrypi
to the desired name. Something like node1
or pi1
would do fine. Then open /etc/hosts
and again change raspberrypi
to the new name. Do this for each node.
The /etc/hosts
file is used for local domain translation (like a local DNS server). It will be useful to be able to translate the hostnames of other nodes to IP addresses. To do this add an entry in the file for each node:
x.x.x.x pi1
x.x.x.y pi2
...
...
You also need to remove lines that map the hostname of the machine to localhost (the lines you just edited in the step above). For example on pi1 remove the line
127.0.0.1 pi1
Your cluster won’t work if you don’t do this as some daemons (e.g. the NameNode) will listen on the loopback interface rather than an externally accessible interface.
As the loopback entries have been removed /etc/hosts
should be the same on each machine. As long as you haven’t added custom entries on the other nodes you can simply copy it using scp.
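For example, from pi1 (assuming the pi user can sudo without a password, which is the Raspberry Pi OS default):

scp /etc/hosts pi@pi2:/tmp/hosts
ssh pi@pi2 'sudo mv /tmp/hosts /etc/hosts'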
Two things are going to make ssh play nicely: host aliases in an ssh config file, and passwordless (key-based) authentication.

To alias pi@192.168.1.107 as pi1, add the following to ~/.ssh/config:
HOST pi1
HostName 192.168.1.107
User pi
Now ssh is as easy as
ssh pi1
The config should contain aliases for all nodes on the network and should be identical on every node. So again, just create the config once and copy it with scp.
cat ~/.ssh/id_rsa.pub | ssh pi1 'cat >> ~/.ssh/authorized_keys'
where pi1 is the alias of the machine the key is being copied to. Once this command has been run from each machine in the cluster, the authorized_keys file on pi1 can be copied to the other machines using scp. Don’t forget to copy pi1’s public key into its own authorized_keys file.
The memory-related properties in yarn-site.xml and mapred-site.xml will depend on the machine’s RAM. I have 4GB of RAM on each machine. See this page for a good explanation of how to set these properties.
hadoop-env.sh: set the JAVA_HOME variable in this file.

core-site.xml:

<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://pi1:9000</value>
</property>
</configuration>
Note pi1 should be the alias of the master node in /etc/hosts.
hdfs-site.xml:

<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///opt/hadoop_tmp/hdfs/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///opt/hadoop_tmp/hdfs/datanode</value>
</property>
<property>
<name>dfs.namenode.checkpoint.dir</name>
<value>file:///opt/hadoop_tmp/hdfs/namenodesecond</value>
</property>
</configuration>
mapred-site.xml:

<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.application.classpath</name>
<value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
</property>
<property>
<name>yarn.app.mapreduce.am.resource.mb</name>
<value>512</value>
</property>
<property>
<name>mapreduce.map.memory.mb</name>
<value>256</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>256</value>
</property>
</configuration>
yarn-site.xml:

<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>pi1</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>1536</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>1536</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>128</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
</configuration>
scp hadoop-x.y.z.tar.gz pi2:/home/pi
ssh pi2
sudo mv hadoop-x.y.z.tar.gz /opt && cd /opt
sudo tar xvzf hadoop-x.y.z.tar.gz
sudo mv hadoop-x.y.z hadoop
sudo chown pi:pi /opt/hadoop -R
Alternatively, you can copy your already configured installation to each node instead (skipping steps like sudo mv hadoop-x.y.z hadoop), but first tar the directory:
tar czvf my-hadoop.tar.gz /opt/hadoop
Then sync the configuration files to the other nodes:

for node in pi2 pi3 pi4; do
scp $HADOOP_HOME/etc/hadoop/* $node:/opt/hadoop/etc/hadoop/;
done
sudo mkdir -p /opt/hadoop_tmp/hdfs
sudo chown -R pi:pi /opt/hadoop_tmp
Note this is the local directory where HDFS data will actually be held.
On the cluster one node will be the master and all others workers. The master node runs the NameNode, Secondary NameNode and ResourceManager daemons. Each worker runs the DataNode and NodeManager daemons.
On the master node, add the file $HADOOP_HOME/etc/hadoop/workers
and list the workers in it. For example, if you have 4 nodes and nodes 2-4 are the workers, the file on pi1 contains:
pi2
pi3
pi4
Format the namenode and start up the cluster:
hdfs namenode -format
start-all.sh
start-all.sh
will start daemons across the cluster. Check it worked by running jps
on each node and ensuring the daemons discussed above are running.
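A quick way to check from the master (using the ssh aliases set up earlier, and assuming jps is on the default PATH of each node):

for node in pi1 pi2 pi3 pi4; do echo $node; ssh $node jps; done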
One error you may see when running jobs with the above configuration is Error: Java heap space. The fix is to add the following to mapred-site.xml:
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx2048m</value>
</property>
Xmx is a java option that sets the max Java heap size. It’s an absolute value, so it will depend on available RAM. Above I’m giving 2GB of heap space.
See the pseudo-distributed mode word count instructions.
Let’s get some more data onto this cluster!
Hadoop comes with some useful tools for benchmarking how your cluster is performing. You can generate large amounts of data with the teragen program. For example, to store 4GB of random data in the directory random-data on HDFS run
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-x.y.z.jar teragen 40000000 random-data
The first parameter is the number of rows to write. Each row contains 100 bytes of data, so 40,000,000 rows is roughly 4GB.
Now let’s sort the 4GB of data:
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-x.y.z.jar terasort random-data sorted-data
While this is running you can log on to the web UI and track the progress, see which nodes tasks are running on, etc.
There’s also a built-in program to check the sort was successful. Any errors will be shown in the report/part-r-00000 file:
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-x.y.z.jar teravalidate sorted-data report
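To inspect the report:

hadoop fs -cat report/part-r-00000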
Run tree /opt/hadoop_tmp. You’ll see that there are a number of edit log files but only one with “inprogress” in its name. These are all the edits since the last checkpoint took place.

Run yarn --help, hdfs --help and hadoop --help and explore a little. Some commands I’ve found useful:
yarn application -list # List running apps. If you want to see finished apps, use "-appStates ALL"
hdfs datanode # Start the datanode. This is useful when the datanode won't start when executing the helper scripts as you can conveniently see the startup log
hdfs namenode # Start the namenode.
hdfs dfsadmin -safemode get # Check if the cluster is in safemode
hdfs fsck / # Check the health of the file system
hdfs fsck /user/pi/input/input.txt -files -blocks # Show the blocks that input.txt is stored in
hdfs dfsadmin -report # Display the current status of the file system
yarn logs -applicationId <applicationId> # Show the yarn logs for the given application id (only works if log aggregation is enabled - see above point for details).
export HADOOP_CONF_DIR=xxx # Great for switching between configurations (e.g. you want to have a pseudo-distributed conf and a cluster conf on one machine)
hdfs dfs -setrep -w <REPLICATION_FACTOR> <PATH> # Change the replication factor of an existing file
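A couple more that are handy for poking at the NameNode’s fsimage and edit log files mentioned above (the file names below are hypothetical; use the ones you actually find under the namenode current directory):

hdfs oiv -p XML -i /opt/hadoop_tmp/hdfs/namenode/current/fsimage_0000000000000000000 -o fsimage.xml # Dump the fsimage to XML
hdfs oev -i /opt/hadoop_tmp/hdfs/namenode/current/edits_inprogress_0000000000000000001 -o edits.xml # Dump an edit log to XML (the default output format)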