Hadoop Streams

The Hadoop streaming utility allows running any executable as a mapper on a Map/Reduce cluster.

One of the typical uses is to run a program with 0 reducers, so that the input is split into partitions, processed in parallel, and then recombined by simply concatenating the output.

In this solution the mapper program does not handle (key, value) pairs directly.

The standard approach is to use keys with empty values: the mapper program deals only with the key, which it receives on its standard input.

In order to split the input one uses a suitable InputFormat that outputs (file partition, empty) pairs. Besides the various predefined formats, e.g. TextInputFormat, which splits a file at newlines, for text processing applications one can write one's own format.

With reducers, the resulting output would be sorted according to the key, which is the input itself; with no reducers, you just get one file per processed split, with no information about its position in the original file. Hence the results end up shuffled, concatenated in an unpredictable order (it is not known where the output of each partition ends up in the result).

Our solution to this problem is a "hand-made" streaming that generates and handles suitable <key>/<value> pairs behind the scenes:

  • the next() method of our InputFormat outputs <offset in file>/<text to be processed> pairs;
  • the Hadoop Mapper runs the mapper program via exec(), passes the text partition to it, and grabs the tool's stdout and stderr with two threads; the processed text is then paired with the original <key>;
  • the Reduce phase uses a single reducer, which sorts the pairs by <key> and produces a single output file;
  • our specialized OutputFormat drops the <key> and outputs just the <value>.

This solution allows us to process a file with Hadoop streaming, using a mapper written in any language, while preserving the original order in the output.
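
As an illustration, here is a minimal sketch in Java of the core of such a Mapper. It is not the actual Tanl source, and all class and method names are hypothetical: it launches the external tool with exec(), feeds it the text partition on stdin, and drains stdout and stderr in two threads so that the child process never blocks on a full pipe buffer.

import java.io.*;
import org.apache.hadoop.io.Text;

// Hypothetical sketch of the exec-based map step, not the actual Tanl source.
public class ExecMapperSketch {

    // Runs `cmd`, feeds it `value` on stdin, and returns its stdout, so that
    // the caller can emit (originalKey, processedText).
    static Text runTool(String cmd, Text value) throws Exception {
        Process p = Runtime.getRuntime().exec(cmd);

        final StringBuilder out = new StringBuilder();
        final BufferedReader stdout =
                new BufferedReader(new InputStreamReader(p.getInputStream()));
        final BufferedReader stderr =
                new BufferedReader(new InputStreamReader(p.getErrorStream()));

        // Two threads grab the tool's stdout and stderr, as described above.
        Thread outDrain = new Thread() {
            public void run() {
                try {
                    String line;
                    while ((line = stdout.readLine()) != null)
                        out.append(line).append('\n');
                } catch (IOException ignored) {}
            }
        };
        Thread errDrain = new Thread() {
            public void run() {
                try {
                    String line;
                    while ((line = stderr.readLine()) != null)
                        System.err.println(line); // forward tool diagnostics
                } catch (IOException ignored) {}
            }
        };
        outDrain.start();
        errDrain.start();

        // Feed the text partition to the tool, then close stdin to signal EOF.
        Writer stdin = new OutputStreamWriter(p.getOutputStream());
        stdin.write(value.toString());
        stdin.close();

        outDrain.join();
        errDrain.join();
        p.waitFor();
        return new Text(out.toString());
    }
}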


Build

Here follow the steps needed to build the Tanl package and create a jar file.


  • Download the Tanl package to your machine:
statuz@dhcp-131-114-3-113 ~ > \
wget http://medialab.di.unipi.it/wiki/images/e/e7/TanlStreaming-1.0-all.tgz
--2009-03-16 12:01:24--  http://medialab.di.unipi.it/wiki/images/e/e7/TanlStreaming-1.0-all.tgz
Resolving medialab.di.unipi.it... 131.114.3.241
Connecting to medialab.di.unipi.it|131.114.3.241|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7721 (7.5K) [application/x-gzip]
Saving to: "TanlStreaming-1.0-all.tgz"
100%[===============================================================>] 7721 --.-K/s   in 0.001s
2009-03-16 12:01:24 (7.12 MB/s) - "TanlStreaming-1.0-all.tgz" saved [7721/7721]
  • Extract archive:
statuz@dhcp-131-114-3-113 ~ > tar xzf TanlStreaming-1.0-all.tgz
  • Create a directory for classes:
statuz@dhcp-131-114-3-113 ~ > cd src/
statuz@dhcp-131-114-3-113 ~/src > mkdir build
  • Build package:
statuz@dhcp-131-114-3-113 ~/src > javac -classpath \
$HADOOP_HOME/hadoop-0.17.2.1-core.jar:$HADOOP_HOME/lib/commons-logging-api-1.0.4.jar \
-d build/ org/medialab/tanl/*java
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: org/medialab/tanl/MainRunner.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
  • Create jar file:
statuz@dhcp-131-114-3-113 ~/src > jar -cf runner.jar -C build/ .

And there you have it:

statuz@dhcp-131-114-3-113 ~/src > ls -l
total 48
drwx------  3 statuz  staff    102 16 Mar 12:07 build
drwx------@ 3 statuz  staff    102  3 Mar 14:27 org
-rw-------  1 statuz  staff  21460 16 Mar 12:23 runner.jar


Usage

In general terms, you can invoke Tanl's streaming as follows:

$HADOOP_HOME/bin/hadoop jar runner.jar org.medialab.tanl.MainRunner \
-Dtanl.cmdToRun="script to execute" \
-Dtanl.inputFormat="input format you want to use" \
-Dtanl.multiOutput=true/false \
[inputformat options] \
HDFS_input_dir HDFS_output_dir

Where HDFS_input_dir is the directory containing the files to be processed and HDFS_output_dir is where the processed output will be placed.
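
For example, a concrete call could look like the following (the script path is made up, and we assume here that the predefined input formats described below live in the org.medialab.tanl package):

$HADOOP_HOME/bin/hadoop jar runner.jar org.medialab.tanl.MainRunner \
-Dtanl.cmdToRun="/home/statuz/bin/tokenize.sh" \
-Dtanl.inputFormat="org.medialab.tanl.LineInputFormat" \
-Dtanl.multiOutput=false \
input output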

You have to specify the absolute path of the program/script you want to execute and an input format that properly handles your input files. In order to be Tanl-compliant, your input format's next() method has to return <LongWritable, Text> pairs.
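
For reference, a skeleton of a Tanl-compliant record reader could look like the sketch below. Only the next() contract comes from the requirement above; everything else, including all names, is an illustrative assumption built on the old org.apache.hadoop.mapred API.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordReader;

// Sketch of a Tanl-compliant reader: next() fills a <LongWritable, Text>
// pair where the key is the record's offset in the file and the value is
// the text partition to hand to the external tool.
public abstract class TanlRecordReaderSketch
        implements RecordReader<LongWritable, Text> {

    protected long pos; // current offset within the input split

    public boolean next(LongWritable key, Text value) throws IOException {
        String record = readNextRecord(); // format-specific, e.g. up to ".\n\n"
        if (record == null)
            return false;                 // end of split
        key.set(pos);                     // key = offset in file
        value.set(record);                // value = text to be processed
        pos += record.length();
        return true;
    }

    // Each concrete format (line-, tag-, dot-delimited, ...) implements this.
    protected abstract String readNextRecord() throws IOException;

    public LongWritable createKey() { return new LongWritable(); }
    public Text createValue()       { return new Text(); }
    public long getPos()            { return pos; }
}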


InputFormats

You can find four predefined InputFormats in the Tanl package:


LineInputFormat: Reads the file line by line.

Available options:

  • tanl.useOptimization [bool]: to reduce the overhead between Java and the native tool, you can make records bigger
  • tanl.expectedBlockSizeMb [int]: maximum record size in MB when using the optimization

e.g.:

... -Dtanl.useOptimization=true -Dtanl.expectedBlockSizeMb=24 ...

splits the text at line endings, with a maximum record size of 24 MB.


TagInputFormat: Records are delimited by given 'tags'.

Available options:

  • tanl.startTag [string]: the opening tag has to start with this
  • tanl.endTag [string]: the closing tag has to start with this
  • tanl.closingString [string]: opening and closing tags have to end with this
  • tanl.excludeTags [bool]: whether the tags are excluded from the value

e.g.:

... -Dtanl.startTag="<doc" -Dtanl.endTag="</doc" -Dtanl.closingString=">" -Dtanl.excludeTags=true ...

splits the text between <doc ... > and </doc>, without including the tags in the text to be processed (e.g. from <doc id="1">some text</doc> the record value is just "some text").


DotInputFormat: Records are separated by a specified line followed by an empty line.

Available options:

  • tanl.separator [string]: the string that has to precede the empty line
  • tanl.useOptimization [bool]: to reduce the overhead between Java and the native tool, you can make records bigger
  • tanl.expectedBlockSizeMb [int]: maximum record size in MB when using the optimization

e.g.:

... -Dtanl.separator="." -Dtanl.useOptimization=true -Dtanl.expectedBlockSizeMb=24 ...

splits the text when it reaches ".\n\n"; the maximum record size is 24 MB.


ConllInputFormat: Records are separated by a line ending with the specified separator followed by an empty line. Spaces in the separator are treated as tabs (e.g. "a b" becomes "a\tb").

Available options:

  • tanl.separator [string]: the string that has to precede the empty line
  • tanl.useOptimization [bool]: to reduce the overhead between Java and the native tool, you can make records bigger
  • tanl.expectedBlockSizeMb [int]: maximum record size in MB when using the optimization

e.g.:

... -Dtanl.separator="a b" -Dtanl.useOptimization=true -Dtanl.expectedBlockSizeMb=24 ...

splits the text when it reaches a line ending with "a\tb" followed by an empty line; the maximum record size is 24 MB.


About reducers

When using more than one reducer, the option tanl.multiOutput must be set to true. With this option, every file output by a reducer is named chunk_<endingOffsetInFile>. With this naming it is easy to concatenate the files while preserving the original order.
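
For instance, once HDFS_output_dir has been copied to the local filesystem, a one-liner along these lines (a sketch, not part of the package) rebuilds the original order by sorting the chunks on their numeric offset suffix before concatenating them:

for f in $(ls chunk_* | sort -t _ -k 2 -n); do cat "$f"; done > result.txt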

Downloads

Version 1.0. All platforms.

Source code (.tgz): http://medialab.di.unipi.it/wiki/images/e/e7/TanlStreaming-1.0-all.tgz

