blog.Ring.idv.tw

Hadoop

TFile - A new binary file format for Hadoop

The Hadoop 0.20.1 release introduces a new binary file format named "TFile". Because the block-compression format of the original "SequenceFile" was overly complex, the "TFile" format was designed from scratch; it also offers better performance, extensibility, and language neutrality (meaning you will not find Java package names embedded in the file; see my earlier post "Hadoop - Uncompressed SequenceFile Format 詳解"). For more details, refer to HADOOP-3315.

Basically, the TFile storage format consists of two parts: a Block Compressed File layer (BCFile for short) and a TFile-specific <key,value> management layer (the latter may be gradually extended in the future). A BCFile storage layout in turn consists of five parts:

(1) a 16-byte magic.

(2) a data section that consists of a sequence of Data Blocks.

(3) a meta section that consists of a sequence of Meta Blocks.

(4) a Meta Block index section (“Meta Index”).

(5) a tail section.

Here is a simple example I tested directly:

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.file.tfile.TFile;

public class TFileWriter
{
	private static final String[] DATA = {"One","Two"};

	public static void main(String[] args) throws IOException
	{
		String uri = "hdfs://shen:9000/user/shen/test.tfile";
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		Path path = new Path(uri);
		fs.delete(path, true); // remove any previous run's output

		FSDataOutputStream fdos = fs.create(path, true);

		// 128 KB minimum block size, no compression, no comparator (keys kept in insertion order)
		TFile.Writer writer = new TFile.Writer(fdos, 1024*128, TFile.COMPRESSION_NONE, null, conf);
		
		for (int i = 0; i < DATA.length; i++)
		{		
			writer.append(new byte[]{(byte)i}, DATA[i].getBytes());
		}

		writer.close();
		fdos.close();
	}
}

This example writes only two records, <0,'One'> and <1,'Two'>, and the result is a file in the TFile format, as shown in the figure below:

P.S. In the figure below, I use red → green → blue coloring to mark the five parts that make up the BCFile storage layout.
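To verify the contents, the file can be scanned back with TFile.Reader. The following is a minimal sketch, assuming the Hadoop 0.20.1 TFile scanner API (Reader, Scanner, Scanner.Entry) and reusing the same hdfs://shen:9000 path as the writer above:

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.file.tfile.TFile;

public class TFileReaderExample
{
	public static void main(String[] args) throws IOException
	{
		String uri = "hdfs://shen:9000/user/shen/test.tfile";
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		Path path = new Path(uri);

		// TFile.Reader needs the input stream and the total file length
		FSDataInputStream fdis = fs.open(path);
		TFile.Reader reader = new TFile.Reader(fdis, fs.getFileStatus(path).getLen(), conf);
		TFile.Reader.Scanner scanner = reader.createScanner();

		// walk every <key,value> entry in the file
		while (!scanner.atEnd())
		{
			TFile.Reader.Scanner.Entry entry = scanner.entry();
			byte[] key = new byte[entry.getKeyLength()];
			byte[] value = new byte[entry.getValueLength()];
			entry.getKey(key);
			entry.getValue(value);
			System.out.println(key[0] + " => " + new String(value));
			scanner.advance();
		}

		scanner.close();
		reader.close();
		fdis.close();
	}
}

With the two records written above, this should print 0 => One and 1 => Two.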

2009-10-31 15:57:23 | Add Comment

Sqoop: A database import tool for Hadoop

Sqoop("SQL-to-Hadoop").簡單來說它是用來協助開發人員將資料庫的資料轉換到HDFS的一項工具,不過要到Hadoop 0.21.0版本釋出才會正式的納進來,但如果現在就要使用的話可以直接Patch HADOOP-5815來使用。

Sqoop connects to each database through JDBC, automatically generates an O/R mapping Java file, and finally uses MapReduce to move the database data into HDFS. Below is my test run:

Sqoop example for MySQL

Download the JDBC Driver for MySQL (Connector/J 5.1) and add it to your CLASSPATH.

Database: test

Table: student

create table student(
    id int not null primary key,
    name varchar(20),
    score tinyint
);

Three rows of sample data:

mysql> select * from student;
+----+------+-------+
| id | name | score |
+----+------+-------+
|  1 | mary |    98 |
|  2 | jack |    94 |
|  3 | john |    40 |
+----+------+-------+
3 rows in set (0.00 sec)

The goal here is to export the student table into HDFS:

hadoop jar sqoop.jar org.apache.hadoop.sqoop.Sqoop --connect jdbc:mysql://localhost/test --table student

Execution output:

09/10/30 06:48:47 INFO sqoop.Sqoop: Beginning code generation
09/10/30 06:48:48 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM student AS t WHERE 1 = 1
09/10/30 06:48:48 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM student AS t WHERE 1 = 1
09/10/30 06:48:48 INFO orm.CompilationManager: HADOOP_HOME is /home/hdp/hadoop-0.20.1/bin/..
09/10/30 06:48:48 INFO orm.CompilationManager: Found hadoop core jar at: /home/hdp/hadoop-0.20.1/bin/../hadoop-0.20.1-core.jar
09/10/30 06:48:48 INFO orm.CompilationManager: Invoking javac with args: -sourcepath ./ -d /tmp/sqoop/compile/ -classpath /home/hdp/hadoop-0.20.1/bin/../conf:/usr/lib/jvm/java-6-sun-1.6.0.10/lib/tools.jar:/home/hdp/hadoop-0.20.1/bin/..:/home/hdp/hadoop-0.20.1/bin/../hadoop-0.20.1-core.jar:/home/hdp/hadoop-0.20.1/bin/../lib/commons-cli-1.2.jar:/home/hdp/hadoop-0.20.1/bin/../lib/commons-codec-1.3.jar:/home/hdp/hadoop-0.20.1/bin/../lib/commons-el-1.0.jar:/home/hdp/hadoop-0.20.1/bin/../lib/commons-httpclient-3.0.1.jar:/home/hdp/hadoop-0.20.1/bin/../lib/commons-logging-1.0.4.jar:/home/hdp/hadoop-0.20.1/bin/../lib/commons-logging-api-1.0.4.jar:/home/hdp/hadoop-0.20.1/bin/../lib/commons-net-1.4.1.jar:/home/hdp/hadoop-0.20.1/bin/../lib/core-3.1.1.jar:/home/hdp/hadoop-0.20.1/bin/../lib/hsqldb-1.8.0.10.jar:/home/hdp/hadoop-0.20.1/bin/../lib/jasper-compiler-5.5.12.jar:/home/hdp/hadoop-0.20.1/bin/../lib/jasper-runtime-5.5.12.jar:/home/hdp/hadoop-0.20.1/bin/../lib/jets3t-0.6.1.jar:/home/hdp/hadoop-0.20.1/bin/../lib/jetty-6.1.14.jar:/home/hdp/hadoop-0.20.1/bin/../lib/jetty-util-6.1.14.jar:/home/hdp/hadoop-0.20.1/bin/../lib/junit-3.8.1.jar:/home/hdp/hadoop-0.20.1/bin/../lib/kfs-0.2.2.jar:/home/hdp/hadoop-0.20.1/bin/../lib/log4j-1.2.15.jar:/home/hdp/hadoop-0.20.1/bin/../lib/oro-2.0.8.jar:/home/hdp/hadoop-0.20.1/bin/../lib/servlet-api-2.5-6.1.14.jar:/home/hdp/hadoop-0.20.1/bin/../lib/slf4j-api-1.4.3.jar:/home/hdp/hadoop-0.20.1/bin/../lib/slf4j-log4j12-1.4.3.jar:/home/hdp/hadoop-0.20.1/bin/../lib/sqoop.jar:/home/hdp/hadoop-0.20.1/bin/../lib/xmlenc-0.52.jar:/home/hdp/hadoop-0.20.1/bin/../lib/jsp-2.1/jsp-2.1.jar:/home/hdp/hadoop-0.20.1/bin/../lib/jsp-2.1/jsp-api-2.1.jar:.:/home/hdp/hadoop-0.20.1/bin/../mysql-jdbc.jar:/home/hdp/hadoop-0.20.1/bin/../hadoop-0.20.1-core.jar ./student.java
09/10/30 06:48:49 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop/compile/student.jar
09/10/30 06:48:49 INFO mapred.ImportJob: Beginning data import of student
09/10/30 06:48:50 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM student AS t WHERE 1 = 1
09/10/30 06:48:50 INFO mapred.JobClient: Running job: job_200910300539_0002
09/10/30 06:48:51 INFO mapred.JobClient:  map 0% reduce 0%
09/10/30 06:49:04 INFO mapred.JobClient:  map 50% reduce 0%
09/10/30 06:49:07 INFO mapred.JobClient:  map 100% reduce 0%
09/10/30 06:49:09 INFO mapred.JobClient: Job complete: job_200910300539_0002
09/10/30 06:49:09 INFO mapred.JobClient: Counters: 6
09/10/30 06:49:09 INFO mapred.JobClient:   Job Counters
09/10/30 06:49:09 INFO mapred.JobClient:     Launched map tasks=2
09/10/30 06:49:09 INFO mapred.JobClient:   FileSystemCounters
09/10/30 06:49:09 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30
09/10/30 06:49:09 INFO mapred.JobClient:   Map-Reduce Framework
09/10/30 06:49:09 INFO mapred.JobClient:     Map input records=3
09/10/30 06:49:09 INFO mapred.JobClient:     Spilled Records=0
09/10/30 06:49:09 INFO mapred.JobClient:     Map input bytes=3
09/10/30 06:49:09 INFO mapred.JobClient:     Map output records=3

After running the Sqoop command above, it automatically generates a student.java file (the O/R mapping class) and packages it into "/tmp/sqoop/compile/student.jar"; besides "student.class", that jar also bundles "sqoop.jar" in its lib folder. Once the job succeeds, the imported data can be seen in HDFS.
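The actual generated student.java is much longer than anything that could be shown here, but conceptually it is a class with one field per column that knows how to move a row between JDBC and Hadoop's serialization. A purely illustrative, hand-written sketch of such an O/R mapping class (this is NOT the code Sqoop actually generates) might look like this:

// Illustrative sketch only -- not the real Sqoop output.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class StudentRecord implements Writable, DBWritable
{
    private int id;
    private String name;
    private int score;

    // DBWritable: populate the fields from one row of the SELECT result
    public void readFields(ResultSet rs) throws SQLException
    {
        this.id = rs.getInt("id");
        this.name = rs.getString("name");
        this.score = rs.getInt("score");
    }

    // DBWritable: bind the fields to an INSERT statement
    public void write(PreparedStatement ps) throws SQLException
    {
        ps.setInt(1, this.id);
        ps.setString(2, this.name);
        ps.setInt(3, this.score);
    }

    // Writable: Hadoop's own serialization between tasks
    public void readFields(DataInput in) throws IOException
    {
        this.id = in.readInt();
        this.name = in.readUTF();
        this.score = in.readInt();
    }

    public void write(DataOutput out) throws IOException
    {
        out.writeInt(this.id);
        out.writeUTF(this.name);
        out.writeInt(this.score);
    }

    public String toString()
    {
        return id + "," + name + "," + score;
    }
}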

From the execution output above, it is easy to see that Sqoop performs the import using the map phase only.

Related resources

Introducing Sqoop

Getting Started With Sqoop

Sqoop

2009-10-30 19:20:01 | Comments (4)

Average Length of URL? - Using Hadoop New API

「Average Length of URL?」 is a small example I wrote earlier. Hadoop introduced a brand-new API in version 0.20.0 (HADOOP-1230), but the documentation for it will not be completed until 0.21.0 (Modify the mapred tutorial documentation to use new mapreduce api.). This post simply rewrites the original version against the new API, as shown below:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class URLAvgLength
{
	static enum URLCounter
	{
		COUNT
	}

	public static class URLMapper extends Mapper<Object, Text, Text, IntWritable>
	{
		private Text len = new Text("Len");

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException
		{
			context.write(len, new IntWritable(value.getLength()));
			context.getCounter(URLCounter.COUNT).increment(1);
			context.setStatus(value.toString());
		}
	}

	public static class URLReducer extends Reducer<Text, IntWritable, Text, IntWritable>
	{
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
		{
			int sum = 0;
			for (IntWritable val : values)
			{
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}

	public static void main(String[] args) throws Exception
	{
		String input = "/user/shen/urllist/*";
		String output = "/user/shen/urlavglen";

		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path(output), true);

		Job job = new Job(conf, "URLAvgLength");
		job.setJarByClass(URLAvgLength.class);
		job.setMapperClass(URLMapper.class);
		job.setCombinerClass(URLReducer.class);
		job.setReducerClass(URLReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(input));
		FileOutputFormat.setOutputPath(job, new Path(output));

		boolean status = job.waitForCompletion(true);
		Counters cs = job.getCounters();
		Counter c = cs.findCounter(URLCounter.COUNT);
		long count = c.getValue();

		InputStream in = fs.open(new Path("hdfs://shen:9000" + output + "/part-r-00000"));
		BufferedReader br = new BufferedReader(new InputStreamReader(in));
		String line = br.readLine();
		long total = Long.parseLong(line.split("\t")[1]);
		System.out.println("Avg:" + (double) total / count); // floating-point division so the average is not truncated

		System.exit(status ? 0 : 1);
	}
}

In the example above, both the counter and the status update have moved from the old "Reporter" to "Context"; as before, the job status can be monitored from the JobTracker admin page (default port 50030).
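For comparison, the same mapper written against the old org.apache.hadoop.mapred API would report the counter and status through Reporter instead; a rough equivalent of URLMapper looks like this:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class OldURLMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
{
	private final Text len = new Text("Len");

	public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
	{
		output.collect(len, new IntWritable(value.getLength()));
		// old API: counter and status updates go through Reporter
		reporter.incrCounter(URLAvgLength.URLCounter.COUNT, 1);
		reporter.setStatus(value.toString());
	}
}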

Related resources

Upgrading To The New Map Reduce API

What’s New in Hadoop Core 0.20

2009-10-29 18:16:45 | Comments (2)

When Two Elephants Come Together...

When two elephants come together... honestly, this title probably gives little hint of what the post is about.. and since Google gives a page's title element a relatively high weight... hmm, this article will probably not be easy to find by search... who would guess it is a post about combining Hadoop with PostgreSQL.. but I still want to title it this way...

In the past, using database data as MapReduce input or output meant handling the conversion yourself. This post looks at using a database directly as the MapReduce input and output, since Hadoop 0.19 (currently 0.19.1) added support for this via "MapReduce for MySQL" (HADOOP-2536). Below is a simple test; the following is the "wordcount" table I created:

CREATE TABLE wordcount
(
  id serial NOT NULL,
  word character varying(20) NOT NULL,
  count integer NOT NULL DEFAULT 1,
  CONSTRAINT wc_id PRIMARY KEY (id)
)
WITH (OIDS=FALSE);
ALTER TABLE wordcount OWNER TO postgres;

The initial table contents are as follows:

Basically, you first configure the database-related settings through DBConfiguration, then let DBInputFormat and DBOutputFormat handle the input from and output to the corresponding table, and write a class that implements the DBWritable interface to serve as the bridge for reading from and writing to the database; in this example that class is "WordRecord" (a minimal sketch is shown below; see the attached source for the full details).

P.S. Copy the JDBC driver into "HADOOP_HOME/lib"; the jar you run must also bundle this driver.
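The full WordRecord class is in the attached source; a minimal sketch of what such a bridge class might look like (implementing both Writable and the old-API DBWritable, with the word and count fields used by the job below) is roughly the following:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class WordRecord implements Writable, DBWritable
{
    String word;
    int count;

    public WordRecord()
    {
    }

    public WordRecord(String word, int count)
    {
        this.word = word;
        this.count = count;
    }

    // Writable: Hadoop's own serialization of the record
    public void readFields(DataInput in) throws IOException
    {
        this.word = in.readUTF();
        this.count = in.readInt();
    }

    public void write(DataOutput out) throws IOException
    {
        out.writeUTF(this.word);
        out.writeInt(this.count);
    }

    // DBWritable: read one row of the SELECT word, count result
    public void readFields(ResultSet rs) throws SQLException
    {
        this.word = rs.getString("word");
        this.count = rs.getInt("count");
    }

    // DBWritable: fill one row of the INSERT INTO wordcount(word,count)
    public void write(PreparedStatement ps) throws SQLException
    {
        ps.setString(1, this.word);
        ps.setInt(2, this.count);
    }
}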

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool
{

    public int run(String[] arg0) throws Exception
    {
        JobConf job = new JobConf(getConf(), WordCount.class);

        job.setJobName("DBWordCount");

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        DBConfiguration.configureDB(job, "org.postgresql.Driver", "jdbc:postgresql://localhost/WordCount", "username", "password"); // replace with your own DB credentials

        String[] fields = { "word", "count" };

        DBInputFormat.setInput(job, WordRecord.class, "wordcount", null, "id", fields);
        DBOutputFormat.setOutput(job, "wordcount(word,count)", fields);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(WordRecord.class);
        job.setOutputValueClass(NullWritable.class);

        JobClient.runJob(job);

        return 0;
    }

    static class WordCountMapper extends MapReduceBase implements
            Mapper<LongWritable, WordRecord, Text, IntWritable>
    {

        public void map(LongWritable key, WordRecord value,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
        {
            output.collect(new Text(value.word), new IntWritable(value.count));
        }
    }

    static class WordCountReducer extends MapReduceBase implements
            Reducer<Text, IntWritable, WordRecord, NullWritable>
    {

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<WordRecord, NullWritable> output,
                Reporter reporter) throws IOException
        {
            int sum = 0;
            while (values.hasNext())
            {
                sum += values.next().get();
            }
            output.collect(new WordRecord(key.toString(), sum), NullWritable.get());
        }
    }

    public static void main(String args[]) throws Exception
    {
        int ret = ToolRunner.run(new WordCount(), args);
        System.exit(ret);
    }
}

Result (written directly back to the wordcount table):

For the internal details you can study DBInputFormat and DBOutputFormat. You will find that the getSelectQuery() method in DBInputFormat builds its SQL by stringing together select ... order by, limit, and offset (which is why some databases, such as Oracle, are not yet supported), while DBOutputFormat naturally uses insert into tablename values(field names). Although this example has a serial-number primary key (id), the "WordRecord" class I wrote does not touch that ID, so in the setOutput call I explicitly specify the table name as "wordcount(word,count)" so that writing back to the table does not fail.

Source file

References

Database Access with Hadoop

DBInputFormat (Hadoop 0.19.1 API)

2009-03-15 00:41:51 | Add Comment

When the Elephant Meets PDFBox...

When the elephant meets PDFBox... the title looks rather fun; "Hadoop + PDFBox", by contrast, would be far too formal.. XD

Two months ago I posted "PDFBox - 擷取PDF檔案中的純文字" (extracting plain text from PDF files); now let's get several machines to do the same job together. If you never get the chance to experience this at scale, have a look at "Self-service, Prorated Super Computing Fun!", which describes how the NYT used Hadoop two years ago to convert the TIFF images of 11 million articles into PDF files, the point being that it took only a single day... What this post does is extract the plain text from PDF files in a distributed fashion, which is of course much faster than a single machine (that said, I am also testing on just one machine...).

P.S. Remember to place third-party libraries in the "lib" folder and package them together with your jar.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.pdfbox.pdmodel.PDDocument;
import org.pdfbox.util.PDFTextStripper;

public class PDF2TXT extends Configured implements Tool
{

    public static class Map extends MapReduceBase implements
            Mapper<NullWritable, BytesWritable, Text, Text>
    {

        private JobConf conf;

        @Override
        public void configure(JobConf conf)
        {
            this.conf = conf;
        }

        public void map(NullWritable key, BytesWritable value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException
        {
            String filename = conf.get("map.input.file");
            String output_dir = conf.get("output.dir");
            filename = getFileName(filename);

            FileSystem fs = FileSystem.get(conf);
            FSDataOutputStream dos = fs.create(new Path(output_dir + filename + ".txt"));
            PDDocument document = PDDocument.load(new ByteArrayInputStream(value.getBytes()));
            PDFTextStripper stripper = new PDFTextStripper();
            stripper.setStartPage(1);
            stripper.setEndPage(document.getNumberOfPages());
            String s = stripper.getText(document);
            document.close(); // release PDFBox resources once the text has been extracted
            dos.write(s.getBytes("UTF-8"));
            dos.close();
        }

        public String getFileName(String s)
        {
            return s.substring(s.lastIndexOf("/"), s.lastIndexOf("."));
        }
    }

    public int run(String[] args) throws Exception
    {
        JobConf conf = new JobConf(getConf(), PDF2TXT.class);
        conf.set("output.dir", args[1]);

        conf.setJobName("PDF2TXT");
        conf.setMapperClass(Map.class);

        conf.setInputFormat(WholeFileInputFormat.class);
        conf.setOutputFormat(NullOutputFormat.class);

        conf.set("mapred.child.java.opts", "-Xmx256m");
        conf.setNumReduceTasks(0);

        WholeFileInputFormat.setInputPaths(conf, new Path(args[0]));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args)
    {
        try
        {
            int res = ToolRunner.run(new Configuration(), new PDF2TXT(), args);
            System.exit(res);
        } catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}
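The WholeFileInputFormat used above comes with the attached source rather than with Hadoop itself; a minimal sketch of such an input format (old mapred API, reading each whole file as a single <NullWritable, BytesWritable> record and never splitting it) could look like this — the attached version may differ in detail:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>
{
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename)
    {
        return false; // each PDF must be handed to a single map task in one piece
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException
    {
        return new WholeFileRecordReader((FileSplit) split, job);
    }

    static class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable>
    {
        private final FileSplit fileSplit;
        private final JobConf conf;
        private boolean processed = false;

        WholeFileRecordReader(FileSplit fileSplit, JobConf conf)
        {
            this.fileSplit = fileSplit;
            this.conf = conf;
        }

        public NullWritable createKey() { return NullWritable.get(); }
        public BytesWritable createValue() { return new BytesWritable(); }
        public long getPos() throws IOException { return processed ? fileSplit.getLength() : 0; }
        public float getProgress() throws IOException { return processed ? 1.0f : 0.0f; }
        public void close() throws IOException { }

        // emit exactly one record: the entire file as a byte array
        public boolean next(NullWritable key, BytesWritable value) throws IOException
        {
            if (processed)
            {
                return false;
            }
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try
            {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally
            {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
    }
}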

Source file

2009-03-14 00:26:23 | Add Comment
