blog.Ring.idv.tw

Open Source

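Integrating Cassandra and Hadoop - WordCount

Cassandra has offered Hadoop integration since version 0.6, so data stored in Cassandra can serve as the input source of a Hadoop MapReduce job. A few days ago I was trying out this integration with the officially provided WordCount example, and the results came out completely wrong! After tracing for most of a day I found that the cause was a bug in Cassandra itself, which fortunately has been fixed in the newly released 0.6.4 (ColumnFamilyRecordReader returns duplicate rows). Cassandra still does not ship an output format for writing Hadoop results back to Cassandra (although you can handle that yourself in the reducer); that has to wait for 0.7 (A Hadoop Output Format That Targets Cassandra). The Cassandra + Hadoop WordCount program follows:

Test data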

Key     Value
-----------------------------------------
Doc1    new home sales top forecasts 
Doc2    home sales rise in july 
Doc3    increase in home sales in july 
Doc4    july new home sales rise 

IRWordCountSetup

package cassandra;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class IRWordCountSetup
{
	public static final String UTF8 = "UTF8";

	public static void main(String[] args)throws Exception
	{
		TTransport tr = new TSocket("localhost", 9160);
		TProtocol proto = new TBinaryProtocol(tr);
		Cassandra.Client client = new Cassandra.Client(proto);
		tr.open();

		String keyspace = "Keyspace1";
		String columnFamily = "Standard1";

		ColumnPath colPathName = new ColumnPath(columnFamily);
		colPathName.setColumn("Doc".getBytes(UTF8));
		long timestamp = System.currentTimeMillis();
		
		client.insert(keyspace, "Doc1", colPathName, "new home sales top forecasts".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
		client.insert(keyspace, "Doc2", colPathName, "home sales rise in july".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
		client.insert(keyspace, "Doc3", colPathName, "increase in home sales in july".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
		client.insert(keyspace, "Doc4", colPathName, "july new home sales rise".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);

		// Close the Thrift connection once all rows are written
		tr.close();
	}
}

IRWordCount

package cassandra;

import java.io.IOException;
import java.util.Arrays;
import java.util.SortedMap;
import java.util.StringTokenizer;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IRWordCount
{
	static final String KEYSPACE = "Keyspace1";
	static final String COLUMN_FAMILY = "Standard1";
	// Key in the job Configuration under which the column name is passed to the mappers
	private static final String CONF_COLUMN_NAME = "Doc";
	private static final String OUTPUT_PATH_PREFIX = "/tmp/doc_word_count";

	public static class TokenizerMapper extends Mapper<String, SortedMap<byte[], IColumn>, Text, IntWritable>
	{
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		private String columnName;
		
		protected void setup(Context context) throws IOException, InterruptedException
		{
			this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
		}
		public void map(String key, SortedMap<byte[], IColumn> columns, Context context) throws IOException, InterruptedException
		{
			IColumn column = columns.get(columnName.getBytes());
			if(column == null)
				return;
			
			String value = new String(column.value());			
			System.out.println("read " + key + ":" + value + " from " + context.getInputSplit());

			StringTokenizer itr = new StringTokenizer(value);
			while (itr.hasMoreTokens())
			{
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
	{
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
		{
			int sum = 0;
			for (IntWritable val : values)
			{
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}
	
	public static void main(String[] args) throws Exception
	{
		Path output = new Path(OUTPUT_PATH_PREFIX);
		Configuration conf = new Configuration();
        
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(output))
			fs.delete(output, true);
		
		String columnName = "Doc";
		conf.set(CONF_COLUMN_NAME, columnName);
		Job job = new Job(conf, "wordcount");
		job.setJarByClass(IRWordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// Read the input rows from Cassandra instead of HDFS; the results still go to a file
		job.setInputFormatClass(ColumnFamilyInputFormat.class);
		FileOutputFormat.setOutputPath(job, output);

		// Select the column family to scan and restrict each row to the "Doc" column
		ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
		SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
		ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);

		boolean status = job.waitForCompletion(true);
		System.exit(status ? 0 : 1);
	}
}

The output is:

forecasts	1
home	4
in	3
increase	1
july	3
new	2
rise	2
sales	4
top	1
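To double-check these counts without a Cassandra or Hadoop setup, here is a minimal sketch in plain Python that mimics the map and reduce steps on the same four documents. It is only a local simulation: the names `docs`, `map_doc` and `reduce_counts` are made up for this illustration and are not part of the Cassandra or Hadoop APIs.

```python
from collections import Counter

# The same test data that IRWordCountSetup inserts (key -> value of the "Doc" column)
docs = {
    "Doc1": "new home sales top forecasts",
    "Doc2": "home sales rise in july",
    "Doc3": "increase in home sales in july",
    "Doc4": "july new home sales rise",
}

def map_doc(text):
    """Mimic TokenizerMapper: emit (word, 1) for every whitespace-separated token."""
    for token in text.split():
        yield token, 1

def reduce_counts(pairs):
    """Mimic IntSumReducer: sum the counts emitted for each word."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return totals

pairs = [pair for text in docs.values() for pair in map_doc(text)]
totals = reduce_counts(pairs)

# Print the words in sorted order, tab-separated like the job output
for word in sorted(totals):
    print(f"{word}\t{totals[word]}")
```

If the Hadoop job prints anything other than these totals for the same data, the duplicate-row bug mentioned above is a likely suspect.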

2010-08-09 15:27:32

GTK - GtkMozEmbed

2010.03.11 Flash update

This post implements a GTK application that embeds a browser through GtkMozEmbed.

First install the development library for Gecko (the layout engine):

sudo apt-get install libxul-dev

Sample program

#include <gtk/gtk.h>
#include <gtkmozembed.h>
#include <mozilla-config.h>
#include <stdio.h>

int main(int argc, char * argv[])
{
        gtk_init(&argc, &argv);
        GtkWidget *window = gtk_window_new(GTK_WINDOW_TOPLEVEL);
        gtk_window_set_default_size(GTK_WINDOW(window), 640, 480);
        g_signal_connect(G_OBJECT(window), "destroy", G_CALLBACK(gtk_main_quit), NULL);
        GtkWidget *html = gtk_moz_embed_new();
        gtk_container_add(GTK_CONTAINER(window), html);
        gtk_moz_embed_load_url(GTK_MOZ_EMBED(html), "http://www.youtube.com/watch?v=TGbwL8kSpEk");

        gtk_widget_show_all(window);
        gtk_main();
        return 0;
}

Compile and run it

gcc test.cpp -o test `pkg-config --cflags --libs gtk+-2.0` `pkg-config --cflags --libs xulrunner-gtkmozembed`
./test

Here comes the problem: Flash does not show up! That means no watching Girls' Generation's MV.. Orz

2010.03.11 update

Install FlashPlayer (ubuntu 9.10 32bit)

sudo apt-get install flashplugin-installer

Solved, and it was that simple! :p

2010-03-11 00:15:52

Augmented Reality in Flash - FLARToolKit

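The term Augmented Reality was coined in 1990 by Thomas Caudell, an engineer at Boeing (according to the wiki). Augmented Reality lets us combine virtual objects with the real world. Think of the 1996 film "Space Jam", which put cartoon characters from a virtual world alongside Michael Jordan; that, however, was done in film post-production, whereas Augmented Reality can blend into everyday life far more directly.

Applications of this kind were originally built with the ARToolKit library developed by Dr. Hirokazu Kato, a C/C++ library. Many enthusiastic people have since ported it to other languages, so if you develop in Flash/ActionScript you can use "FLARToolKit", ported by Saqoosha. Like ARToolKit, it is released under the GNU General Public License, so commercial use requires obtaining its Commercial License.

Related resources

Mobile technology can now let you see ghosts!

ARToolKit hands-on tutorial (1): setup

FlarToolkit/Flash Augmented Reality - Getting Started

Scorpio: a simple FLARToolkit tutorial

[Tutorial] Combining PV3D with flartoolkit

2009-10-23 19:25:15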

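[Chrome Extension] Ymedia Downloader (天空部落 video and audio downloader) v0.1

More than two years ago I wrote two posts, "Manually downloading 天空部落 videos" and "music", but both approaches required downloading by hand.

With the development of Google Chrome, however, and according to "Extensions at Google I/O" (May 2009), we can now develop our own Chrome Extensions (file extension crx).

To develop Chrome Extensions you first have to switch your Chrome to the Dev channel; see "Early Access Release Channels". Besides the official tutorial, "www.chromeextensions.org" serves as a hub that collects many Chrome Extensions.

Because a Chrome Extension's crx file is ZIP-compressed, much like Adobe AIR's air file, it is easy to unpack. If you want to learn how others build their extensions, this is one way to do it, but please respect other people's intellectual property :).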
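As a rough illustration of the unpacking step, the sketch below lists the files inside a package with Python's standard `zipfile` module. As far as I know a crx carries a small header in front of the ZIP data; `zipfile` finds the archive through the directory stored at the end of the file, so leading bytes do not get in the way. The helper name `list_crx_files` and the in-memory stand-in package are assumptions made for this example, not a real extension.

```python
import io
import zipfile

def list_crx_files(crx_bytes):
    """Return the file names stored in the ZIP payload of a crx-style package."""
    # zipfile reads the central directory at the end of the data,
    # so bytes that precede the ZIP archive are skipped automatically.
    with zipfile.ZipFile(io.BytesIO(crx_bytes)) as archive:
        return archive.namelist()

# Build a stand-in package in memory: a made-up header followed by a ZIP archive.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as z:
    z.writestr("manifest.json", '{"name": "demo", "version": "0.1"}')
fake_crx = b"Cr24" + b"\x00" * 12 + buf.getvalue()

print(list_crx_files(fake_crx))
```

For a real package you would read the bytes from disk first, for example with `open(path, "rb").read()`, where `path` points at the crx file you want to inspect.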

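Using Ymedia Downloader (天空部落 video and audio downloader) v0.1

Click "ymedia.crx" to install it. Once installation finishes, a button appears on the toolbar at the bottom of the browser.

From then on, whenever you browse any 天空部落 video or audio page, clicking that button immediately downloads the media you are currently viewing!

If you run into any problems, suggestions and feedback are welcome. Thank you.

2009-09-22 19:59:48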

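Universal Text Box

Laconica - The Open Microblogging Tool. is a free, open-source (PHP) microblogging platform. It can serve as a short-message (140 character) communication platform for a community or group, similar to the currently popular Twitter and Plurk.

If you are a regular Twitter user, the "@" symbol should be familiar. Below is the Regular Expression pulled from the Laconica source code: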

import re
msg = "@account This is a test."
p = r'(?:^|\s)@([a-z0-9]{1,64})'
m = re.findall(p, msg)
for i in m:
    print(i)

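The key to this small program is the Regular Expression "(?:^|\s)@([a-z0-9]{1,64})". The "account name" after "@" may consist only of lowercase letters and digits and must be 1 to 64 characters long, while "@" itself must be preceded by either the start of the message or a whitespace character. Because that leading alternative is wrapped in parentheses, "?:" is added to make the group non-capturing, so only the account name is returned. The result is a Regular Expression that handles input much like Twitter's message box.

The following example uses the same approach with the "#" symbol to mark Tags: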
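To see what "?:" actually changes, here is a small sketch that runs the pattern with and without the non-capturing marker. The sample message and the names `mention` and `with_prefix` are made up for this illustration; only the first pattern comes from the Laconica source quoted above.

```python
import re

msg = "@alice thanks! cc @bob, mail me at carol@example.com"

# Non-capturing prefix: findall returns only the account names.
mention = re.compile(r'(?:^|\s)@([a-z0-9]{1,64})')
print(mention.findall(msg))      # ['alice', 'bob']

# Capturing prefix: findall returns (prefix, name) tuples instead.
with_prefix = re.compile(r'(^|\s)@([a-z0-9]{1,64})')
print(with_prefix.findall(msg))  # [('', 'alice'), (' ', 'bob')]
```

Note that "carol@example.com" is not picked up, because its "@" follows a letter rather than whitespace or the start of the message.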

import re
msg = "Today is a sunny day. #weather #life"
p = r'(?:^|\s)#([A-Za-z0-9_\-\.]{1,64})'
m = re.findall(p, msg)
for i in m:
    print(i)

References
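Putting the two patterns together, a single input box could pull out both mentions and tags in one pass. The following is only a sketch of that idea; the function `parse_message` and the dictionary layout it returns are my own assumptions, not something taken from Laconica.

```python
import re

# The two patterns used in the examples above
MENTION = re.compile(r'(?:^|\s)@([a-z0-9]{1,64})')
TAG = re.compile(r'(?:^|\s)#([A-Za-z0-9_\-\.]{1,64})')

def parse_message(msg):
    """Collect the @mentions and #tags found in one message."""
    return {"mentions": MENTION.findall(msg), "tags": TAG.findall(msg)}

print(parse_message("@account Today is a sunny day. #weather #life"))
# {'mentions': ['account'], 'tags': ['weather', 'life']}
```

One thing to keep in mind: the tag pattern allows ".", so a tag written at the end of a sentence, such as "#life.", is captured as "life." with the trailing dot included.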

8.2. re — Regular expression operations

How @replies work on Twitter (and how they might)

2009-05-12 16:14:57

Copyright (C) Ching-Shen Chen. All rights reserved.
