拜歐 first uploads the file whose contents are to be replaced to HDFS.
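For reference, a minimal upload sketch, assuming replace.txt sits in the current local directory; the target path matches the cat command below:
hdfs dfs -mkdir -p /user/javakid/replace
hdfs dfs -put replace.txt /user/javakid/replace/replace.txt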
Once the upload completes, view the file with the following command:
hdfs dfs -cat /user/javakid/replace/replace.txt
The input file is replace.txt, with the following contents:
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
Now for the code. 拜歐 writes the program in IntelliJ IDEA, with the project's compile output path set to /home/javakid/study/hadoop/classpath; this directory will double as HADOOP_CLASSPATH when the job is run later.
The first piece to write is the mapper:
package idv.jk.study.hadoop.myself;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by javakid on 2015/6/3.
 */
public class StringReplaceMapper
    extends Mapper<LongWritable, Text, NullWritable, Text>
{
    private static final String TARGET = "foo";
    private static final String REPLACEMENT = "bar";

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException
    {
        String line = value.toString();
        // Emit only lines containing the target; note that lines without
        // a match are dropped rather than passed through unchanged.
        if(line.contains(TARGET))
        {
            // replaceAll treats TARGET as a regex; that is safe here
            // because "foo" contains no regex metacharacters.
            context.write(NullWritable.get(),
                new Text(line.replaceAll(TARGET, REPLACEMENT)));
        }
    }
}
For the mapper's output, all 拜歐 needs is each line after replacement; what key those values carry doesn't matter, so the context.write call above uses NullWritable as the output key.
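Since the replacement itself is plain java.lang.String work, it can be sanity-checked outside Hadoop first; a minimal standalone sketch (class name hypothetical):
public class ReplaceCheck
{
    public static void main(String[] args)
    {
        String line = "foo1234foo567foo890foo123foo";
        // The same call the mapper makes; prints bar1234bar567bar890bar123bar
        System.out.println(line.replaceAll("foo", "bar"));
    }
}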
Next comes the reducer:
package idv.jk.study.hadoop.myself;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by javakid on 2015/6/3.
 */
public class StringReplaceReducer
    extends Reducer<NullWritable, Text, NullWritable, Text>
{
    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException
    {
        // Every mapper record shares the single NullWritable key, so all
        // replaced lines arrive here together; write each one back out.
        for(Text text : values)
        {
            context.write(NullWritable.get(), text);
        }
    }
}
In the reducer, 拜歐 likewise just writes out the strings the mapper already replaced, again using NullWritable as the output key in the context.write call. Note that because every record shares that one key, all lines funnel into a single reduce call; the class is effectively the identity pass-through that the base Reducer already provides.
Finally, the main program:
package idv.jk.study.hadoop.myself;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by javakid on 2015/6/3.
 */
public class StringReplace
{
    public static void main(String[] argv)
        throws IOException, ClassNotFoundException, InterruptedException
    {
        if(argv.length != 2)
        {
            System.err.println("Usage: StringReplace <input path> <output path>");
            System.exit(-1);
        }

        // Job.getInstance() replaces the deprecated new Job() constructor.
        Job job = Job.getInstance();
        job.setJarByClass(StringReplace.class);
        job.setJobName("String replacement");

        FileInputFormat.addInputPath(job, new Path(argv[0]));
        FileOutputFormat.setOutputPath(job, new Path(argv[1]));

        job.setMapperClass(StringReplaceMapper.class);
        job.setReducerClass(StringReplaceReducer.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Exit with 0 on success, 1 on failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The setOutputKeyClass and setOutputValueClass methods above declare the output types of the reduce method, and they must match the types the reducer class actually emits. Here the map output types are the same as the final output types, so no separate setMapOutputKeyClass/setMapOutputValueClass calls are needed.
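Since the reducer only passes values through, the same result could in principle come from a map-only job; a hypothetical variant, assuming the driver above otherwise stays unchanged:
// Map-only sketch: drop the setReducerClass(...) line and disable the
// reduce phase entirely. Mapper output then goes straight to HDFS, and
// the output file is named part-m-00000 instead of part-r-00000.
job.setNumReduceTasks(0);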
With HADOOP_CLASSPATH set, run the main program via the hadoop command:
export HADOOP_CLASSPATH=/home/javakid/study/hadoop/classpath
hadoop idv.jk.study.hadoop.myself.StringReplace \
/user/javakid/replace/replace.txt /user/javakid/replace/output
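One caveat: FileOutputFormat refuses to write into an existing directory, so rerunning the job requires removing the previous output first, e.g.:
hdfs dfs -rm -r /user/javakid/replace/output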
When the job finishes, check the result with the following command:
hdfs dfs -cat /user/javakid/replace/output/part-r-00000
The expected output is:
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
You can find the source code here.