Pairwise Document Similarity in Large Collections with MapReduce.這是一篇由UMD的一位博士生Tamer M. Elsayed和他的指導教授所共同發表在「ACL-08: HLT」的短篇論文,主要用「MapReduce」來處理大量文件相似度的計算,如果您對這篇論文有興趣的話,請參考上述論文連結,筆者不再詳述。
下述筆者撰寫的驗證程式需要用到「Cloud9 - A MapReduce Library for Hadoop」,「Cloud9」是由UMD所開發的,主要用來作為課程的教學工具和一些文字處理方面的研究,它採用Apache License,所以您可以直接用Subversion checkout下來使用,而下述範例主要用到「PairOfIntString」和「ArrayListWritable」。
Pairwise Document Similarity
在進行「Pairwise Similarity」的「Map」階段時,筆者純粹利用Regular Expression來處理~ 這並不是最佳的處理方式(我承認偷懶~),最佳的方式應該撰寫一些特定的「OutputFormat」和「Writable」來加以處理,整個效率才會大大的提高!(如:Cloud9所提供的Tuple)
由於此範例需要處理二次的MapReduce,所以筆者直接利用「job2.addDependingJob(job1);」將兩個Job產生相依性,也就是先執行job1完成之後JobControl才會去呼叫job2開始執行。
import java.io.IOException; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import edu.umd.cloud9.io.ArrayListWritable; import edu.umd.cloud9.io.PairOfIntString; public class PairwiseDS extends Configured implements Tool { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, PairOfIntString> { private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, PairOfIntString> output, Reporter reporter) throws IOException { FileSplit fileSplit = (FileSplit) reporter.getInputSplit(); String fileName = fileSplit.getPath().getName(); fileName = fileName.substring(0, fileName.length() - 4); String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, new PairOfIntString(1, fileName)); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, PairOfIntString, Text, ArrayListWritable> { public void reduce(Text key, Iterator<PairOfIntString> values, OutputCollector<Text, ArrayListWritable> output, Reporter reporter) throws IOException { ArrayList<PairOfIntString> al = new ArrayList<PairOfIntString>(); HashMap<String, Integer> map = new HashMap<String, Integer>(); while (values.hasNext()) { PairOfIntString psi = values.next(); if (map.containsKey(psi.getRightElement())) { Integer i = (Integer) map.get(psi.getRightElement()); map.put(psi.getRightElement(), i.intValue() + 1); } else { map.put(psi.getRightElement(), psi.getLeftElement()); } } Iterator i = map.entrySet().iterator(); while (i.hasNext()) { java.util.Map.Entry m = (java.util.Map.Entry) i.next(); al.add(new PairOfIntString((Integer) m.getValue(), (String) m .getKey())); } output.collect(key, new ArrayListWritable<PairOfIntString>(al)); } } public static class Map2 extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString().trim(); ArrayList<String> keyList = new ArrayList<String>(); ArrayList<Integer> valList = new ArrayList<Integer>(); String p = "\\(([0-9]+), ([a-z0-9.]+)\\)"; Pattern r = Pattern.compile(p); Matcher m = r.matcher(line); while (m.find()) { String k = m.group(2); String v = m.group(1); keyList.add(k); valList.add(new Integer(v)); } if (keyList.size() > 1) { String[] key_arr = keyList.toArray(new String[0]); Integer[] val_arr = valList.toArray(new Integer[0]); int klen = key_arr.length; for (int i = 0; i < klen; i++) { for (int j = i + 1; j < klen; j++) { word.set(key_arr[i] + "," + key_arr[j]); output.collect(word, new IntWritable(val_arr[i] * val_arr[j])); } } } } } public static class Reduce2 extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public int run(String[] args) throws Exception { // ===================== Indexing ===================== JobConf conf = new JobConf(getConf(), PairwiseDS.class); conf.setJobName("Indexing"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(PairOfIntString.class); conf.setMapperClass(Map.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); conf.setNumReduceTasks(1); FileInputFormat.setInputPaths(conf, new Path(args[0])); TextOutputFormat.setOutputPath(conf, new Path(args[1])); Job job1 = new Job(conf); // ===================== Pairwise Similarity ===================== JobConf conf2 = new JobConf(getConf(), PairwiseDS.class); conf2.setJobName("Pairwise Similarity"); conf2.setOutputKeyClass(Text.class); conf2.setOutputValueClass(IntWritable.class); conf2.setMapperClass(Map2.class); conf2.setReducerClass(Reduce2.class); conf2.setInputFormat(TextInputFormat.class); conf2.setOutputFormat(TextOutputFormat.class); conf2.setNumReduceTasks(1); FileInputFormat.setInputPaths(conf2, new Path(args[1] + "/p*")); TextOutputFormat.setOutputPath(conf2, new Path(args[2])); Job job2 = new Job(conf2); job2.addDependingJob(job1); JobControl controller = new JobControl("Pairwise Document Similarity"); controller.addJob(job1); controller.addJob(job2); new Thread(controller).start(); while (!controller.allFinished()) { System.out.println("Jobs in waiting state: "+ controller.getWaitingJobs().size()); System.out.println("Jobs in ready state: "+ controller.getReadyJobs().size()); System.out.println("Jobs in running state: "+ controller.getRunningJobs().size()); System.out.println("Jobs in success state: "+ controller.getSuccessfulJobs().size()); System.out.println("Jobs in failed state: "+ controller.getFailedJobs().size()); System.out.println(); try { Thread.sleep(20000); } catch (Exception e) { e.printStackTrace(); } } return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new PairwiseDS(), args); System.exit(res); } }
after Indexing:
after Pairwise Similarity:
相關資源
關於Cloud9
參照了版主本篇, 因此去使用 String2IntOpenHashMapWritable
一樣是cloud9的一個class
import edu.umd.cloud9.io.fastuil.String2IntOpenHashMapWritable;
但是, 卻發生了 Error: java.lang.ClassNotFoundException: it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap
去參照了API 手冊後發現
java.lang.Object
it.unimi.dsi.fastutil.objects.AbstractObject2IntFunction<K>
it.unimi.dsi.fastutil.objects.AbstractObject2IntMap<K>
it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap<K>
edu.umd.cloud9.io.fastuil.Object2IntOpenHashMapWritable<K>
因此去下載了 http://dsiutils.dsi.unimi.it/
但還是沒有成功, 不知可有好的建議.
是否我將dsiutils 的 dsiutils.jar 不應該置於 Hadoop_HOME/lib 內呢?
2011-08-26 21:11:46
檢查一下dsiutils.jar是否有包含「it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap 」該class, jar檔如果放在Hadoop_HOME/lib下的話,代表你的每台DataNode也都要有該份jar檔,確認一下吧!
2011-08-26 22:52:29
Thank you
我弄了半天, 我發現有3件事沒注意
1. 我一直沒有重開 hadoop, 所以 jar 沒有載入.
2. 別台的datanode 沒有複製jar檔到 hadoop/lib 下.
3. 這個問題比較麻煩 dsiutils-1.0.12 內真的沒有 fastutil
所以, 我必須去找舊一點的版本來使用了吧
2011-08-27 11:54:45
Solution: 3
改下載 http://fastutil.dsi.unimi.it/
這個就有fastutil.object 這個目錄, 但1.2.2 我還沒試成功就是了.
換回1.1.0 就可以用!!! 我現在也還不知, 是不是我在編譯cloud9時要自己額外加入fastutil.jar
2011-08-27 16:36:26