Image source: www.jauhari.net
Preface
It's been a while since I last wrote a MapReduce program, so I picked a representative example to practice on...
PageRank in MapReduce
The PageRank algorithm was first proposed by Google's two founders, Sergey Brin and Larry Page, in The Anatomy of a Large-Scale Hypertextual Web Search Engine, a paper published at the World-Wide Web Conference in 1998. The algorithm computes the importance of web pages, giving the search engine a basis for ranking its search results. Judging by the number of pages Google had indexed in 1998, a total of 26 million pages (We knew the web was big...), three or four machines were probably enough to finish the computation; by 2000, however, Google had indexed over one billion pages, and at that scale distributed processing with MapReduce becomes a good fit. This article shows how to implement the algorithm in MapReduce. The crux is that PageRank is an iterative algorithm, so we need some way to run it under MapReduce and to decide when to break out of the iteration loop and end the computation.
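For reference, the simplified (non-normalized) form of the formula from the original paper, which the reducer below implements with damping factor d = 0.85, is:

PR(p) = (1 - d) + d * \sum_{q \to p} PR(q) / C(q)

where the sum runs over every page q linking to p, and C(q) is the number of outbound links on q.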
P.S. This example processes everything as plain text; if efficiency matters to you, try rewriting it with a dedicated OutputFormat and Writable implementation.
Google PageRank Example
The example here assumes the entire world has only four web pages: Adobe, Google, MSN, and Yahoo, and each page's PageRank value (PR value for short) starts at 10.
1. Adobe has three outbound links, to Google, MSN, and Yahoo.
2. Google has a single outbound link, to Adobe.
3. MSN has a single outbound link, to Google.
4. Yahoo has two outbound links, to MSN and Google.
The seed input file therefore looks as follows (note the field separators: page<tab>PR<space>comma-separated outbound links):

Adobe 10.00 Google,MSN,Yahoo
Google 10.00 Adobe
MSN 10.00 Google
Yahoo 10.00 MSN,Google
Looking at this example, three pages link to Google, so its PR value should come out highest; Adobe should be next, because Google has the highest score and links only to Adobe, which pushes Adobe's PR value up as well.
PageRank - MapReduce for Hadoop 0.21.x
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PageRank {

    // Counters used for the convergence check: Count tallies the pages,
    // TotalPR accumulates every page's PR value scaled by 1000.
    static enum PageCount { Count, TotalPR }

    public static class PageRankMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            context.getCounter(PageCount.Count).increment(1);
            // Each input line: page<tab>PR<space>comma-separated outbound links
            String[] kv = value.toString().split("\t");
            String _key = kv[0];
            String _value = kv[1];
            String _PRnLink[] = _value.split(" ");
            String pr = _PRnLink[0];
            String link = _PRnLink[1];
            // Re-emit the page's own link list so the reducer can rebuild the line.
            context.write(new Text(_key), new Text(link));
            // Distribute this page's PR evenly across its outbound links.
            String site[] = link.split(",");
            float score = Float.valueOf(pr) / (site.length) * 1.0f;
            for (int i = 0; i < site.length; i++) {
                context.write(new Text(site[i]), new Text(String.valueOf(score)));
            }
        }
    }

    public static class PageRankReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            float factor = 0.85f; // damping factor
            float pr = 0f;
            for (Text f : values) {
                String value = f.toString();
                // Values containing "." are PR contributions; everything else is
                // this page's own link list, carried forward to the output (this
                // works because the site names in this example contain no dots).
                int s = value.indexOf(".");
                if (s != -1) {
                    pr += Float.valueOf(value);
                } else {
                    String site[] = value.split(",");
                    int _len = site.length;
                    for (int k = 0; k < _len; k++) {
                        sb.append(site[k]);
                        sb.append(",");
                    }
                }
            }
            // PR(p) = (1 - d) + d * sum of incoming contributions
            pr = ((1 - factor) + (factor * (pr)));
            context.getCounter(PageCount.TotalPR).increment((int) (pr * 1000));
            String output = pr + " " + sb.toString();
            context.write(key, new Text(output));
        }
    }

    public static void main(String[] args) throws Exception {
        String input;
        String output;
        int threshold = 1000;
        int iteration = 0;
        int iterationLimit = 100; // hard cap on the number of iterations
        boolean status = false;
        while (iteration < iterationLimit) {
            // Alternate the two HDFS directories between iterations.
            if ((iteration % 2) == 0) {
                input = "/pagerank_output/p*";
                output = "/pagerank_output2/";
            } else {
                input = "/pagerank_output2/p*";
                output = "/pagerank_output/";
            }
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            // Remove the previous round's output so the job can write it afresh.
            fs.delete(new Path(output), true);
            Job job = Job.getInstance(new Cluster(conf));
            job.setJobName("PageRank");
            job.setJarByClass(PageRank.class);
            job.setMapperClass(PageRankMapper.class);
            job.setReducerClass(PageRankReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            TextInputFormat.addInputPath(job, new Path(input));
            TextOutputFormat.setOutputPath(job, new Path(output));
            status = job.waitForCompletion(true);
            iteration++;
            long count = job.getCounters().findCounter(PageCount.Count).getValue();
            long total_pr = job.getCounters().findCounter(PageCount.TotalPR).getValue();
            System.out.println("PageCount:" + count);
            System.out.println("TotalPR:" + total_pr);
            double per_pr = total_pr / (count * 1.0d);
            System.out.println("Per PR:" + per_pr);
            // Stop once the average PR per page settles at 1.000 (x1000 scaling).
            if ((int) per_pr == threshold) {
                System.out.println("Iteration:" + iteration);
                break;
            }
        }
        System.exit(status ? 0 : 1);
    }
}
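A note on running it: main() hardcodes the HDFS paths /pagerank_output and /pagerank_output2 and alternates between them, reading /pagerank_output/p* on the first (even-numbered) iteration, so the seed file shown above must be uploaded to /pagerank_output on HDFS before the first run; whether it is one file or several p* files makes no difference, since the input is a glob pattern.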
I won't walk through the Map and Reduce steps of the program above or their intermediate outputs in detail; I'll leave that for interested readers to study on their own~
As for deciding when to break out of the iteration loop and end the computation, I use the following two mechanisms:
1. Run at most 100 iterations, so the program has a hard upper bound on the number of rounds.
2. Accumulate the page count and every page's PR value in counters, and leave the loop once the averages stop changing; the example above tracks this to three decimal places (see the sketch after this list).
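Concretely, the check at the bottom of main() boils down to this (a simplified excerpt of the code above, with the counter semantics spelled out):

// TotalPR accumulates each page's PR scaled by 1000 (three decimal places);
// Count accumulates the number of pages the mappers saw.
double per_pr = total_pr / (count * 1.0d);
// Once the average PR per page settles at exactly 1.000 (i.e. 1000 after
// scaling), the ranks have stopped moving and the loop can end.
if ((int) per_pr == threshold) {   // threshold == 1000
    break;
}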
With these mechanisms in place, the computation converges on the 54th MapReduce iteration, with the following result:
Adobe 1.3334262 Google,MSN,Yahoo,
Google 1.39192 Adobe,
MSN 0.7523096 Google,
Yahoo 0.5279022 MSN,Google,
As expected, Google has the highest PR value, Adobe comes second, and MSN and Yahoo trail behind.
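As a quick sanity check (my own arithmetic, not part of the original run): plugging the converged values back into the formula gives PR(Google) = 0.15 + 0.85 * (1.3334262/3 + 0.7523096/1 + 0.5279022/2) ≈ 1.3916, in line with the reported 1.39192; the small gap is single-precision rounding accumulated over the iterations.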
P.S. I don't hate Yahoo, nor am I especially fond of Google; this was purely an experiment... Orz (I'm actually more of an Adobe fan)
Related Resources
Jimmy Lin and Michael Schatz. Design Patterns for Efficient Graph Algorithms in MapReduce. Proceedings of the 2010 Workshop on Mining and Learning with Graphs (MLG-2010), July 2010, Washington, D.C.
Hi Shen,
I'd like to ask about Job job = Job.getInstance(new Cluster(conf)); from this PageRank in MapReduce post
and Cluster cluster = new Cluster(conf); from the Chaining Maps post.
Given that Hadoop 0.20.2 has no import org.apache.hadoop.mapreduce.Cluster;,
how should these be rewritten using import org.apache.hadoop.mapred.ClusterStatus;?
Please give me some pointers... many thanks.
2011-08-04 16:18:41
Dear zys,
Please refer to my earlier Hadoop articles; the point is simply how to obtain the Job object. I just rewrote everything against Hadoop 0.21.x later on.
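(A minimal sketch of that step under Hadoop 0.20.x, where the org.apache.hadoop.mapreduce.Cluster class does not exist: construct the Job straight from the Configuration instead.)

// Hadoop 0.20.x sketch: same imports as the post, minus Cluster.
Configuration conf = new Configuration();
Job job = new Job(conf, "PageRank");   // Job(Configuration, String) in 0.20.x
job.setJarByClass(PageRank.class);     // the rest of the setup is unchanged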
2011-08-05 00:49:01
Hello:
I'd like to ask about an I/O question.
Do we create a pagerank_output folder containing a single p*.txt whose content is:
Adobe 10.00 Google,MSN,Yahoo
Google 10.00 Adobe
MSN 10.00 Google
Yahoo 10.00 MSN,Google
Or do we put each line in its own file, p1.txt, p2.txt, p3.txt, p4.txt?
And the PATH is the path we uploaded to DFS:
input = "user/hdp1/pagerank_output/p*";
output = "user/hdp1/pagerank_output2/";
input = "user/hdp1/pagerank_output2/p*";
output = "user/hdp1/pagerank_output/";
Running it this way, I can't reproduce the original results. Am I missing some important step? Please advise, thank you.
2011-08-08 15:20:31
Please paste the error message or the relevant Exception~ otherwise I can't tell what your problem is.
2011-08-08 21:19:06
Thanks in advance for pointing me in the right direction, Shen... much appreciated.
/home/912user1/hadoop-0.20.2/bin/hadoop jar "sample-0.1.jar"
"wordcount.WordCount" pagerank_output pagera$nk_output2
11/08/08 14:59:31 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/08/08 14:59:32 INFO input.FileInputFormat: Total input paths to process : 4
11/08/08 14:59:32 INFO mapred.JobClient: Running job: job_201108081332_0058
11/08/08 14:59:33 INFO mapred.JobClient: map 0% reduce 0%
11/08/08 14:59:41 INFO mapred.JobClient: map 25% reduce 0%
11/08/08 14:59:43 INFO mapred.JobClient: Task Id : attempt_201108081332_0058_m_000000_0, Status : FAILED
java.lang.ArrayIndexOutOfBoundsException: 1
at wordcount.WordCount$PageRankMapper.map(WordCount.java:27)
at wordcount.WordCount$PageRankMapper.map(WordCount.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
11/08/08 14:59:44 INFO mapred.JobClient: map 50% reduce 0%
11/08/08 14:59:47 INFO mapred.JobClient: map 75% reduce 0%
11/08/08 14:59:49 INFO mapred.JobClient: Task Id : attempt_201108081332_0058_m_000000_1, Status : FAILED
java.lang.ArrayIndexOutOfBoundsException: 1
at wordcount.WordCount$PageRankMapper.map(WordCount.java:27)
at wordcount.WordCount$PageRankMapper.map(WordCount.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
11/08/08 14:59:50 INFO mapred.JobClient: map 75% reduce 16%
11/08/08 14:59:55 INFO mapred.JobClient: Task Id : attempt_201108081332_0058_m_000000_2, Status : FAILED
java.lang.ArrayIndexOutOfBoundsException: 1
at wordcount.WordCount$PageRankMapper.map(WordCount.java:27)
at wordcount.WordCount$PageRankMapper.map(WordCount.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
11/08/08 14:59:59 INFO mapred.JobClient: map 75% reduce 25%
11/08/08 15:00:04 INFO mapred.JobClient: Job complete: job_201108081332_0058
11/08/08 15:00:04 INFO mapred.JobClient: Counters: 13
11/08/08 15:00:04 INFO mapred.JobClient: Job Counters
11/08/08 15:00:04 INFO mapred.JobClient: Launched reduce tasks=1
11/08/08 15:00:04 INFO mapred.JobClient: Launched map tasks=7
11/08/08 15:00:04 INFO mapred.JobClient: Data-local map tasks=7
11/08/08 15:00:04 INFO mapred.JobClient: Failed map tasks=1
11/08/08 15:00:04 INFO mapred.JobClient: FileSystemCounters
11/08/08 15:00:04 INFO mapred.JobClient: HDFS_BYTES_READ=59
11/08/08 15:00:04 INFO mapred.JobClient: FILE_BYTES_WRITTEN=211
11/08/08 15:00:04 INFO mapred.JobClient: wordcount.WordCount$PageCount
11/08/08 15:00:04 INFO mapred.JobClient: Count=3
11/08/08 15:00:04 INFO mapred.JobClient: Map-Reduce Framework
11/08/08 15:00:04 INFO mapred.JobClient: Combine output records=0
11/08/08 15:00:04 INFO mapred.JobClient: Map input records=3
11/08/08 15:00:04 INFO mapred.JobClient: Spilled Records=7
11/08/08 15:00:04 INFO mapred.JobClient: Map output bytes=83
11/08/08 15:00:04 INFO mapred.JobClient: Combine input records=0
11/08/08 15:00:04 INFO mapred.JobClient: Map output records=7
PageCount:3
TotalPR:0
Per PR:0.0
11/08/08 15:00:04 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input Pattern hdfs://hdp1:9000/user/912user1/pagerank_output2/p* matches 0 files
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:224)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:241)
at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:885)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:779)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
at wordcount.WordCount.main(WordCount.java:109)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
2011-08-09 11:26:06
My guess is that your problem lies in the following format:
Adobe 10.00 Google,MSN,Yahoo
Google 10.00 Adobe
MSN 10.00 Google
Yahoo 10.00 MSN,Google
For example: Adobe<tab>10.00<space>Google,MSN,Yahoo
Inspect the file carefully with vim; the problem should be right there, especially the <tab>!
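For what it's worth, a purely illustrative guard (not in the original code) that makes the mapper skip such malformed lines instead of throwing ArrayIndexOutOfBoundsException:

// Guard the <tab> split: skip a line that has no tab separator
// (e.g. spaces were used instead) rather than crashing the task.
String[] kv = value.toString().split("\t");
if (kv.length < 2) {
    return;   // malformed line: no <tab> between page name and data
}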
2011-08-09 13:07:32
Thank you for the article!
2011-12-24 02:51:42
very nice codes, appreciated
2012-11-02 11:18:44
If there's a dead end (a page that doesn't link out to anything), how should its value be represented?
2017-04-13 22:35:08