Starting with version 0.6, Cassandra ships with Hadoop integration, so data stored in Cassandra can be used as the input to a Hadoop MapReduce job. A few days ago, while trying out how to wire Cassandra and Hadoop together using the official WordCount example, the results I got were completely wrong! After tracing through it for the better part of a day, it turned out to be a Cassandra bug (ColumnFamilyRecordReader returns duplicate rows), which fortunately has already been fixed in the just-released 0.6.4. Cassandra still does not provide an OutputFormat implementation for writing Hadoop results back into Cassandra (although you can handle that yourself in the reducer; a sketch of that appears at the end of this post), and it will only arrive in 0.7 (A Hadoop Output Format That Targets Cassandra). The Cassandra + Hadoop WordCount program is shown below:
Test data
Key   Value
-----------------------------------------
Doc1  new home sales top forecasts
Doc2  home sales rise in july
Doc3  increase in home sales in july
Doc4  july new home sales rise
IRWordCountSetup
package cassandra;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class IRWordCountSetup {

    public static final String UTF8 = "UTF8";

    public static void main(String[] args) throws Exception {
        // Connect to the local Cassandra node over Thrift
        TTransport tr = new TSocket("localhost", 9160);
        TProtocol proto = new TBinaryProtocol(tr);
        Cassandra.Client client = new Cassandra.Client(proto);
        tr.open();

        String keyspace = "Keyspace1";
        String columnFamily = "Standard1";

        // Each test document is one row; its text lives in the single column "Doc"
        ColumnPath colPathName = new ColumnPath(columnFamily);
        colPathName.setColumn("Doc".getBytes(UTF8));
        long timestamp = System.currentTimeMillis();

        client.insert(keyspace, "Doc1", colPathName,
                "new home sales top forecasts".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
        client.insert(keyspace, "Doc2", colPathName,
                "home sales rise in july".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
        client.insert(keyspace, "Doc3", colPathName,
                "increase in home sales in july".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
        client.insert(keyspace, "Doc4", colPathName,
                "july new home sales rise".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);

        tr.close();
    }
}
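After running the setup program, it can be reassuring to read the rows back before firing up the MapReduce job. A minimal sketch using the same 0.6 Thrift get() call is shown below; the class name IRWordCountVerify is just for illustration, and it assumes Cassandra is still reachable on localhost:9160 with the default Keyspace1/Standard1 schema.

package cassandra;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class IRWordCountVerify {

    public static void main(String[] args) throws Exception {
        TTransport tr = new TSocket("localhost", 9160);
        Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(tr));
        tr.open();

        // Same column path the setup program wrote to
        ColumnPath colPath = new ColumnPath("Standard1");
        colPath.setColumn("Doc".getBytes("UTF8"));

        // Read each test row back and print its "Doc" column
        for (String key : new String[] { "Doc1", "Doc2", "Doc3", "Doc4" }) {
            ColumnOrSuperColumn cosc =
                    client.get("Keyspace1", key, colPath, ConsistencyLevel.ONE);
            System.out.println(key + " -> " + new String(cosc.getColumn().getValue(), "UTF8"));
        }

        tr.close();
    }
}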
IRWordCount
package cassandra;

import java.io.IOException;
import java.util.Arrays;
import java.util.SortedMap;
import java.util.StringTokenizer;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IRWordCount {

    static final String KEYSPACE = "Keyspace1";
    static final String COLUMN_FAMILY = "Standard1";
    private static final String CONF_COLUMN_NAME = "Doc";
    private static final String OUTPUT_PATH_PREFIX = "/tmp/doc_word_count";

    public static class TokenizerMapper
            extends Mapper<String, SortedMap<byte[], IColumn>, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private String columnName;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
        }

        @Override
        public void map(String key, SortedMap<byte[], IColumn> columns, Context context)
                throws IOException, InterruptedException {
            // The row key arrives as the map key; the requested columns arrive as the map value
            IColumn column = columns.get(columnName.getBytes());
            if (column == null)
                return;

            String value = new String(column.value());
            System.out.println("read " + key + ":" + value + " from " + context.getInputSplit());

            StringTokenizer itr = new StringTokenizer(value);
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Path output = new Path(OUTPUT_PATH_PREFIX);
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Remove any stale output from a previous run
        if (fs.exists(output))
            fs.delete(output, true);

        String columnName = "Doc";
        conf.set(CONF_COLUMN_NAME, columnName);

        Job job = new Job(conf, "wordcount");
        job.setJarByClass(IRWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Read the input rows from Cassandra instead of HDFS; output still goes to HDFS
        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        FileOutputFormat.setOutputPath(job, output);

        ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
        SlicePredicate predicate =
                new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
        ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);

        boolean status = job.waitForCompletion(true);
        System.exit(status ? 0 : 1);
    }
}
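In the job setup above, the SlicePredicate names the single column "Doc", so each mapper call only sees that column. If you wanted the mapper to receive every column of each row instead, the predicate can be built from a SliceRange. The following is just a small variant sketch of the two predicate lines in IRWordCount.main() (the helper class/method names are made up; the count of 100 is an arbitrary per-row cap):

package cassandra;

import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.hadoop.mapreduce.Job;

public class PredicateVariant {

    // Instead of listing column names, ask Cassandra for a range of columns per row.
    // Empty start/finish byte arrays mean "from the first column to the last";
    // the count (here 100) caps how many columns each row hands to the mapper.
    static void useSliceRange(Job job) {
        SliceRange range = new SliceRange(new byte[0], new byte[0], false, 100);
        SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
        ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
    }
}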
The output is:
forecasts  1
home       4
in         3
increase   1
july       3
new        2
rise       2
sales      4
top        1
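As mentioned at the top, 0.6 has no OutputFormat that writes results back to Cassandra, but nothing stops the reducer from opening its own Thrift connection and inserting the counts itself. Below is a rough sketch of that idea, not the official approach: the class name CassandraWritingReducer and the "WordCount" row key are made up for illustration, the reduce tasks are assumed to be able to reach a Cassandra node on localhost:9160, and error handling and retries are omitted.

package cassandra;

import java.io.IOException;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

// Hypothetical reducer that writes each word count straight back into Cassandra
// in addition to emitting it to the normal HDFS output.
public class CassandraWritingReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private TTransport tr;
    private Cassandra.Client client;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // One Thrift connection per reduce task
        tr = new TSocket("localhost", 9160);
        client = new Cassandra.Client(new TBinaryProtocol(tr));
        try {
            tr.open();
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        try {
            // Store the totals in one row ("WordCount"), one column per word
            ColumnPath colPath = new ColumnPath("Standard1");
            colPath.setColumn(key.toString().getBytes("UTF8"));
            client.insert("Keyspace1", "WordCount", colPath,
                    String.valueOf(sum).getBytes("UTF8"),
                    System.currentTimeMillis(), ConsistencyLevel.ONE);
        } catch (Exception e) {
            throw new IOException(e);
        }
        context.write(key, new IntWritable(sum));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        tr.close();
    }
}

Swapping this class in for IntSumReducer in job.setReducerClass() would leave the HDFS output unchanged while also making the counts queryable from Cassandra; the proper solution is still the Cassandra OutputFormat planned for 0.7.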