blog.Ring.idv.tw

HBase/Hadoop RPC

Fundamentally, HBase's RPC is Hadoop's RPC taken and modified slightly (see HBase/RoadMaps). If one sentence had to explain the HBase/Hadoop RPC design, it would be this: it is built from the Dynamic Proxy pattern + Reflection + NIO (multiplexed, non-blocking I/O), and the objects exchanged in between are passed by serialization, so they all have to implement Hadoop's Writable interface. Below is a custom "Hello, Java" example I wrote on top of the HBase/Hadoop RPC machinery:

RPCInterface

Define a say() method of our own for RPC to call.

package hbase.rpc;

import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;

public interface RPCInterface extends HBaseRPCProtocolVersion
{
	public String say();
}

Message

The Message class implements the RPCInterface interface and simply returns the string "Hello, Java".

package hbase.rpc;

import java.io.IOException;

import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;

public class Message implements RPCInterface
{
	public String say()
	{
		return "Hello, Java";
	}
	
	@Override
	public long getProtocolVersion(String protocol, long clientVersion) throws IOException
	{
		return HBaseRPCProtocolVersion.versionID;
	}
}

TestRPCServer

This program registers our custom interface through HBaseRPC.addToMap() (a method registry). Internally it uses reflection to obtain the methods the class exposes and assigns each method a specific ID, an integer value. A rough sketch of the idea follows.
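
To make that concrete, here is a minimal sketch of a reflection-based method registry; the class and member names are my own invention, not the actual HBaseRPC internals:

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

public class MethodRegistry
{
	private final Map<Integer, Method> idToMethod = new HashMap<Integer, Method>();

	public MethodRegistry(Class<?> protocol)
	{
		Method[] methods = protocol.getMethods();
		// sort so that client and server derive identical IDs for the same interface
		Arrays.sort(methods, new Comparator<Method>()
		{
			public int compare(Method a, Method b)
			{
				return a.toString().compareTo(b.toString());
			}
		});
		for (int id = 0; id < methods.length; id++)
		{
			idToMethod.put(id, methods[id]); // every method gets an integer ID
		}
	}

	public Method lookup(int id)
	{
		return idToMethod.get(id);
	}
}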

Because the HBase/Hadoop server is designed around multiplexed, non-blocking I/O, a single thread is enough to service the connections. However, the methods invoked on behalf of clients are blocking, so the design first places each object a client sends into a queue; when the server starts, it spawns a batch of Handler threads, and each handler polls the queue to pick up an object and execute the corresponding method. The example below uses the default of 10 handlers (HMaster/HRegionServer both default to 25, governed by the "hbase.regionserver.handler.count" setting).
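
The queue-plus-handlers structure can be sketched roughly as follows; again the names here are hypothetical, not the actual HBaseServer code:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CallQueueSketch
{
	// a queued call: in the real server this wraps the deserialized invocation
	static class Call
	{
		final Runnable invocation;
		Call(Runnable invocation) { this.invocation = invocation; }
	}

	private final BlockingQueue<Call> callQueue = new LinkedBlockingQueue<Call>();

	// spawn the handler pool once at startup
	public void start(int handlerCount)
	{
		for (int i = 0; i < handlerCount; i++)
		{
			new Thread("Handler-" + i)
			{
				public void run()
				{
					while (true)
					{
						try
						{
							callQueue.take().invocation.run(); // poll a call and execute it
						} catch (InterruptedException e)
						{
							return;
						}
					}
				}
			}.start();
		}
	}

	// called by the single NIO reader thread after it deserializes a request
	public void enqueue(Call call) throws InterruptedException
	{
		callQueue.put(call);
	}
}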

package hbase.rpc;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;

public class TestRPCServer
{
	private HBaseServer server;
	private final HServerAddress address;
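	// register our protocol class under a unique code (37); client and server must register the same mapping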
	static{HBaseRPC.addToMap(RPCInterface.class, (byte)37);}
	
	public TestRPCServer()
	{
		this.address = new HServerAddress("localhost:56789");
	}

	public void start()
	{
		try
		{
			Message msg = new Message();
			this.server = HBaseRPC.getServer(msg, address.getBindAddress(), address.getPort(), 10, true, new HBaseConfiguration());
			this.server.start();
			while (true)
			{
				Thread.sleep(3000);
			}
		} catch (Exception e)
		{
			e.printStackTrace();
		}

	}
	public static void main(String[] args)
	{
		new TestRPCServer().start();
	}
}

TestRPCClient

The HBaseRPC.getProxy() used here is exactly the Dynamic Proxy pattern + Reflection at work; if you are interested, go study its source code.
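
Conceptually it reduces to java.lang.reflect.Proxy: every call on the interface funnels into an InvocationHandler, which is where the real client serializes the method and arguments and ships them off. A minimal sketch follows; the handler below only prints instead of doing network I/O, so it is illustrative rather than the actual HBaseRPC.Invoker:

package hbase.rpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxySketch
{
	public static void main(String[] args)
	{
		RPCInterface proxy = (RPCInterface) Proxy.newProxyInstance(
			RPCInterface.class.getClassLoader(),
			new Class<?>[]{RPCInterface.class},
			new InvocationHandler()
			{
				public Object invoke(Object p, Method method, Object[] a)
				{
					// the real Invoker serializes the method name and arguments
					// here and sends them to the server over the connection
					System.out.println("would send RPC: " + method.getName());
					return "Hello, Java";
				}
			});
		System.out.println(proxy.say()); // dispatched through invoke()
	}
}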

package hbase.rpc;

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;


public class TestRPCClient
{
	protected RPCInterface server;
	static{HBaseRPC.addToMap(RPCInterface.class, (byte)37);}
	@SuppressWarnings("unchecked")
	public TestRPCClient()
	{
		try
		{
			server = (RPCInterface) HBaseRPC.getProxy(RPCInterface.class, HBaseRPCProtocolVersion.versionID, new InetSocketAddress("localhost", 56789), new HBaseConfiguration());
		} catch (Exception e)
		{
			e.printStackTrace();
		}
	}

	public String call() throws IOException
	{
		return server.say();
	}

	public static void main(String[] args) throws IOException
	{
		TestRPCClient client = new TestRPCClient();
		System.out.println(client.call());
	}
}

Give it a try!

2010-05-09 01:46:58

HBase - TableInputFormat

This post records how to use HBase (0.20.3) as the input source for a Hadoop MapReduce program. The code below simply pulls the data out of one table and writes it straight to HDFS:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TestTableInput
{
	public static class Map extends TableMapper<Text, Text>
	{
		@Override
		public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException
		{
			String v = Bytes.toString(value.value());
			context.write(new Text(Bytes.toString(value.getRow())), new Text(v));
		}
	}

	public static class Reduce extends Reducer<Text, Text, Text, Text>
	{
		@Override
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
		{
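			// map() emits each row key exactly once, so values holds a single element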
			context.write(key, new Text(values.iterator().next()));
		}
	}

	public static void main(String args[]) throws Exception
	{
		if(args.length != 2)
		{
			System.out.println("Usage: hadoop jar TestTableInput.jar <table> <output>");
			System.exit(-1);
		}
		
		String tablename = args[0];
		String output = args[1];

		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path(output), true);

		Job job = new Job(conf, "TestTableInput");
		Scan scan = new Scan();
		TableMapReduceUtil.initTableMapperJob(tablename, scan, Map.class, Text.class, Text.class, job);	
		job.setJarByClass(TestTableInput.class);
		job.setReducerClass(Reduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		TextOutputFormat.setOutputPath(job, new Path(output));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Two things here are worth noting. First, Map must extend the TableMapper class HBase provides (TableMapper itself extends Mapper); it is what fixes the KEYIN and VALUEIN types, namely ImmutableBytesWritable and Result, and those two classes correspond to table data as follows:

ImmutableBytesWritable = Row Key

Result = Row Key + Column + Timestamp + Value
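
Since a Result carries all of the row's cells, map() can also pull out one specific cell by family and qualifier rather than calling value.value(), which returns only the first column's value. A small variant sketch, assuming a hypothetical "content:count" column:

import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class CellMapper extends TableMapper<Text, Text>
{
	@Override
	public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException
	{
		// pick out one specific cell instead of value.value(), which
		// only returns the value of the row's first column
		byte[] cell = value.getValue(Bytes.toBytes("content"), Bytes.toBytes("count"));
		if (cell != null)
		{
			context.write(new Text(Bytes.toString(value.getRow())), new Text(Bytes.toString(cell)));
		}
	}
}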

The other noteworthy point is the TableMapReduceUtil.initTableMapperJob() method used in main(), which bundles up several settings for us:

In particular, it sets the job's InputFormatClass to TableInputFormat, along with some related configuration: TableInputFormat.INPUT_TABLE names the input table, and TableInputFormat.SCAN carries the Scan (since Scan is an object, it has to be encoded into a Base64 string via the convertScanToString() method), as sketched below.
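
Roughly, based on the 0.20.x source, the method body looks like this (an abridged sketch, not verbatim; convertScanToString() is a package-private helper that serializes the Scan and Base64-encodes it):

public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper,
    Class<? extends WritableComparable> outputKeyClass,
    Class<? extends Writable> outputValueClass, Job job) throws IOException {
  job.setInputFormatClass(TableInputFormat.class);
  if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
  if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
  job.setMapperClass(mapper);
  Configuration conf = job.getConfiguration();
  conf.set(TableInputFormat.INPUT_TABLE, table);
  // Scan is an object, so it is serialized and Base64-encoded
  // before being stored in the job configuration
  conf.set(TableInputFormat.SCAN, convertScanToString(scan));
}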

2010-03-09 21:12:28

A Brief Look at the HBase HTable API

HBase中,如果我們要對一個Table進行操作的話,需要透過「HTable」物件來先初始化一些工作,而本文的重點就在於它初始化了哪些工作?為何第一次初始化就要耗費約「500ms」的時間呢?(以筆者電腦單機測試為例)

Here is a simple program:

HTable table = new HTable("UserTable");

HTable.java (lines 80-83)

public HTable(final String tableName)
  throws IOException {
    this(new HBaseConfiguration(), Bytes.toBytes(tableName));
  }

Looking at the HTable source, the constructor above creates an HBaseConfiguration object, which loads HBase's configuration files: hbase-default.xml and hbase-site.xml.

Also worth mentioning: creating the HBaseConfiguration object alone takes about 200 ms (again, measured on my single machine).

HBaseConfiguration.java (lines 48-51)

  private void addHbaseResources() {
    addResource("hbase-default.xml");
    addResource("hbase-site.xml");
  }
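
A quick way to observe these costs is to time the constructors directly. A measurement sketch, assuming a running local HBase and an existing table named UserTable; the absolute numbers will of course vary by machine:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;

public class HTableTiming
{
	public static void main(String[] args) throws Exception
	{
		long t0 = System.currentTimeMillis();
		HBaseConfiguration conf = new HBaseConfiguration(); // loads hbase-default.xml and hbase-site.xml
		long t1 = System.currentTimeMillis();
		new HTable(conf, "UserTable"); // first table: builds the connection, locates -ROOT-/.META./UserTable
		long t2 = System.currentTimeMillis();
		new HTable(conf, "UserTable"); // same conf: reuses the cached connection
		long t3 = System.currentTimeMillis();
		System.out.println("HBaseConfiguration: " + (t1 - t0) + " ms");
		System.out.println("first HTable: " + (t2 - t1) + " ms");
		System.out.println("second HTable: " + (t3 - t2) + " ms");
	}
}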

Moving on to the constructor it delegates to: besides assigning some parameters, it mainly calls two methods, HConnectionManager.getConnection(conf) and connection.locateRegion(tableName, HConstants.EMPTY_START_ROW).

HTable.java (lines 115-132)

 public HTable(HBaseConfiguration conf, final byte [] tableName)
  throws IOException {
    this.tableName = tableName;
    if (conf == null) {
      this.scannerTimeout = 0;
      this.connection = null;
      return;
    }

    this.connection = HConnectionManager.getConnection(conf);    
    this.scannerTimeout =
      conf.getInt("hbase.regionserver.lease.period", 60 * 1000);
    this.configuration = conf;
    this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
    this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
    this.autoFlush = true;
    this.currentWriteBufferSize = 0;
    this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1);
  }

Starting with HConnectionManager.getConnection(conf): it creates a TableServers object, which, as the variable name implies, stands for a connection, and the connection is cached first, so later calls with the same configuration get back the same instance (this is also why the second HTable in the timing sketch above is so much cheaper).

HConnectionManager.java (lines 96-106)

public static HConnection getConnection(HBaseConfiguration conf) {
    TableServers connection;
    synchronized (HBASE_INSTANCES) {
      connection = HBASE_INSTANCES.get(conf);
      if (connection == null) {
        connection = new TableServers(conf);
        HBASE_INSTANCES.put(conf, connection);
      }
    }
    return connection;
  }

Looking next at the TableServers constructor: besides reading some basic configuration, it loads an interface via Class.forName(), namely org.apache.hadoop.hbase.ipc.HRegionInterface, which is the RPC interface the HBase server and client use to talk to each other.

So up to this point, no actual connection has been established between the two sides.

HConnectionManager.java (lines 256-280)

public TableServers(HBaseConfiguration conf) {
      this.conf = conf;

      String serverClassName =
        conf.get(REGION_SERVER_CLASS, DEFAULT_REGION_SERVER_CLASS);

      this.closed = false;
      
      try {
        this.serverInterfaceClass =
          (Class<? extends HRegionInterface>) Class.forName(serverClassName);
        
      } catch (ClassNotFoundException e) {
        throw new UnsupportedOperationException(
            "Unable to find region server interface " + serverClassName, e);
      }

      this.pause = conf.getLong("hbase.client.pause", 2 * 1000);
      this.numRetries = conf.getInt("hbase.client.retries.number", 10);
      this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1);
      this.rpcTimeout = conf.getLong("hbase.regionserver.lease.period", 60000);
      
      this.master = null;
      this.masterChecked = false;
    }

Next, what is connection.locateRegion(tableName, HConstants.EMPTY_START_ROW) actually for?

The constant HConstants.EMPTY_START_ROW here is new byte[0], which means: locate the region that holds the first row of the table.

HConnectionManager.java (lines 556-560)

 public HRegionLocation locateRegion(final byte [] tableName,
        final byte [] row)
    throws IOException{
      return locateRegion(tableName, row, true);
    }
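
Client code can perform the same lookup itself; a small sketch, assuming the UserTable from earlier exists, that prints which region server hosts the table's first row:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.util.Bytes;

public class LocateRegionDemo
{
	public static void main(String[] args) throws Exception
	{
		HConnection connection = HConnectionManager.getConnection(new HBaseConfiguration());
		// the same call the HTable constructor makes: find the region holding the first row
		HRegionLocation location = connection.locateRegion(Bytes.toBytes("UserTable"), HConstants.EMPTY_START_ROW);
		System.out.println(location.getServerAddress()); // host:port of the hosting region server
	}
}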

The next step is fetching the table's region information. From the code below, it calls the locateRegionInMeta() method to resolve both "UserTable" and ".META.", while ROOT is resolved by calling locateRootRegion(). From my tests, locateRootRegion() takes roughly 250 ms, because internally it has to ask the ZooKeeper server for the ROOT region's whereabouts.

Note, though, that locateRegionInMeta() in the code below calls locateRegion() again, so the whole lookup actually runs in this order: first "-ROOT-", then ".META.", and finally "UserTable".

HConnectionManager.java (lines 568 onward)

private HRegionLocation locateRegion(final byte [] tableName,
      final byte [] row, boolean useCache)
    throws IOException{
      if (tableName == null || tableName.length == 0) {
        throw new IllegalArgumentException(
            "table name cannot be null or zero length");
      }
            
      if (Bytes.equals(tableName, ROOT_TABLE_NAME)) {
        synchronized (rootRegionLock) {
          if (!useCache || rootRegionLocation == null) {
            this.rootRegionLocation = locateRootRegion();
          }
          return this.rootRegionLocation;
        }        
      } else if (Bytes.equals(tableName, META_TABLE_NAME)) {
        synchronized (metaRegionLock) {
          return locateRegionInMeta(ROOT_TABLE_NAME, tableName, row, useCache);
        }
      } else {
        synchronized(userRegionLock){
          return locateRegionInMeta(META_TABLE_NAME, tableName, row, useCache);
        }
      }
    }

2010-01-08 19:09:09

WordCount - HBase 0.20.x

This post is a simple WordCount program whose output, after the MapReduce processing, is written directly into HBase. The example:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class WordCountHBase
{
    public static class Map extends Mapper<LongWritable,Text,Text, IntWritable>
    {
        private IntWritable i = new IntWritable(1);
        @Override
        public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException
        {
            String s[] = value.toString().trim().split(" ");
            for( String m : s)
            {
                context.write(new Text(m), i);
            }
        }
    }
    public static class Reduce extends TableReducer<Text, IntWritable, NullWritable>
    {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for(IntWritable i : values)
            {
                sum += i.get();
            }
           
            Put put = new Put(Bytes.toBytes(key.toString()));
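            // the count is stored as a string here; Bytes.toBytes(sum) would store a raw 4-byte int instead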
            put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
            context.write(NullWritable.get(), put);
        }
    }
    public static void createHBaseTable(String tablename)throws IOException
    {
        HTableDescriptor htd = new HTableDescriptor(tablename);
        HColumnDescriptor col = new HColumnDescriptor("content:");
        htd.addFamily(col);
       
        HBaseConfiguration config = new HBaseConfiguration();
        HBaseAdmin admin = new HBaseAdmin(config);
        if(admin.tableExists(tablename))
        {
            admin.disableTable(tablename);
            admin.deleteTable(tablename);
        }
       
        System.out.println("create new table: " + tablename);
        admin.createTable(htd);
    }
   
    public static void main(String args[]) throws Exception
    {
        String tablename = "wordcount";
       
        Configuration conf = new Configuration();
        conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
        createHBaseTable(tablename);

        String input = args[0];
        Job job = new Job(conf, "WordCount table with " + input);
       
        job.setJarByClass(WordCountHBase.class);
        job.setNumReduceTasks(3);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
       
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
       
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(input));
       
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

From the code above, Reduce extends TableReducer<KEYIN,VALUEIN,KEYOUT> directly. As the API documentation says, KEYOUT is ignored by TableOutputFormat, and the output value can only be a Put or a Delete, which the source below confirms:

TableOutputFormat.java

public void write(KEY key, Writable value) throws IOException
{
	if (value instanceof Put)
		this.table.put(new Put((Put) value));
	else if (value instanceof Delete)
		this.table.delete(new Delete((Delete) value));
	else
		throw new IOException("Pass a Delete or a Put");
}

As for which table the output goes to, you must set the TableOutputFormat.OUTPUT_TABLE configuration property, or equivalently set "hbase.mapred.outputtable" yourself.

TableOutputFormat.java

public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
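
Once the job finishes, the counts can be read back out of the table. A small verification sketch, reading a hypothetical row key "hello" from the content:count column the reducer writes:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadCount
{
    public static void main(String[] args) throws Exception
    {
        HTable table = new HTable(new HBaseConfiguration(), "wordcount");
        Result r = table.get(new Get(Bytes.toBytes("hello"))); // "hello" is just an example word
        byte[] v = r.getValue(Bytes.toBytes("content"), Bytes.toBytes("count"));
        System.out.println(v == null ? "not found" : Bytes.toString(v));
    }
}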

2009-12-08 21:55:33

Copyright (C) Ching-Shen Chen. All rights reserved.
