Pairwise Document Similarity in Large Collections with MapReduce

Pairwise Document Similarity in Large Collections with MapReduce is a short paper by Tamer M. Elsayed, a Ph.D. student at UMD, and his advisor, published at ACL-08: HLT. It uses MapReduce to compute pairwise similarity over a large document collection. If the paper interests you, please follow the link above; I will not go over the details here.
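
In brief, the paper's trick is that an inner-product similarity decomposes term by term: one job builds a postings list for every term, and a second job lets each term contribute to every pair of documents that share it. Using raw term counts as the weights, as the verification code below does (the paper generalizes this to arbitrary term weights):

    \mathrm{sim}(d_i, d_j) = \sum_{t} c(t, d_i) \cdot c(t, d_j)

where c(t, d) is the number of occurrences of term t in document d.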

The verification program I wrote below depends on "Cloud9 - A MapReduce Library for Hadoop". Cloud9 is developed at UMD, primarily as a teaching tool for courses and for text-processing research. It is released under the Apache License, so you can check it out directly with Subversion. The example below mainly uses "PairOfIntString" and "ArrayListWritable".
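
As a quick orientation, here is a minimal sketch of how these two types are used, inferred from the code in this post; the printed form in the last comment is an assumption, based on the regular expression job2 uses to parse it back in:

import edu.umd.cloud9.io.ArrayListWritable;
import edu.umd.cloud9.io.PairOfIntString;

public class Cloud9TypesDemo
{
    public static void main(String[] args)
    {
        // A (count, docid) posting; job1's mapper emits one per token.
        PairOfIntString posting = new PairOfIntString(1, "doc1");
        System.out.println(posting.getLeftElement());   // 1
        System.out.println(posting.getRightElement());  // doc1

        // A postings list; job1's reducer emits one per term. With
        // TextOutputFormat it should print as something like
        // "[(1, doc1)]", which is what Map2's regular expression
        // parses back in.
        ArrayListWritable<PairOfIntString> postings =
                new ArrayListWritable<PairOfIntString>();
        postings.add(posting);
        System.out.println(postings);
    }
}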

Pairwise Document Similarity

In the Map phase of the "Pairwise Similarity" job I parse job1's output purely with a regular expression. This is not the best way to do it (I admit I was lazy); the better approach would be to write a dedicated "OutputFormat" and "Writable" (such as the Tuple classes Cloud9 provides), which would improve overall efficiency considerably.
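
For reference, a minimal sketch of that improvement (untested here): keep the Writable values in binary between the two jobs via the SequenceFile formats, so job2 never parses text at all. In the run() method below this would amount to:

// job1: write (Text, ArrayListWritable) records in binary instead of text.
conf.setOutputFormat(SequenceFileOutputFormat.class);

// job2: read those records back directly; Map2 would then implement
// Mapper<Text, ArrayListWritable<PairOfIntString>, Text, IntWritable>
// and iterate over the list instead of matching a regular expression.
conf2.setInputFormat(SequenceFileInputFormat.class);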

Since this example involves two rounds of MapReduce, I use "job2.addDependingJob(job1);" to make the two jobs dependent: JobControl will only start job2 after job1 has finished.

import java.io.IOException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import edu.umd.cloud9.io.ArrayListWritable;
import edu.umd.cloud9.io.PairOfIntString;

public class PairwiseDS extends Configured implements Tool
{

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, PairOfIntString>
    {
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, PairOfIntString> output, Reporter reporter)
                throws IOException
        {
            // Use the input file's name, minus its 4-character extension
            // (e.g. ".txt"), as the document id.
            FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            fileName = fileName.substring(0, fileName.length() - 4);

            // Emit (term, (1, docid)) for every token in the line.
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens())
            {
                word.set(tokenizer.nextToken());
                output.collect(word, new PairOfIntString(1, fileName));
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, PairOfIntString, Text, ArrayListWritable<PairOfIntString>>
    {
        public void reduce(Text key, Iterator<PairOfIntString> values,
                OutputCollector<Text, ArrayListWritable<PairOfIntString>> output,
                Reporter reporter) throws IOException
        {
            // Accumulate this term's count for each document.
            HashMap<String, Integer> map = new HashMap<String, Integer>();
            while (values.hasNext())
            {
                PairOfIntString psi = values.next();
                String doc = psi.getRightElement();
                Integer count = map.get(doc);
                map.put(doc, count == null ? psi.getLeftElement()
                        : count + psi.getLeftElement());
            }

            // Emit the term's postings list: [(count, docid), ...].
            ArrayList<PairOfIntString> al = new ArrayList<PairOfIntString>();
            for (java.util.Map.Entry<String, Integer> e : map.entrySet())
            {
                al.add(new PairOfIntString(e.getValue(), e.getKey()));
            }
            output.collect(key, new ArrayListWritable<PairOfIntString>(al));
        }
    }

    public static class Map2 extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable>
    {
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException
        {
            // Each input line is one term's postings list as written by
            // job1's TextOutputFormat, e.g. "term\t[(2, doc1), (1, doc2)]".
            String line = value.toString().trim();

            ArrayList<String> keyList = new ArrayList<String>();
            ArrayList<Integer> valList = new ArrayList<Integer>();

            // Pull every "(count, docid)" pair out of the postings list.
            // Note the docid pattern assumes lower-case alphanumeric
            // document names.
            String p = "\\(([0-9]+), ([a-z0-9.]+)\\)";
            Pattern r = Pattern.compile(p);
            Matcher m = r.matcher(line);
            while (m.find())
            {
                keyList.add(m.group(2));                  // docid
                valList.add(Integer.valueOf(m.group(1))); // count
            }

            // For every pair of documents sharing this term, emit the
            // term's contribution to their inner-product similarity.
            if (keyList.size() > 1)
            {
                String[] key_arr = keyList.toArray(new String[0]);
                Integer[] val_arr = valList.toArray(new Integer[0]);
                int klen = key_arr.length;
                for (int i = 0; i < klen; i++)
                {
                    for (int j = i + 1; j < klen; j++)
                    {
                        word.set(key_arr[i] + "," + key_arr[j]);
                        output.collect(word, new IntWritable(val_arr[i]
                                * val_arr[j]));
                    }
                }
            }

        }
    }

    public static class Reduce2 extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException
        {
            // Sum the per-term contributions; the total is the
            // inner-product similarity of this document pair.
            int sum = 0;
            while (values.hasNext())
            {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public int run(String[] args) throws Exception
    {
        // ===================== Indexing =====================
        JobConf conf = new JobConf(getConf(), PairwiseDS.class);
        conf.setJobName("Indexing");
        conf.setOutputKeyClass(Text.class);
        // The mapper emits PairOfIntString values, but the reducer emits
        // ArrayListWritable postings lists, so declare both explicitly.
        conf.setMapOutputValueClass(PairOfIntString.class);
        conf.setOutputValueClass(ArrayListWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        conf.setNumReduceTasks(1);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        Job job1 = new Job(conf);
        // ===================== Pairwise Similarity =====================
        JobConf conf2 = new JobConf(getConf(), PairwiseDS.class);
        conf2.setJobName("Pairwise Similarity");
        conf2.setOutputKeyClass(Text.class);
        conf2.setOutputValueClass(IntWritable.class);

        conf2.setMapperClass(Map2.class);
        conf2.setReducerClass(Reduce2.class);

        conf2.setInputFormat(TextInputFormat.class);
        conf2.setOutputFormat(TextOutputFormat.class);
        conf2.setNumReduceTasks(1);

        // Read only job1's part-* output files under args[1].
        FileInputFormat.setInputPaths(conf2, new Path(args[1] + "/p*"));
        FileOutputFormat.setOutputPath(conf2, new Path(args[2]));
        Job job2 = new Job(conf2);

        // job2 depends on job1: JobControl starts job2 only after job1
        // has completed successfully.
        job2.addDependingJob(job1);
        JobControl controller = new JobControl("Pairwise Document Similarity");
        controller.addJob(job1);
        controller.addJob(job2);
        new Thread(controller).start();

        while (!controller.allFinished())
        {
            System.out.println("Jobs in waiting state: "+ controller.getWaitingJobs().size());
            System.out.println("Jobs in ready state: "+ controller.getReadyJobs().size());
            System.out.println("Jobs in running state: "+ controller.getRunningJobs().size());
            System.out.println("Jobs in success state: "+ controller.getSuccessfulJobs().size());
            System.out.println("Jobs in failed state: "+ controller.getFailedJobs().size());
            System.out.println();

            try
            {
                Thread.sleep(20000);
            } catch (Exception e)
            {
                e.printStackTrace();
            }
        }
        // Shut down the JobControl thread and report failure if any job failed.
        controller.stop();
        return controller.getFailedJobs().size() == 0 ? 0 : 1;

    }

    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new Configuration(), new PairwiseDS(), args);
        System.exit(res);
    }
}
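
To run the two-job pipeline end to end, pass the input directory, the index output directory, and the similarity output directory as the three arguments (a hypothetical invocation; the jar name is a placeholder):

hadoop jar pairwiseds.jar PairwiseDS input index similarity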

after Indexing:
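
For example, given two toy input files doc1.txt containing "hadoop hadoop mapreduce" and doc2.txt containing "hadoop", job1's text output would look roughly like this, one postings list per term:

hadoop	[(2, doc1), (1, doc2)]
mapreduce	[(1, doc1)]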

after Pairwise Similarity:
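
For the same toy inputs, doc1 and doc2 share only the term "hadoop", with counts 2 and 1, so job2 would output:

doc1,doc2	2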

Related resources

Hadoop常用SDK系列四 JobControl

2009-01-05 22:30:04

4 comments on "Pairwise Document Similarity in Large Collections with MapReduce"

  1. CJ says:

    About Cloud9

    Following this post, I tried String2IntOpenHashMapWritable,
    another Cloud9 class:
    import edu.umd.cloud9.io.fastuil.String2IntOpenHashMapWritable;

    However, I got: Error: java.lang.ClassNotFoundException: it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap

    Checking the API reference, the class hierarchy is:
    java.lang.Object
    it.unimi.dsi.fastutil.objects.AbstractObject2IntFunction<K>
    it.unimi.dsi.fastutil.objects.AbstractObject2IntMap<K>
    it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap<K>
    edu.umd.cloud9.io.fastuil.Object2IntOpenHashMapWritable<K>

    So I downloaded http://dsiutils.dsi.unimi.it/
    but it still did not work. Any suggestions?
    Should I not have put dsiutils.jar under Hadoop_HOME/lib?

    2011-08-26 21:11:46

  2. Shen says:

    Check whether dsiutils.jar actually contains the class it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap. Also, if the jar is placed under Hadoop_HOME/lib, every one of your DataNodes needs its own copy of that jar as well. Check those two things!

    2011-08-26 22:52:29

  3. CJ says:

    Thank you
    After fiddling with this for quite a while, I found three things I had overlooked:
    1. I never restarted Hadoop, so the jar was not loaded.
    2. The jar had not been copied to hadoop/lib on the other DataNodes.

    3. This one is trickier: dsiutils-1.0.12 really does not contain fastutil,
    so I will have to find an older version to use.

    2011-08-27 11:54:45

  4. CJ says:

    Solution to 3:
    Download http://fastutil.dsi.unimi.it/ instead.

    That one does have the fastutil.object package, though I have not gotten 1.2.2 working yet.

    Switching back to 1.1.0 works!!! I still do not know whether I need to add fastutil.jar myself when compiling Cloud9.

    2011-08-27 16:36:26


Copyright (C) Ching-Shen Chen. All rights reserved.
