blog.Ring.idv.tw

Hadoop

整合Cassandra和Hadoop - WordCount

由於Cassandra在0.6版開始提供和Hadoop整合,可以將Cassandra所儲存的資料當做Hadoop MapReduce的輸入來源,所以筆者前幾天在試著玩玩Cassandra該如何要和Hadoop整合時,用著「官方所提供的WordCount」範例,跑出來的結果居然完全錯誤!! trace了大半天才發現原來是Cassandra的Bug,還好這在最新釋出的0.6.4版已經被修正了(ColumnFamilyRecordReader returns duplicate rows),不過目前Cassandra還是沒有提供將Hadoop資料輸出到Cassandrad的介面實作(雖然可以在Reduce自行處理),這要等到0.7版才會釋出(A Hadoop Output Format That Targets Cassandra),下述就是Cassandra+Hadoop的WordCount程式:

測試資料

Key     Value
-----------------------------------------
Doc1    new home sales top forecasts 
Doc2    home sales rise in july 
Doc3    increase in home sales in july 
Doc4    july new home sales rise 

IRWordCountSetup

package cassandra;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class IRWordCountSetup
{
	public static final String UTF8 = "UTF8";

	public static void main(String[] args)throws Exception
	{
		TTransport tr = new TSocket("localhost", 9160);
		TProtocol proto = new TBinaryProtocol(tr);
		Cassandra.Client client = new Cassandra.Client(proto);
		tr.open();

		String keyspace = "Keyspace1";
		String columnFamily = "Standard1";

		ColumnPath colPathName = new ColumnPath(columnFamily);
		colPathName.setColumn("Doc".getBytes(UTF8));
		long timestamp = System.currentTimeMillis();
		
		client.insert(keyspace, "Doc1", colPathName, "new home sales top forecasts".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
		client.insert(keyspace, "Doc2", colPathName, "home sales rise in july".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
		client.insert(keyspace, "Doc3", colPathName, "increase in home sales in july".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
		client.insert(keyspace, "Doc4", colPathName, "july new home sales rise".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
	}
}

IRWordCount

package cassandra;

import java.io.IOException;
import java.util.Arrays;
import java.util.SortedMap;
import java.util.StringTokenizer;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IRWordCount
{
	static final String KEYSPACE = "Keyspace1";
	static final String COLUMN_FAMILY = "Standard1";
	private static final String CONF_COLUMN_NAME = "Doc";
	private static final String OUTPUT_PATH_PREFIX = "/tmp/doc_word_count";

	public static class TokenizerMapper extends Mapper<String, SortedMap<byte[], IColumn>, Text, IntWritable>
	{
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		private String columnName;
		
		protected void setup(Context context) throws IOException, InterruptedException
		{
			this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
		}
		public void map(String key, SortedMap<byte[], IColumn> columns, Context context) throws IOException, InterruptedException
		{
			IColumn column = columns.get(columnName.getBytes());
			if(column == null)
				return;
			
			String value = new String(column.value());			
			System.out.println("read " + key + ":" + value + " from " + context.getInputSplit());

			StringTokenizer itr = new StringTokenizer(value);
			while (itr.hasMoreTokens())
			{
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
	{
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
		{
			int sum = 0;
			for (IntWritable val : values)
			{
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}
	
	public static void main(String[] args) throws Exception
	{
		Path output = new Path(OUTPUT_PATH_PREFIX);
		Configuration conf = new Configuration();
        
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(output))
			fs.delete(output, true);
		
		String columnName = "Doc";
		conf.set(CONF_COLUMN_NAME, columnName);
		Job job = new Job(conf, "wordcount");
		job.setJarByClass(IRWordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setInputFormatClass(ColumnFamilyInputFormat.class);
		FileOutputFormat.setOutputPath(job, output);

		ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
		SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
		ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);

		boolean status = job.waitForCompletion(true);
		System.exit(status ? 0 : 1);
	}
}

輸出的結果為:

forecasts	1
home	4
in	3
increase	1
july	3
new	2
rise	2
sales	4
top	1

2010-08-09 15:27:32 | Add Comment

HBase/Hadoop RPC

基本上HBase的RPC設計是採用Hadoop的RPC並做一些更動而寫成的(HBase/RoadMaps),而如果要用一句話來解釋HBase/Hadoop的RPC設計可以這麼說:它是透過Dynamic Proxy Pattern + Reflection + NIO(Multiplexed, non-blocking I/O)所構成的,中間溝通的物件會透過序列化(serialization)的方式來傳遞,所以都需要實作Hadoop的Writable介面,下述是筆者利用HBase/Hadoop的RPC設計來自訂一個Hello, Java範例:

RPCInterface

自行定義一個「say()」方法供RPC呼叫。

package hbase.rpc;

import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;

public interface RPCInterface extends HBaseRPCProtocolVersion
{
	public String say();
}

Message

實作RPCInterface介面的Message類別,純粹回傳一個「Hello, Java」字串。

package hbase.rpc;

import java.io.IOException;

import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;

public class Message implements RPCInterface
{
	public String say()
	{
		return "Hello, Java";
	}
	
	@Override
	public long getProtocolVersion(String protocol, long clientVersion) throws IOException
	{
		return HBaseRPCProtocolVersion.versionID;
	}
}

TestRPCServer

該程式會透過「HBaseRPC.addToMap()」來註冊自行定義的方法(Method Registry),而內部就是利用Reflection來取得該類別所擁有的方法(Method),它會給予每個方法一個特定的ID,而該ID是一個整數值。

由於HBase/Hadoop的Server是採用Multiplexed, non-blocking I/O方式而設計的,所以它可以透過一個Thread來完成處理,但是由於處理Client端所呼叫的方法是Blocking I/O,所以它的設計會將Client所傳遞過來的物件先放置在Queue,並在啟動Server時就先產生一堆Handler(Thread),該Handler會透過Polling的方式來取得該物件並執行對應的方法,下述範例預設為10個Handler(HMaster/HRegionServer預設都為25個,根據"hbase.regionserver.handler.count"設定)。

package hbase.rpc;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;

public class TestRPCServer
{
	private HBaseServer server;
	private final HServerAddress address;
	static{HBaseRPC.addToMap(RPCInterface.class, (byte)37);}
	
	public TestRPCServer()
	{
		this.address = new HServerAddress("localhost:56789");
	}

	public void start()
	{
		try
		{
			Message msg = new Message();
			this.server = HBaseRPC.getServer(msg, address.getBindAddress(), address.getPort(), 10, true, new HBaseConfiguration());
			this.server.start();
			while (true)
			{
				Thread.sleep(3000);
			}
		} catch (Exception e)
		{
			e.printStackTrace();
		}

	}
	public static void main(String[] args)
	{
		new TestRPCServer().start();
	}
}

TestRPCClient

這裡的「HBaseRPC.getProxy()」就是採用Dynamic Proxy Pattern + Reflection來設計,有興趣的朋友可以去研究它的Source Code。

package hbase.rpc;

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;


public class TestRPCClient
{
	protected RPCInterface server;
	static{HBaseRPC.addToMap(RPCInterface.class, (byte)37);}
	@SuppressWarnings("unchecked")
	public TestRPCClient()
	{
		try
		{
			server = (RPCInterface) HBaseRPC.getProxy(RPCInterface.class, HBaseRPCProtocolVersion.versionID, new InetSocketAddress("localhost", 56789), new HBaseConfiguration());
		} catch (Exception e)
		{
			e.printStackTrace();
		}
	}

	public String call() throws IOException
	{
		return server.say();
	}

	public static void main(String[] args) throws IOException
	{
		TestRPCClient client = new TestRPCClient();
		System.out.println(client.call());
	}
}

玩玩看吧!

2010-05-09 01:46:58 | Comments (1)

Hadoop - 探討RunJar

通常要執行一個Hadoop Job時,會透過下述的指令來達成:

${HADOOP_HOME}/bin/hadoop jar your.jar mainClass args

當送出上述指令之後,透過「jps」指令可以觀察到有一個「org.apache.hadoop.util.RunJar」的程式正在執行:

19141 RunJar

而該「org.apache.hadoop.util.RunJar」的程式就是透過「${HADOOP_HOME}/bin/hadoop」shell來執行對應的Command「jar」,並啟動「org.apache.hadoop.util.RunJar」來進行Hadoop Job的第一步。

hadoop shell (lines:228-229)

elif [ "$COMMAND" = "jar" ] ; then
  CLASS=org.apache.hadoop.util.RunJar

main method開始來看,它會從你所執行的「your.jar」來試著取得manifest的Main-Class屬性用來當作mainClassName,如果沒有指定的話就從參數取得。

RunJar.java (lines:94-107)

Manifest manifest = jarFile.getManifest();
    if (manifest != null) {
      mainClassName = manifest.getMainAttributes().getValue("Main-Class");
    }
    jarFile.close();

    if (mainClassName == null) {
      if (args.length < 2) {
        System.err.println(usage);
        System.exit(-1);
      }
      mainClassName = args[firstArg++];
    }
    mainClassName = mainClassName.replaceAll("/", ".");

接著該程式會將「your.jar」解壓縮在一個暫存的目錄裡面,該目錄的位置會取決於「hadoop.tmp.dir」的設定,從「${HAOOP_HOME}/src/core/core-default.xml」可以得知該設定的預設值為「/tmp/hadoop-${user.name}」,所以從下述的原始碼可得知,它一開始會試著建立「/tmp/hadoop-${user.name}」目錄(通常只有第一次執行時),然後再透過「File.createTempFile()」方法來建立一個「hadoop-unjar*」的暫存目錄,所以「your.jar」解壓縮後的class檔都會放在此目錄裡面,當執行結束之後也會一併刪除該目錄。

RunJar.java (lines:109-132)

File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
    tmpDir.mkdirs();
    if (!tmpDir.isDirectory()) { 
      System.err.println("Mkdirs failed to create " + tmpDir);
      System.exit(-1);
    }
    final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
    workDir.delete();
    workDir.mkdirs();
    if (!workDir.isDirectory()) {
      System.err.println("Mkdirs failed to create " + workDir);
      System.exit(-1);
    }
    
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
          try {
            FileUtil.fullyDelete(workDir);
          } catch (IOException e) {
          }
        }
      });
     
    unJar(file, workDir);

最後才會透過Reflection機制來達成動態載入「your.jar」的mainClass。

RunJar.java (lines:134-159)

ArrayList<URL> classPath = new ArrayList<URL>();
    classPath.add(new File(workDir+"/").toURL());
    classPath.add(file.toURL());
    classPath.add(new File(workDir, "classes/").toURL());
    File[] libs = new File(workDir, "lib").listFiles();
    if (libs != null) {
      for (int i = 0; i < libs.length; i++) {
        classPath.add(libs[i].toURL());
      }
    }
    
    ClassLoader loader =
      new URLClassLoader(classPath.toArray(new URL[0]));

    Thread.currentThread().setContextClassLoader(loader);
    Class<?> mainClass = Class.forName(mainClassName, true, loader);
    Method main = mainClass.getMethod("main", new Class[] {
      Array.newInstance(String.class, 0).getClass()
    });
    String[] newArgs = Arrays.asList(args)
      .subList(firstArg, args.length).toArray(new String[0]);
    try {
      main.invoke(null, new Object[] { newArgs });
    } catch (InvocationTargetException e) {
      throw e.getTargetException();
    }

2009-12-16 00:09:37 | Add Comment

WordCount - HBase 0.20.x

本文是一個簡單的WordCount程式,經由MapReduce的處理之後直接輸出到HBase,實作的範例如下:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class WordCountHBase
{
    public static class Map extends Mapper<LongWritable,Text,Text, IntWritable>
    {
        private IntWritable i = new IntWritable(1);
        @Override
        public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException
        {
            String s[] = value.toString().trim().split(" ");
            for( String m : s)
            {
                context.write(new Text(m), i);
            }
        }
    }
    public static class Reduce extends TableReducer<Text, IntWritable, NullWritable>
    {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for(IntWritable i : values)
            {
                sum += i.get();
            }
           
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
            context.write(NullWritable.get(), put);
        }
    }
    public static void createHBaseTable(String tablename)throws IOException
    {
        HTableDescriptor htd = new HTableDescriptor(tablename);
        HColumnDescriptor col = new HColumnDescriptor("content:");
        htd.addFamily(col);
       
        HBaseConfiguration config = new HBaseConfiguration();
        HBaseAdmin admin = new HBaseAdmin(config);
        if(admin.tableExists(tablename))
        {
            admin.disableTable(tablename);
            admin.deleteTable(tablename);
        }
       
        System.out.println("create new table: " + tablename);
        admin.createTable(htd);
    }
   
    public static void main(String args[]) throws Exception
    {
        String tablename = "wordcount";
       
        Configuration conf = new Configuration();
        conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
        createHBaseTable(tablename);

        String input = args[0];
        Job job = new Job(conf, "WordCount table with " + input);
       
        job.setJarByClass(WordCountHBase.class);
        job.setNumReduceTasks(3);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
       
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
       
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(input));
       
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

從上述程式可以知道Reduce是直接繼承於TableReducer<KEYIN,VALUEIN,KEYOUT>,不過就如同API的說明,KEYOUT在TableOutputFormat當中是被忽略的,而且VALUEOUT也只能是PutDelete,可以從下述的原始碼得知:

TableOutputFormat.java

public void write(KEY key, Writable value) throws IOException
{
	if (value instanceof Put)
		this.table.put(new Put((Put) value));
	else if (value instanceof Delete)
		this.table.delete(new Delete((Delete) value));
	else
		throw new IOException("Pass a Delete or a Put");
}

至於該輸出至哪一個Table,則必須設置「TableOutputFormat.OUTPUT_TABLE」的組態設定,也可以自行設置「hbase.mapred.outputtable」。

TableOutputFormat.java

public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

2009-12-08 21:55:33 | Comments (2)

淺談Hadoop FileSystem API

Hadoop中,我們若是想直接存取HDFS之中的資料或進行一些檔案的操作,可以透過它所提供的FileSystem API來達成,下述程式是一個簡單範例:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class GetFileSystem
{
	public static void main(String[] args) throws IOException
	{
		String uri = "hdfs://shen:9000/user/";
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
	}
}

上述程式在執行的過程會執行三次的shell command,分別為:

whoami
bash -c groups
whoami

而內部的運作方式,是當上述程式執行「FileSystem.get()」方法之後,它會利用FileSystem類別本身的一個static物件「FileSystem.Cache」(如下述程式),去執行所屬的「FileSystem.Cache.get()」方法,並透過Map去找尋是否有Cache住的FileSystem instance,所以在找尋的過程中會產生一個「FileSystem.Cache.Key」物件,而它的Constructor會呼叫「UserGroupInformation.login()」靜態方法,並再交由「UnixUserGroupInformation.login()」去執行登入的動作,而這個登入的動作就會透過「org.apache.hadoop.util.Shell」去執行上述「whoami」和「bash -c groups」兩個指令。

FileSystem.java (lines:1382~1397)

static class Cache {
    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();

    synchronized FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      FileSystem fs = map.get(key);
      if (fs == null) {
        fs = createFileSystem(uri, conf);
        if (map.isEmpty() && !clientFinalizer.isAlive()) {
          Runtime.getRuntime().addShutdownHook(clientFinalizer);
        }
        fs.key = key;
        map.put(key, fs);
      }
      return fs;
    }

那為何又會有第三個「whoami」指令?

這是因為如果從Cache中找不到對應的FileSystem的話,它會執行「private static createFileSystem()」方法去產生一個對應的FileSystem instance,並執行一些初始化的動作(如下述程式),而如何產生對應的FileSystem會取決於URI scheme來決定,由於上述的範例是要存取HDFS,所以scheme為hdfs,並經由「$HADOOP_HOME/src/core/core-default.xml」的組態檔可得知,HDFS對應的FileSystem class是「org.apache.hadoop.hdfs.DistributedFileSystem」(它繼承於FileSystem),所以重點就在於此類別中的「initialize()」所為何事?

FileSystem.java (lines:1369~1379)

private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
	  LOG.warn(uri.getScheme());
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    if (clazz == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf);
    return fs;
  }

在「org.apache.hadoop.hdfs.DistributedFileSystem」中執行「initialize()」方法會產生一個「org.apache.hadoop.hdfs.DFSClient」物件,它準備用來和HDFS進行連線的工作,而它的Constructor又會呼叫「UnixUserGroupInformation.login()」去執行登入的動作,所以才又有第二次的「whoami」指令,那為何第二次沒有執行「bash -c groups」指令?這是因為「UnixUserGroupInformation」本身也會Cache,所以執行第二次的「whoami」指令主要就是要從Cache中再取出「UnixUserGroupInformation」並傳回給「org.apache.hadoop.hdfs.DFSClient」,之所以如此才會依序執行「whoami」、「bash -c groups」和「whoami」三個指令,所以其實HDFS純粹透過Shell來取得使用者的身份和群組資訊。

UnixUserGroupInformation.java (lines:238~277)

public static UnixUserGroupInformation login() throws LoginException {
    try {
      String userName;

      // if an exception occurs, then uses the
      // default user
      try {
        userName =  getUnixUserName();
      } catch (Exception e) {
        userName = DEFAULT_USERNAME;
      }

      // check if this user already has a UGI object in the ugi map
      UnixUserGroupInformation ugi = user2UGIMap.get(userName);
      if (ugi != null) {
        return ugi;
      }

      /* get groups list from UNIX. 
       * It's assumed that the first group is the default group.
       */
      String[]  groupNames;

      // if an exception occurs, then uses the
      // default group
      try {
        groupNames = getUnixGroups();
      } catch (Exception e) {
        groupNames = new String[1];
        groupNames[0] = DEFAULT_GROUP;
      }

      // construct a Unix UGI
      ugi = new UnixUserGroupInformation(userName, groupNames);
      user2UGIMap.put(ugi.getUserName(), ugi);
      return ugi;
    } catch (Exception e) {
      throw new LoginException("Login failed: "+e.getMessage());
    }
  }

2010.01.04 updated

HADOOP-4998在討論是否實作一個native OS runtime for Hadoop,如此就不用依賴上述Shell command來取得OS的相關資源.

Bash -c string 說明

來源:bash(1) - Linux man page

If the -c option is present, then commands are read from string. 
If there are arguments after the string, they are assigned to the positional parameters, starting with $0. 

2009-11-26 23:55:07 | Add Comment

Next Posts~:::~Previous Posts
Copyright (C) Ching-Shen Chen. All rights reserved.

::: 搜尋 :::

::: 分類 :::

::: Ads :::

::: 最新文章 :::

::: 最新回應 :::

::: 訂閱 :::

Atom feed
Atom Comment

::: 人氣指數 :::

今日人氣:212

累積人氣:2952575


::: 線上人數 :::

counter