blog.Ring.idv.tw

2009 March

當兩隻大象結合的時候...

當兩隻大象結合的時候... 說實在的,這個標題我想很難讓人聯想到這意含為何.. 加上Goolge搜尋引擎對於網頁中的「title元素」所佔的權重又比較高... 嗯~ 所以本文似乎不容易被搜尋得到... 天曉得這是一篇探討Hadoop結合PostgreSQL的文章.. 不過我還是想這麼做...

以往要將資料庫中的資料抓出來當作MapReduce的輸入/輸出都必須先自行處理這當中的轉換工作,而本文要探討的是直接採用資料庫當作MapReduce的輸入/輸出資料,因為這在Hadoop 0.19版(目前為0.19.1)就納入支援了「MapReduce for MySQL(Hadoop-2536)」,底下是一個簡單的測試範例,下述是筆者自行建立的「wordcount」資料表:

CREATE TABLE wordcount
(
  id serial NOT NULL,
  word character varying(20) NOT NULL,
  count integer NOT NULL DEFAULT 1,
  CONSTRAINT wc_id PRIMARY KEY (id)
)
WITH (OIDS=FALSE);
ALTER TABLE wordcount OWNER TO postgres;

預設的資料內容如下:

基本上就是先透過DBConfiguration去設定資料庫相關的組態工作,然後交由DBInputFormatDBOutputFormat來處理相對應資料表的輸入和輸出,並且撰寫一個實作DBWritable介面的Class,用來作為資料庫讀/寫工作的橋梁,在這個範例中為「WordRecord」Class,詳細請參考附檔。

P.S. 請拷貝一份「JDBC」driver放置在「HADOOP_HOME/lib」底下,另外您執行的jar檔也需要一同打包這個driver。

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool
{

    public int run(String[] arg0) throws Exception
    {
        JobConf job = new JobConf(getConf(), WordCount.class);

        job.setJobName("DBWordCount");

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        DBConfiguration.configureDB(job, "org.postgresql.Driver", "jdbc:postgresql://localhost/WordCount", "帳號", "密碼");

        String[] fields = { "word", "count" };

        DBInputFormat.setInput(job, WordRecord.class, "wordcount", null, "id", fields);
        DBOutputFormat.setOutput(job, "wordcount(word,count)", fields);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(WordRecord.class);
        job.setOutputValueClass(NullWritable.class);

        JobClient.runJob(job);

        return 0;
    }

    static class WordCountMapper extends MapReduceBase implements
            Mapper<LongWritable, WordRecord, Text, IntWritable>
    {

        public void map(LongWritable key, WordRecord value,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
        {
            output.collect(new Text(value.word), new IntWritable(value.count));
        }
    }

    static class WordCountReducer extends MapReduceBase implements
            Reducer<Text, IntWritable, WordRecord, NullWritable>
    {

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<WordRecord, NullWritable> output,
                Reporter reporter) throws IOException
        {
            int sum = 0;
            while (values.hasNext())
            {
                sum += values.next().get();
            }
            output.collect(new WordRecord(key.toString(), sum), NullWritable.get());
        }
    }

    public static void main(String args[]) throws Exception
    {
        int ret = ToolRunner.run(new WordCount(), args);
        System.exit(ret);
    }
}

結果:(直接寫回wordcount資料表)

詳細的內部實作可以參考DBInputFormatDBOutputFormat,會發現DBInputFormat中的「getSelectQuery()」方法裡面用了select... order by、limit、offset去串起來這樣的SQL語法(所以目前尚不支援某些資料庫,如:Oracle),相反的DBOutputFormat當然就是用insert into tablename values(fields name),而在此範例中雖然有一個serial number當作Primary Key(id),不過筆者所撰寫的「WordRecord」並沒有去操作這個ID,所以在「setOutput」的方法中筆者明確地告知資料表名稱為「wordcount(word,count)」,如此在輸出到資料表時才不會出錯。

原始檔

參考資源

Database Access with Hadoop

DBInputFormat (Hadoop 0.19.1 API)

2009-03-15 00:41:51 | Add Comment

當大象遇上PDFBox...

當大象遇上PDFBox... 這個標題看起來蠻有趣的,相反的當「Hadoop + PDFBox」就太正式了.. XD

兩個月前筆者曾po「PDFBox - 擷取PDF檔案中的純文字」,現在一樣請多台幾器一起來做這件事~ 如果沒機會體驗的話~ 看看「Self-service, Prorated Super Computing Fun!」這篇描述NYT在兩年前用Hadoop將1100萬份文章的TIFF影像檔轉成PDF檔案,重點在於只花了一天的時間就搞定了... = =" 而本文要做的就是分散式的將這些PDF檔案擷取出純文字~ 當然會比一台機器快多了~ (不過話說我也是在一台機器上測試...)

P.S. third party library 請記得放在「lib」資料夾一同打包

import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.pdfbox.pdmodel.PDDocument;
import org.pdfbox.util.PDFTextStripper;

public class PDF2TXT extends Configured implements Tool
{

    public static class Map extends MapReduceBase implements
            Mapper<NullWritable, BytesWritable, Text, Text>
    {

        private JobConf conf;

        @Override
        public void configure(JobConf conf)
        {
            this.conf = conf;
        }

        public void map(NullWritable key, BytesWritable value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException
        {
            String filename = conf.get("map.input.file");
            String output_dir = conf.get("output.dir");
            filename = getFileName(filename);

            FileSystem fs = FileSystem.get(conf);
            FSDataOutputStream dos = fs.create(new Path(output_dir + filename + ".txt"));
            PDDocument document = PDDocument.load(new ByteArrayInputStream(value.getBytes()));
            PDFTextStripper stripper = new PDFTextStripper();
            stripper.setStartPage(1);
            stripper.setEndPage(document.getNumberOfPages());
            String s = stripper.getText(document);
            dos.write(s.getBytes("UTF-8"));
            dos.close();
        }

        public String getFileName(String s)
        {
            return s.substring(s.lastIndexOf("/"), s.lastIndexOf("."));
        }
    }

    public int run(String[] args) throws Exception
    {
        JobConf conf = new JobConf(getConf(), PDF2TXT.class);
        conf.set("output.dir", args[1]);

        conf.setJobName("PDF2TXT");
        conf.setMapperClass(Map.class);

        conf.setInputFormat(WholeFileInputFormat.class);
        conf.setOutputFormat(NullOutputFormat.class);

        conf.set("mapred.child.java.opts", "-Xmx256m");
        conf.setNumReduceTasks(0);

        WholeFileInputFormat.setInputPaths(conf, new Path(args[0]));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args)
    {
        try
        {
            int res = ToolRunner.run(new Configuration(), new PDF2TXT(), args);
            System.exit(res);
        } catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

原始檔

2009-03-14 00:26:23 | Add Comment

分散式處理Sobel Edge Detector

.2010/05/24 已新增MapReduce New API版本

大約兩年前我曾用ActionScript寫了「Sobel - 邊緣偵測 for AS2」,那時純粹只是抱持著好玩的心態~ 而現在用同樣的例子改成Hadoop版本來試試~ 當然最主要就是要藉重它分散式運算的能力~ 只是這樣的應用僅需要透過「Map」階段將處理後的影像直接寫入HDFS就行了~ 不需要再經過shuffle和reduce階段來浪費頻寬等資源~ 另外值得一提的是~ 這個例子要處理的是整張影像檔~ 所以要避免在進行「Map」階段之前處於被分割的命運~ 這裡採用的作法是覆寫「isSplitable()」method並將整份檔案當作一筆Record來處理,有興趣的朋友請見附檔:

import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import javax.imageio.ImageIO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.sun.image.codec.jpeg.JPEGCodec;
import com.sun.image.codec.jpeg.JPEGImageEncoder;

public class SobelProcessing extends Configured implements Tool
{

    public static class Map extends MapReduceBase implements
            Mapper<NullWritable, BytesWritable, Text, Text>
    {

        private JobConf conf;

        @Override
        public void configure(JobConf conf)
        {
            this.conf = conf;
        }

        public void map(NullWritable key, BytesWritable value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException
        {
            String filename = conf.get("map.input.file");
            String output_dir = conf.get("output.dir");
            filename = getFileName(filename);
            FileSystem fs = FileSystem.get(conf);
            FSDataOutputStream dos = fs.create(new Path(output_dir + filename + ".jpg"));

            BufferedImage src = ImageIO.read(new ByteArrayInputStream(value.getBytes()));

            float sobscale = Float.valueOf(conf.get("sobscale"));
            int offsetval = Integer.valueOf(conf.get("offsetval"));

            int iw = src.getWidth();
            int ih = src.getHeight();
            BufferedImage dest = new BufferedImage(iw, ih, src.getType());

            int[][] gray = new int[iw][ih];

            for (int x = 0; x < iw; x++)
            {
                for (int y = 0; y < ih; y++)
                {
                    int rgb = src.getRGB(x, y);
                    int r = 0xFF & (rgb >> 16);
                    int g = 0xFF & (rgb >> 8);
                    int b = 0xFF & rgb;
                    gray[x][y] = (int) (0.299 * r + 0.587 * g + 0.114 * b);
                }
            }

            for (int x = 1; x < iw - 1; x++)
            {
                for (int y = 1; y < ih - 1; y++)
                {
                    int a = gray[x - 1][y - 1];
                    int b = gray[x][y - 1];
                    int c = gray[x + 1][y - 1];
                    int d = gray[x - 1][y];
                    int e = gray[x + 1][y];
                    int f = gray[x - 1][y + 1];
                    int g = gray[x][y + 1];
                    int h = gray[x + 1][y + 1];

                    int hor = (a + d + f) - (c + e + h);

                    if (hor < 0)
                        hor = -hor;

                    int vert = (a + b + c) - (f + g + h);

                    if (vert < 0)
                        vert = -vert;

                    int gc = (int) (sobscale * (hor + vert));
                    gc = (gc + offsetval);

                    if (gc > 255)
                        gc = 255;

                    int sobel = 0xff000000 | gc << 16 | gc << 8 | gc;
                    dest.setRGB(x, y, sobel);
                }
            }

            JPEGImageEncoder encoder = JPEGCodec.createJPEGEncoder(dos);
            encoder.encode(dest);
            dos.close();
        }

        public String getFileName(String s)
        {
            return s.substring(s.lastIndexOf("/"), s.lastIndexOf("."));
        }
    }

    public int run(String[] args) throws Exception
    {
        JobConf conf = new JobConf(getConf(), SobelProcessing.class);

        conf.set("sobscale", "1.0");
        conf.set("offsetval", "0");
        conf.set("output.dir", args[1]);

        conf.setJobName("SobelProcessing");
        conf.setMapperClass(Map.class);

        conf.setInputFormat(WholeFileInputFormat.class);
        conf.setOutputFormat(NullOutputFormat.class);

        conf.set("mapred.child.java.opts", "-Xmx256m");
        conf.setNumReduceTasks(0);

        WholeFileInputFormat.setInputPaths(conf, new Path(args[0]));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args)
    {
        try
        {
            int res = ToolRunner.run(new Configuration(), new SobelProcessing(), args);
            System.exit(res);
        } catch (Exception e)
        {
            e.printStackTrace();
        }

    }
}

結果:

原始碼

原始碼(New API)

2009-03-13 23:22:21 | Comments (58)

Copyright (C) Ching-Shen Chen. All rights reserved.

::: 搜尋 :::

::: 分類 :::

::: 最新文章 :::

::: 最新回應 :::

::: 訂閱 :::

Atom feed
Atom Comment