After the upload completes, use the following command to view its contents:
hdfs dfs -cat /user/javakid/replace/replace.txt

The input file is replace.txt, and its contents are as follows:
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo

Now for the program itself. 拜歐 writes it in IntelliJ IDEA, with the project's compile output path set to /home/javakid/study/hadoop/classpath. The first class to write is the mapper:
package idv.jk.study.hadoop.myself;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by javakid on 2015/6/3.
 */
public class StringReplaceMapper extends Mapper<LongWritable, Text, NullWritable, Text>
{
    private static final String TARGET = "foo";

    private static final String REPLACEMENT = "bar";

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        if(line.indexOf(TARGET) >= 0)
        {
            context.write(NullWritable.get(), new Text(line.replaceAll(TARGET, REPLACEMENT)));
        }
    }
}

All 拜歐 wants from the mapper's output is each line after replacement; the key attached to these values does not matter, so the context.write call above uses NullWritable as the output key.
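One caveat worth noting (my addition, not part of the original post): String.replaceAll treats TARGET as a regular expression. That happens to be harmless for a literal like "foo", but if the target ever contained regex metacharacters, String.replace would do a literal substitution instead. A minimal sketch of such a variant, using the hypothetical class name LiteralReplaceMapper:

package idv.jk.study.hadoop.myself;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Hypothetical variant of StringReplaceMapper: String.replace substitutes the
// target literally, so a target such as "a.b" would not be read as a regex.
public class LiteralReplaceMapper extends Mapper<LongWritable, Text, NullWritable, Text>
{
    private static final String TARGET = "foo";

    private static final String REPLACEMENT = "bar";

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        if(line.contains(TARGET))
        {
            // Literal, non-regex replacement of every occurrence of TARGET.
            context.write(NullWritable.get(), new Text(line.replace(TARGET, REPLACEMENT)));
        }
    }
}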
Next up is the reducer:
package idv.jk.study.hadoop.myself;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by javakid on 2015/6/3.
 */
public class StringReplaceReducer extends Reducer<NullWritable, Text, NullWritable, Text>
{
    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        for(Text text : values)
        {
            context.write(NullWritable.get(), text);
        }
    }
}

In the reducer, 拜歐 likewise only needs to write out the strings already replaced by the mapper, and the context.write call again uses NullWritable as the output key.
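A side note of mine, not something the original post does: this reducer is a pure pass-through, and the stock Reducer class could stand in for it, because its default reduce() already writes every incoming (key, value) pair out unchanged. A minimal sketch of the corresponding driver line, under that assumption:

// Hypothetical alternative in the driver: use the base Reducer as an identity
// reducer; its default reduce() forwards each (key, value) pair unchanged,
// which is exactly what StringReplaceReducer does.
job.setReducerClass(org.apache.hadoop.mapreduce.Reducer.class);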
Finally, the main program:
package idv.jk.study.hadoop.myself;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by javakid on 2015/6/3.
 */
public class StringReplace
{
    public static void main(String[] argv) throws IOException, ClassNotFoundException, InterruptedException
    {
        if(argv.length != 2)
        {
            System.err.println("Usage: StringReplace <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(StringReplace.class);
        job.setJobName("String replacement");

        FileInputFormat.addInputPath(job, new Path(argv[0]));
        FileOutputFormat.setOutputPath(job, new Path(argv[1]));

        job.setMapperClass(StringReplaceMapper.class);
        job.setReducerClass(StringReplaceReducer.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        System.out.println(job.waitForCompletion(true) ? 0 : 1);
    }
}

The setOutputKeyClass and setOutputValueClass calls above control the types of the reduce function's output, and they must match the output types the reducer actually produces.
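A hypothetical aside of mine (this job does not need it): when the mapper's output types differ from the reducer's, the intermediate types have to be declared separately with setMapOutputKeyClass and setMapOutputValueClass. For example, a job whose mapper emits <Text, IntWritable> while its reducer emits <Text, LongWritable> might configure:

// Hypothetical driver lines: intermediate (map output) types are declared
// separately from the final (reduce output) types when the two differ.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);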
After setting HADOOP_CLASSPATH, run the main program with the hadoop command:
export HADOOP_CLASSPATH=/home/javakid/study/hadoop/classpath

hadoop idv.jk.study.hadoop.myself.StringReplace \
/user/javakid/replace/replace.txt /user/javakid/replace/output
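One practical note of mine, not mentioned in the post: FileOutputFormat refuses to start the job if the output directory already exists, so if the job is run again the previous output has to be removed first, for example:

hdfs dfs -rm -r /user/javakid/replace/output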
After the job finishes, use the following command to confirm the result:

hdfs dfs -cat /user/javakid/replace/output/part-r-00000

The expected result is as follows:
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar

You can find the source code here.
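As a closing aside (my addition, not from the original post): instead of pointing HADOOP_CLASSPATH at the IDE's compile output, the compiled classes could also be packaged into a jar and submitted with hadoop jar. A sketch, assuming a hypothetical jar name replace.jar:

jar cf replace.jar -C /home/javakid/study/hadoop/classpath .

hadoop jar replace.jar idv.jk.study.hadoop.myself.StringReplace \
/user/javakid/replace/replace.txt /user/javakid/replace/output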