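When two elephants join forces... Honestly, I doubt this title gives anyone much of a clue about what it means (both Hadoop and PostgreSQL use an elephant as their mascot). And since Google's search engine gives a page's "title" element fairly heavy weight... well, this article probably won't be easy to find by searching. Who would guess it's an article about combining Hadoop with PostgreSQL? Still, I wanted to go with it anyway...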
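In the past, using database data as MapReduce input or output meant handling the conversion yourself. This article looks at using the database directly as the input and output of a MapReduce job, which Hadoop has supported since version 0.19 (currently 0.19.1) through "MapReduce for MySQL" (HADOOP-2536). Despite the name, it is built on plain JDBC, so it works with PostgreSQL as well. Below is a simple test example, starting with the "wordcount" table I created: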
CREATE TABLE wordcount
(
id serial NOT NULL,
word character varying(20) NOT NULL,
count integer NOT NULL DEFAULT 1,
CONSTRAINT wc_id PRIMARY KEY (id)
)
WITH (OIDS=FALSE);
ALTER TABLE wordcount OWNER TO postgres;
The table's initial contents are as follows:
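The basic approach: first use DBConfiguration to set up the database configuration, then let DBInputFormat and DBOutputFormat handle input from and output to the corresponding table. You also write a class that implements the DBWritable interface to act as the bridge for reading from and writing to the database; in this example that is the "WordRecord" class (see the attached source for details).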
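The WordRecord class itself is only in the attached source, so here is a hypothetical reconstruction of what it might look like. It assumes the class simply holds the word and count columns as public fields (the mapper reads value.word and value.count) and offers a (String, int) constructor for the reducer plus a no-argument constructor, since DBInputFormat creates value objects by reflection. So that the snippet compiles with the JDK alone, the Hadoop interfaces are left out of the declaration; in the real job you would declare `implements Writable, DBWritable` (org.apache.hadoop.io.Writable and org.apache.hadoop.mapred.lib.db.DBWritable), which declare exactly these four methods. Using writeUTF/readUTF for the Writable half is my own simplification.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical sketch of WordRecord; the real class is in the attached source.
// In the actual job, declare it as: class WordRecord implements Writable, DBWritable
class WordRecord {
    public String word;
    public int count;

    // No-arg constructor: DBInputFormat instantiates the value class by reflection.
    public WordRecord() {
    }

    // Used by WordCountReducer to build an output row.
    public WordRecord(String word, int count) {
        this.word = word;
        this.count = count;
    }

    // DBWritable half: fill the "?" placeholders of the INSERT, in the same
    // order as the field names given to setOutput ("word", "count").
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, word);
        statement.setInt(2, count);
    }

    // DBWritable half: read one row of the SELECT, whose columns follow the
    // field names given to setInput ("word", "count").
    public void readFields(ResultSet resultSet) throws SQLException {
        word = resultSet.getString(1);
        count = resultSet.getInt(2);
    }

    // Writable half: binary serialization (writeUTF is a simplification here).
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readInt();
    }

    @Override
    public String toString() {
        return word + "\t" + count;
    }
}
```

The only contract that really matters is the column order: both JDBC methods address columns by position, so they must match the fields array passed to setInput and setOutput.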
P.S. Put a copy of the JDBC driver under HADOOP_HOME/lib; the jar you run the job from must also bundle this driver.
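If the driver class cannot be found, the job cannot open its JDBC connection, so it can be worth checking the classpath first. The helper below is a hypothetical addition, not part of the original code: it only asks the class loader whether a class such as org.postgresql.Driver is visible, without initializing it.

```java
// Hypothetical helper: checks whether a JDBC driver class is visible on the classpath.
final class DriverCheck {
    private DriverCheck() {
    }

    static boolean isOnClasspath(String driverClassName) {
        try {
            // initialize = false: we only want to know whether the class exists.
            Class.forName(driverClassName, false, DriverCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String driver = args.length > 0 ? args[0] : "org.postgresql.Driver";
        System.out.println(driver + (isOnClasspath(driver) ? " found" : " NOT found"));
    }
}
```

Run it with the same classpath as the job jar; if it reports "NOT found", the driver has not been packaged.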
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool
{
    public int run(String[] args) throws Exception
    {
        JobConf job = new JobConf(getConf(), WordCount.class);
        job.setJobName("DBWordCount");
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // JDBC driver class, connection URL, user name and password (replace the placeholders)
        DBConfiguration.configureDB(job, "org.postgresql.Driver",
                "jdbc:postgresql://localhost/WordCount", "username", "password");

        // Input: columns word and count of table wordcount, no WHERE condition, ordered by id
        String[] fields = { "word", "count" };
        DBInputFormat.setInput(job, WordRecord.class, "wordcount", null, "id", fields);
        // Output: insert into the same table, naming the columns so the serial id is left out
        DBOutputFormat.setOutput(job, "wordcount(word,count)", fields);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(WordRecord.class);
        job.setOutputValueClass(NullWritable.class);

        JobClient.runJob(job);
        return 0;
    }

    // Each table row arrives as a WordRecord (the LongWritable key is the record position);
    // emit (word, count)
    static class WordCountMapper extends MapReduceBase implements
            Mapper<LongWritable, WordRecord, Text, IntWritable>
    {
        public void map(LongWritable key, WordRecord value,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
        {
            output.collect(new Text(value.word), new IntWritable(value.count));
        }
    }

    // Sum the counts for each word and emit one WordRecord per word;
    // DBOutputFormat writes the key as a table row and ignores the NullWritable value
    static class WordCountReducer extends MapReduceBase implements
            Reducer<Text, IntWritable, WordRecord, NullWritable>
    {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<WordRecord, NullWritable> output,
                Reporter reporter) throws IOException
        {
            int sum = 0;
            while (values.hasNext())
            {
                sum += values.next().get();
            }
            output.collect(new WordRecord(key.toString(), sum), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception
    {
        int ret = ToolRunner.run(new WordCount(), args);
        System.exit(ret);
    }
}
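To make the data flow concrete, here is a plain-Java simulation of what the job computes, with no Hadoop involved: each (word, count) row becomes a key/value pair, a TreeMap stands in for the sorted shuffle that groups pairs by word, and each group is summed as in WordCountReducer. The class and method names are my own. Keep in mind that DBOutputFormat only issues INSERTs, so the summed rows are added to the wordcount table next to the rows that were read; the original rows are neither updated nor deleted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the DBWordCount data flow (hypothetical names, no Hadoop).
final class LocalWordCount {
    private LocalWordCount() {
    }

    // One row of the wordcount table, limited to the two columns the job reads.
    static final class Row {
        final String word;
        final int count;

        Row(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    static List<Row> run(List<Row> input) {
        // "Map" + "shuffle": emit (word, count) and group the values by word, sorted by key.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Row row : input) {
            grouped.computeIfAbsent(row.word, k -> new ArrayList<>()).add(row.count);
        }
        // "Reduce": sum the counts of each word.
        List<Row> output = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int c : entry.getValue()) {
                sum += c;
            }
            output.add(new Row(entry.getKey(), sum));
        }
        return output;
    }
}
```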
Result (written directly back into the wordcount table):
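For the internal details, look at DBInputFormat and DBOutputFormat. You will find that the getSelectQuery() method in DBInputFormat builds its SQL by chaining SELECT ... ORDER BY with LIMIT and OFFSET, so databases that lack this syntax, such as Oracle, are not supported yet. DBOutputFormat, in turn, simply issues INSERT INTO tablename VALUES (...), with one value per output field. In this example the table has a serial column (id) as its primary key, but my WordRecord never touches that ID. With the bare table name, the statement would be INSERT INTO wordcount VALUES (?, ?), and PostgreSQL would assign the two values to the first two columns in table order (id and word), so the insert would fail. That is why I explicitly pass the table name to setOutput as "wordcount(word,count)": the column list becomes part of the statement, and id falls back to its serial default.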
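The two SQL shapes described above can be sketched in a few lines. This is a simplified re-creation based on this article's description, not the Hadoop source (details such as table aliases are omitted): the select builder chains ORDER BY, LIMIT and OFFSET as getSelectQuery() does for each input split, and the insert builder places the table string verbatim in front of VALUES, which is why "wordcount(word,count)" turns into a column list. The class and method names are hypothetical.

```java
// Simplified re-creation of the SQL issued by DBInputFormat / DBOutputFormat (Hadoop 0.19),
// following the behaviour described in the article; not the actual Hadoop source.
final class DbSqlSketch {
    private DbSqlSketch() {
    }

    // One input split reads rows [start, start + length) of the ordered table.
    static String selectQuery(String table, String conditions, String orderBy,
                              String[] fields, long start, long length) {
        StringBuilder sql = new StringBuilder("SELECT ");
        sql.append(String.join(", ", fields));
        sql.append(" FROM ").append(table);
        if (conditions != null && !conditions.isEmpty()) {
            sql.append(" WHERE (").append(conditions).append(")");
        }
        if (orderBy != null && !orderBy.isEmpty()) {
            sql.append(" ORDER BY ").append(orderBy);
        }
        // LIMIT/OFFSET paging is what excludes databases without this syntax (e.g. Oracle).
        sql.append(" LIMIT ").append(length).append(" OFFSET ").append(start);
        return sql.toString();
    }

    // The table string is used verbatim, so "wordcount(word,count)" becomes a column list.
    static String insertQuery(String table, int fieldCount) {
        StringBuilder sql = new StringBuilder("INSERT INTO ").append(table).append(" VALUES (");
        for (int i = 0; i < fieldCount; i++) {
            sql.append(i == 0 ? "?" : ",?");
        }
        return sql.append(")").toString();
    }
}
```

For example, selectQuery("wordcount", null, "id", new String[] {"word", "count"}, 0, 5) produces SELECT word, count FROM wordcount ORDER BY id LIMIT 5 OFFSET 0, and insertQuery("wordcount(word,count)", 2) produces INSERT INTO wordcount(word,count) VALUES (?,?).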
.Source files
References
.Database Access with Hadoop
.DBInputFormat (Hadoop 0.19.1 API)