.2010/05/24 Added a version written against the new MapReduce API
About two years ago I wrote "Sobel edge detection for AS2" in ActionScript, purely for fun at the time. Here I port the same example to Hadoop, mainly to take advantage of its distributed processing power. An application like this only needs the map phase: each processed image is written directly to HDFS, so there is no reason to waste bandwidth and other resources on the shuffle and reduce phases. One point worth noting is that the job operates on whole image files, so a file must not be split up before the map phase. The approach taken here is to override the isSplitable() method and treat the entire file as a single record (a sketch of that input format follows the main class below). Interested readers can find the full source in the attachment:
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.IOException;

import javax.imageio.ImageIO;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SobelProcessing extends Configured implements Tool {

    public static class Map extends MapReduceBase
            implements Mapper<NullWritable, BytesWritable, Text, Text> {

        private JobConf conf;

        @Override
        public void configure(JobConf conf) {
            this.conf = conf;
        }

        // Each record is one whole image file (see WholeFileInputFormat below).
        // The filtered image is written straight back to HDFS, so nothing is
        // emitted to the output collector.
        public void map(NullWritable key, BytesWritable value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            String filename = conf.get("map.input.file");
            String outputDir = conf.get("output.dir");
            filename = getFileName(filename);

            FileSystem fs = FileSystem.get(conf);
            FSDataOutputStream dos = fs.create(new Path(outputDir + filename + ".jpg"));

            // Only the first getLength() bytes of a BytesWritable are valid;
            // the backing array may be padded.
            BufferedImage src = ImageIO.read(
                    new ByteArrayInputStream(value.getBytes(), 0, value.getLength()));
            float sobscale = Float.valueOf(conf.get("sobscale"));
            int offsetval = Integer.valueOf(conf.get("offsetval"));
            int iw = src.getWidth();
            int ih = src.getHeight();
            BufferedImage dest = new BufferedImage(iw, ih, src.getType());

            // Convert to grayscale using the standard luma weights.
            int[][] gray = new int[iw][ih];
            for (int x = 0; x < iw; x++) {
                for (int y = 0; y < ih; y++) {
                    int rgb = src.getRGB(x, y);
                    int r = 0xFF & (rgb >> 16);
                    int g = 0xFF & (rgb >> 8);
                    int b = 0xFF & rgb;
                    gray[x][y] = (int) (0.299 * r + 0.587 * g + 0.114 * b);
                }
            }

            // Apply the edge operator to every interior pixel.
            for (int x = 1; x < iw - 1; x++) {
                for (int y = 1; y < ih - 1; y++) {
                    int a = gray[x - 1][y - 1];
                    int b = gray[x][y - 1];
                    int c = gray[x + 1][y - 1];
                    int d = gray[x - 1][y];
                    int e = gray[x + 1][y];
                    int f = gray[x - 1][y + 1];
                    int g = gray[x][y + 1];
                    int h = gray[x + 1][y + 1];
                    int hor = (a + d + f) - (c + e + h);
                    if (hor < 0) hor = -hor;
                    int vert = (a + b + c) - (f + g + h);
                    if (vert < 0) vert = -vert;
                    int gc = (int) (sobscale * (hor + vert)) + offsetval;
                    if (gc > 255) gc = 255;
                    if (gc < 0) gc = 0; // also clamp the lower bound, in case
                                        // offsetval is configured negative
                    int sobel = 0xff000000 | gc << 16 | gc << 8 | gc;
                    dest.setRGB(x, y, sobel);
                }
            }

            // ImageIO replaces the original, non-portable
            // com.sun.image.codec.jpeg encoder.
            ImageIO.write(dest, "jpg", dos);
            dos.close();
        }

        // Strips the directory and extension; the leading '/' is kept so that
        // output.dir + filename forms a valid path.
        public String getFileName(String s) {
            return s.substring(s.lastIndexOf("/"), s.lastIndexOf("."));
        }
    }

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), SobelProcessing.class);
        conf.set("sobscale", "1.0");
        conf.set("offsetval", "0");
        conf.set("output.dir", args[1]);
        conf.setJobName("SobelProcessing");
        conf.setMapperClass(Map.class);
        conf.setInputFormat(WholeFileInputFormat.class); // one record per image file
        conf.setOutputFormat(NullOutputFormat.class);    // mappers write to HDFS themselves
        conf.set("mapred.child.java.opts", "-Xmx256m");
        conf.setNumReduceTasks(0);                       // map-only job: no shuffle, no reduce
        WholeFileInputFormat.setInputPaths(conf, new Path(args[0]));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) {
        try {
            int res = ToolRunner.run(new Configuration(), new SobelProcessing(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
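The listing above references WholeFileInputFormat, which is only included in the attachment. For readers who just want the idea, here is a minimal sketch of what such an input format can look like with the old mapred API, following the common whole-file-record pattern: isSplitable() returns false so an image is never handed to more than one mapper, and the record reader delivers the entire file as a single NullWritable/BytesWritable pair. The class name matches the listing above, but the implementation details are my own reconstruction, not necessarily the attached code.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    // Never split an input file: the edge filter needs the whole image.
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        return new WholeFileRecordReader((FileSplit) split, job);
    }

    // Delivers exactly one record: the file's full contents as a BytesWritable.
    static class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> {

        private final FileSplit fileSplit;
        private final JobConf conf;
        private boolean processed = false;

        WholeFileRecordReader(FileSplit fileSplit, JobConf conf) {
            this.fileSplit = fileSplit;
            this.conf = conf;
        }

        public boolean next(NullWritable key, BytesWritable value) throws IOException {
            if (processed) {
                return false; // the single record has already been delivered
            }
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }

        public NullWritable createKey() {
            return NullWritable.get();
        }

        public BytesWritable createValue() {
            return new BytesWritable();
        }

        public long getPos() throws IOException {
            return processed ? fileSplit.getLength() : 0;
        }

        public float getProgress() throws IOException {
            return processed ? 1.0f : 0.0f;
        }

        public void close() throws IOException {
            // nothing to close; the stream is closed in next()
        }
    }
}

Reading a whole file into one BytesWritable means each map task must hold an entire image in memory, which is fine for ordinary photos (hence the -Xmx256m child JVM setting above). The job is then launched with the usual hadoop jar invocation, passing the input directory as args[0] and the output directory as args[1]. For the new-API version mentioned in the update note, the same pattern applies, except that the input format extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat and the mapper obtains the file name from its FileSplit (via context.getInputSplit()) instead of the map.input.file property.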
Result:
.Source code