This article introduces Chaining Maps (HADOOP-3702), a feature available since Hadoop 0.19.0. Before getting into the details, consider the following situations: have you ever packed many processing steps into a single Mapper? Or needed to process each record individually after the Reducer has finished? Simply put, whenever your MapReduce program calls for preprocessing or postprocessing, Chaining Maps is a good fit.
If we denote the execution order of an ordinary sequence of MapReduce jobs as [MR]+, then a job built with Chaining Maps can be expressed as M+RM*: one or more Mappers run first (preprocessing), followed by a single Reducer, and then zero or more Mappers handle the remaining work (postprocessing). This approach brings some extra benefits, such as easier debugging and testing, better maintainability, and more reusable MapReduce code. Below is a simple demonstration built around the ChainMapper and ChainReducer classes; it executes in the order WordSegmentationMap | StopWordMap | Reduce | FilterMap.
Chaining Maps for Hadoop 0.21.x
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class ChainExample
{
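// Stage 1 (preprocessing): split each input line into words and emit (word, 1).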
public static class WordSegmentationMap extends Mapper<LongWritable, Text, Text, IntWritable>
{
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
// Split on runs of whitespace so consecutive spaces do not produce empty tokens.
String[] s = value.toString().split("\\s+");
for(String w : s)
context.write(new Text(w), new IntWritable(1));
}
}
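// Stage 2 (preprocessing): filter out common English stop words before the shuffle.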
public static class StopWordMap extends Mapper<Text, IntWritable, Text, IntWritable>
{
private Set<String> stopwords = new HashSet<String>();
public static final String[] ENGLISH_STOP_WORDS = {
"a", "and", "are", "as", "at", "be", "but", "by",
"for", "if", "in", "into", "is", "it",
"no", "not", "of", "on", "or", "s", "such",
"t", "that", "the", "their", "then", "there", "these",
"they", "this", "to", "was", "will", "with"
};
@Override
protected void setup(Context context) throws IOException, InterruptedException
{
for(String w : ENGLISH_STOP_WORDS)
stopwords.add(w);
}
@Override
public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException
{
if(!stopwords.contains(key.toString()))
context.write(key, value);
}
}
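// Stage 3: a standard word-count reducer that sums the counts for each word.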
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
{
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
// Sum the emitted counts rather than merely counting elements, so the
// logic stays correct even if the values are pre-aggregated upstream.
for(IntWritable i : values)
sum += i.get();
context.write(key, new IntWritable(sum));
}
}
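// Stage 4 (postprocessing): runs after the reducer, keeps only words that occur
// more than 100 times, and swaps key and value so the output is (count, word).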
public static class FilterMap extends Mapper<Text, IntWritable, IntWritable, Text>
{
@Override
public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException
{
if(value.get() > 100)
context.write(value, key);
}
}
public static void main(String[] args) throws Exception
{
String output = "/chainExample_out/";
Configuration conf = new Configuration();
Cluster cluster = new Cluster(conf);
FileSystem fs = FileSystem.get(conf);
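// Remove any previous output directory so the job does not fail on an existing path.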
fs.delete(new Path(output), true);
Job job = Job.getInstance(cluster);
job.setJarByClass(ChainExample.class);
job.setJobName("ChainExample");
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.setInputPaths(job, new Path("/chainExample/*"));
TextOutputFormat.setOutputPath(job, new Path(output));
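// Assemble the chain. Each call declares the input and output key/value
// classes of one stage, and each stage's output types must match the next
// stage's input types. Passing "new Configuration(false)" gives every
// stage its own private, initially empty configuration.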
ChainMapper.addMapper(job, WordSegmentationMap.class, LongWritable.class, Text.class, Text.class, IntWritable.class, new Configuration(false));
ChainMapper.addMapper(job, StopWordMap.class, Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false));
ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false));
ChainReducer.addMapper(job, FilterMap.class, Text.class, IntWritable.class, IntWritable.class, Text.class, new Configuration(false));
job.waitForCompletion(true);
}
}
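A note on the design: the chained Mappers run inside the same task as the stage they belong to, and records are handed from one Mapper to the next in memory, so composing the job this way avoids the extra job-scheduling and disk I/O overhead of running separate MapReduce jobs for preprocessing and postprocessing. To try the example, the input files must already exist in HDFS under /chainExample. A minimal sketch of a run, where the jar name and input file are hypothetical:

hadoop fs -put input.txt /chainExample/input.txt
hadoop jar chainexample.jar ChainExample
hadoop fs -cat /chainExample_out/part-r-00000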