.2010/05/24: A MapReduce New API version has been added
About two years ago I wrote "Sobel - Edge Detection for AS2" in ActionScript, purely for fun. Now I have reworked the same example as a Hadoop version, mainly to take advantage of Hadoop's distributed computing power. For this application, the Map phase alone is enough: each mapper writes its processed image directly to HDFS, so there is no need for the shuffle and reduce phases, which would only waste bandwidth and other resources. One point worth mentioning is that this example processes whole image files, so a file must not be split up before it reaches the Map phase. The approach taken here is to override the isSplitable() method and treat each entire file as a single record (a sketch of such an input format follows the listing below). Interested readers can find the code in the attachment:
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import javax.imageio.ImageIO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SobelProcessing extends Configured implements Tool
{
public static class Map extends MapReduceBase implements
Mapper<NullWritable, BytesWritable, Text, Text>
{
private JobConf conf;
@Override
public void configure(JobConf conf)
{
this.conf = conf;
}
public void map(NullWritable key, BytesWritable value,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException
{
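// "map.input.file" is set by the framework to the path of the file this
// map task is processing; "output.dir" and the Sobel parameters are set in run()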
String filename = conf.get("map.input.file");
String output_dir = conf.get("output.dir");
filename = getFileName(filename);
FileSystem fs = FileSystem.get(conf);
FSDataOutputStream dos = fs.create(new Path(output_dir + filename + ".jpg"));
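// decode the image that WholeFileInputFormat delivered as a single record;
// read only getLength() bytes, since the BytesWritable backing array may be padded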
BufferedImage src = ImageIO.read(new ByteArrayInputStream(value.getBytes(), 0, value.getLength()));
float sobscale = Float.valueOf(conf.get("sobscale"));
int offsetval = Integer.valueOf(conf.get("offsetval"));
int iw = src.getWidth();
int ih = src.getHeight();
BufferedImage dest = new BufferedImage(iw, ih, src.getType());
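// grayscale conversion using the standard luma weights (0.299 R + 0.587 G + 0.114 B)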
int[][] gray = new int[iw][ih];
for (int x = 0; x < iw; x++)
{
for (int y = 0; y < ih; y++)
{
int rgb = src.getRGB(x, y);
int r = 0xFF & (rgb >> 16);
int g = 0xFF & (rgb >> 8);
int b = 0xFF & rgb;
gray[x][y] = (int) (0.299 * r + 0.587 * g + 0.114 * b);
}
}
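// gradient pass: for each interior pixel, compare the sums of the opposite
// columns (hor) and rows (vert) of its 3x3 neighbourhood; this is the
// unweighted variant of Sobel (the classic kernels weight the centre by 2)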
for (int x = 1; x < iw - 1; x++)
{
for (int y = 1; y < ih - 1; y++)
{
int a = gray[x - 1][y - 1];
int b = gray[x][y - 1];
int c = gray[x + 1][y - 1];
int d = gray[x - 1][y];
int e = gray[x + 1][y];
int f = gray[x - 1][y + 1];
int g = gray[x][y + 1];
int h = gray[x + 1][y + 1];
int hor = (a + d + f) - (c + e + h);
if (hor < 0)
hor = -hor;
int vert = (a + b + c) - (f + g + h);
if (vert < 0)
vert = -vert;
int gc = (int) (sobscale * (hor + vert));
gc = (gc + offsetval);
if (gc > 255)
gc = 255;
if (gc < 0)
gc = 0; // clamp both ends: a negative offsetval could otherwise underflow the channel
int sobel = 0xff000000 | gc << 16 | gc << 8 | gc;
dest.setRGB(x, y, sobel);
}
}
ImageIO.write(dest, "jpg", dos); // ImageIO replaces the unsupported com.sun.image.codec JPEG encoder
dos.close();
}
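// strip the directory and extension; note the leading "/" is retained,
// so it acts as the separator when appended to output_dir above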
public String getFileName(String s)
{
return s.substring(s.lastIndexOf("/"), s.lastIndexOf("."));
}
}
public int run(String[] args) throws Exception
{
JobConf conf = new JobConf(getConf(), SobelProcessing.class);
conf.set("sobscale", "1.0");
conf.set("offsetval", "0");
conf.set("output.dir", args[1]);
conf.setJobName("SobelProcessing");
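// map-only job: each mapper writes its JPEG straight to HDFS, so there are
// no reducers and the framework's own output is discarded via NullOutputFormat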
conf.setMapperClass(Map.class);
conf.setInputFormat(WholeFileInputFormat.class);
conf.setOutputFormat(NullOutputFormat.class);
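// give each task a bit more heap, since the whole image is buffered in memory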
conf.set("mapred.child.java.opts", "-Xmx256m");
conf.setNumReduceTasks(0);
WholeFileInputFormat.setInputPaths(conf, new Path(args[0]));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args)
{
try
{
int res = ToolRunner.run(new Configuration(), new SobelProcessing(), args);
System.exit(res);
} catch (Exception e)
{
e.printStackTrace();
}
}
}
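The WholeFileInputFormat used above lives in the attachment rather than in the listing. For readers who just want the idea, a minimal old-API sketch could look like the following. The isSplitable() override matches what the article describes; the RecordReader body is my own assumption, following the usual whole-file pattern:
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>
{
    // never split an image file: each map task must see the complete file
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename)
    {
        return false;
    }
    @Override
    public RecordReader<NullWritable, BytesWritable> getRecordReader(
        InputSplit split, JobConf job, Reporter reporter) throws IOException
    {
        return new WholeFileRecordReader((FileSplit) split, job);
    }
    // hands the mapper the entire file as one (NullWritable, BytesWritable) record
    static class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable>
    {
        private final FileSplit split;
        private final JobConf job;
        private boolean processed = false;
        WholeFileRecordReader(FileSplit split, JobConf job)
        {
            this.split = split;
            this.job = job;
        }
        public boolean next(NullWritable key, BytesWritable value) throws IOException
        {
            if (processed)
                return false;
            byte[] contents = new byte[(int) split.getLength()];
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream in = null;
            try
            {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally
            {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        public NullWritable createKey() { return NullWritable.get(); }
        public BytesWritable createValue() { return new BytesWritable(); }
        public long getPos() { return processed ? split.getLength() : 0; }
        public float getProgress() { return processed ? 1.0f : 0.0f; }
        public void close() { }
    }
}
With both classes packed into a jar (say sobel.jar, an illustrative name), the job would then be launched with something like: hadoop jar sobel.jar SobelProcessing <input path> <output path>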
Result:
.Source code
