Starting with version 0.6, Cassandra ships with Hadoop integration, so data stored in Cassandra can be used as the input source for a Hadoop MapReduce job. A few days ago, while experimenting with how to wire Cassandra and Hadoop together, I ran the official WordCount example and the results came out completely wrong!! After tracing the problem for the better part of a day, I found it was a Cassandra bug; fortunately it has already been fixed in the newly released 0.6.4 ("ColumnFamilyRecordReader returns duplicate rows"). Note, however, that Cassandra still does not provide an OutputFormat implementation for writing Hadoop results back into Cassandra (although you can handle that yourself in the Reducer; a sketch of this appears at the end of this post). That feature is slated for 0.7 ("A Hadoop Output Format That Targets Cassandra"). Below is the Cassandra + Hadoop WordCount program:
Test data
Key     Value
-----------------------------------------
Doc1    new home sales top forecasts
Doc2    home sales rise in july
Doc3    increase in home sales in july
Doc4    july new home sales rise
IRWordCountSetup
package cassandra;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class IRWordCountSetup
{
    public static final String UTF8 = "UTF8";

    public static void main(String[] args) throws Exception
    {
        // Connect to the local Cassandra node over Thrift (default port 9160).
        TTransport tr = new TSocket("localhost", 9160);
        TProtocol proto = new TBinaryProtocol(tr);
        Cassandra.Client client = new Cassandra.Client(proto);
        tr.open();

        String keyspace = "Keyspace1";
        String columnFamily = "Standard1";

        // Each document becomes one row; the text goes into a single column named "Doc".
        ColumnPath colPathName = new ColumnPath(columnFamily);
        colPathName.setColumn("Doc".getBytes(UTF8));
        long timestamp = System.currentTimeMillis();

        client.insert(keyspace, "Doc1", colPathName, "new home sales top forecasts".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
        client.insert(keyspace, "Doc2", colPathName, "home sales rise in july".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
        client.insert(keyspace, "Doc3", colPathName, "increase in home sales in july".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
        client.insert(keyspace, "Doc4", colPathName, "july new home sales rise".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);

        // Close the Thrift connection once the test rows are written.
        tr.close();
    }
}
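Before kicking off the MapReduce job, it is worth checking that the rows actually landed in Cassandra. The IRWordCountVerify class below is not part of the official example; it is just a quick sanity-check sketch of my own, assuming the same localhost:9160 node and the Keyspace1/Standard1 schema used above:

package cassandra;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

// Hypothetical helper, not part of the official WordCount example:
// reads back one row written by IRWordCountSetup to confirm the inserts worked.
public class IRWordCountVerify
{
    public static void main(String[] args) throws Exception
    {
        TTransport tr = new TSocket("localhost", 9160);
        Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(tr));
        tr.open();

        // Fetch the "Doc" column of row "Doc1" from Standard1.
        ColumnPath colPath = new ColumnPath("Standard1");
        colPath.setColumn("Doc".getBytes("UTF8"));
        ColumnOrSuperColumn cosc = client.get("Keyspace1", "Doc1", colPath, ConsistencyLevel.ONE);
        System.out.println(new String(cosc.getColumn().getValue(), "UTF8"));
        // Should print: new home sales top forecasts

        tr.close();
    }
}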
IRWordCount
package cassandra;

import java.io.IOException;
import java.util.Arrays;
import java.util.SortedMap;
import java.util.StringTokenizer;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IRWordCount
{
    static final String KEYSPACE = "Keyspace1";
    static final String COLUMN_FAMILY = "Standard1";
    private static final String CONF_COLUMN_NAME = "Doc";
    private static final String OUTPUT_PATH_PREFIX = "/tmp/doc_word_count";

    public static class TokenizerMapper extends Mapper<String, SortedMap<byte[], IColumn>, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private String columnName;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException
        {
            // Read the column name to process from the job configuration.
            this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
        }

        @Override
        public void map(String key, SortedMap<byte[], IColumn> columns, Context context) throws IOException, InterruptedException
        {
            // Each input record is one Cassandra row: the row key plus the columns selected by the slice predicate.
            IColumn column = columns.get(columnName.getBytes());
            if (column == null)
                return;
            String value = new String(column.value());
            System.out.println("read " + key + ":" + value + " from " + context.getInputSplit());

            // Emit <word, 1> for every token in the column value.
            StringTokenizer itr = new StringTokenizer(value);
            while (itr.hasMoreTokens())
            {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            // Sum the counts for each word.
            int sum = 0;
            for (IntWritable val : values)
            {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception
    {
        // Remove any output left over from a previous run.
        Path output = new Path(OUTPUT_PATH_PREFIX);
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output))
            fs.delete(output, true);

        String columnName = "Doc";
        conf.set(CONF_COLUMN_NAME, columnName);

        Job job = new Job(conf, "wordcount");
        job.setJarByClass(IRWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Read input from Cassandra instead of HDFS; output still goes to a plain text file.
        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        FileOutputFormat.setOutputPath(job, output);

        // Tell the input format which keyspace/column family to scan and which column to fetch.
        ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
        SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
        ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);

        boolean status = job.waitForCompletion(true);
        System.exit(status ? 0 : 1);
    }
}
The output is:
forecasts  1
home       4
in         3
increase   1
july       3
new        2
rise       2
sales      4
top        1
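As mentioned at the top, 0.6 has no Cassandra OutputFormat yet, so if you want the word counts to end up back in Cassandra rather than in a text file under /tmp/doc_word_count, one option is to open a Thrift connection inside the Reducer and do the inserts yourself. The following is only a rough sketch under that assumption; the "WordCount" column family name and the hard-coded connection details are my own choices, not part of the official example, and the OutputFormat planned for 0.7 will make this workaround unnecessary:

package cassandra;

import java.io.IOException;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

// Sketch of a reducer that writes each word count straight back to Cassandra over Thrift.
// "WordCount" is a hypothetical column family that would have to be defined in storage-conf.xml;
// it is not part of the official example.
public class CassandraSinkReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    private TTransport transport;
    private Cassandra.Client client;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException
    {
        // One connection per reduce task; a real job might read host/port from the configuration instead.
        transport = new TSocket("localhost", 9160);
        client = new Cassandra.Client(new TBinaryProtocol(transport));
        try
        {
            transport.open();
        }
        catch (Exception e)
        {
            throw new IOException(e);
        }
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values)
            sum += val.get();

        try
        {
            // Row key = the word, column "count" = its total.
            ColumnPath path = new ColumnPath("WordCount");
            path.setColumn("count".getBytes("UTF8"));
            client.insert("Keyspace1", key.toString(), path,
                          String.valueOf(sum).getBytes("UTF8"),
                          System.currentTimeMillis(), ConsistencyLevel.ONE);
        }
        catch (Exception e)
        {
            throw new IOException(e);
        }

        // Still emit to the normal output so the text file above is produced as well.
        context.write(key, new IntWritable(sum));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException
    {
        transport.close();
    }
}

To try it, you would swap this class in via job.setReducerClass(CassandraSinkReducer.class) while keeping IntSumReducer as the combiner; treat it purely as a stopgap until the 0.7 OutputFormat lands.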
