「Average Length of URL?」這是之前寫的一個小範例,由於Hadoop在0.20.0版就新增了全新的API(HADOOP-1230),不過它的文件則一直要到0.21.0版才會補齊(Modify the mapred tutorial documentation to use new mapreduce api.),而本文純粹將原先的版本改寫成新的API版本,如下所示:
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class URLAvgLength { static enum URLCounter { COUNT } public static class URLMapper extends Mapper<Object, Text, Text, IntWritable> { private Text len = new Text("Len"); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { context.write(len, new IntWritable(value.getLength())); context.getCounter(URLCounter.COUNT).increment(1); context.setStatus(value.toString()); } } public static class URLReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { String input = "/user/shen/urllist/*"; String output = "/user/shen/urlavglen"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); fs.delete(new Path(output), true); Job job = new Job(conf, "URLAvgLength"); job.setJarByClass(URLAvgLength.class); job.setMapperClass(URLMapper.class); job.setCombinerClass(URLReducer.class); job.setReducerClass(URLReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); boolean status = job.waitForCompletion(true); Counters cs = job.getCounters(); Counter c = cs.findCounter(URLCounter.COUNT); long count = c.getValue(); InputStream in = fs.open(new Path("hdfs://shen:9000" + output + "/part-r-00000")); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line = br.readLine(); Integer value = Integer.parseInt(line.split("\t")[1]); System.out.println("Avg:" + value / count); System.exit(status ? 0 : 1); } }
上述範例中,「Counter」和「Status」都已經從原先的「Reporter」換成了「Context」來處理,同樣地可以從JobTracker Admin(default:50030)來看執行的狀態。
相關資源
你好,又來問問題
我要自己設定mapreduce map的input key
找到的資料都是說在檔案裡用tab(/t)隔開
tab前為key tab後為value
但我是著去改還是不行
請問你有碰過這種情況嗎?
還是會是Hadoop的版本問題?(我安裝的是0.19.1)
2010-03-27 01:55:26
嗯,Hadoop預設的InputFormat為TextInputFormat,而預設的key為Byte Offset
如果你要處理的資料本身就是用tab區隔Key,Value的話,那可以直接採用KevValueTextInputFormat,它就是用tab區隔Key,Value
2010-03-27 11:46:16