After the upload completes, use the following command to view its contents:

hdfs dfs -cat /user/javakid/replace/replace.txt

The input file is replace.txt, and its content looks like this:
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo
foo1234foo567foo890foo123foo

Now for the code. I write it in IntelliJ IDEA, with the project's compile output path set to /home/javakid/study/hadoop/classpath. The first class to write is the mapper:
package idv.jk.study.hadoop.myself;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by javakid on 2015/6/3.
 */
public class StringReplaceMapper
    extends Mapper<LongWritable, Text, NullWritable, Text>
{
    private static final String TARGET = "foo";
    private static final String REPLACEMENT = "bar";

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException
    {
        String line = value.toString();
        // Only lines containing the target are emitted; others are dropped.
        if(line.contains(TARGET))
        {
            // The key does not matter here, so NullWritable is used;
            // the value is the line with every occurrence replaced.
            context.write(NullWritable.get(),
                new Text(line.replaceAll(TARGET, REPLACEMENT)));
        }
    }
}
All I want from the mapper's output is each line after replacement; the key attached to each value is irrelevant, which is why the map method above uses NullWritable as the output key.
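One detail worth knowing: String.replaceAll treats its first argument as a regular expression. That is harmless for a literal like "foo", but a target containing regex metacharacters would misbehave; String.replace performs a plain literal replacement instead. A small standalone illustration (the class name ReplaceAllDemo is mine, not from the post):

public class ReplaceAllDemo
{
    public static void main(String[] args)
    {
        String line = "axb1234a.b567";
        // replaceAll interprets "a.b" as a regex, so '.' matches any character:
        System.out.println(line.replaceAll("a.b", "bar")); // bar1234bar567
        // replace treats "a.b" as a literal string:
        System.out.println(line.replace("a.b", "bar"));    // axb1234bar567
    }
}

With TARGET = "foo" the two behave identically, so the mapper above is fine either way.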
Next comes the reducer:
package idv.jk.study.hadoop.myself;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by javakid on 2015/6/3.
 */
public class StringReplaceReducer
    extends Reducer<NullWritable, Text, NullWritable, Text>
{
    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException
    {
        // Pass every already-replaced line through unchanged.
        for(Text text : values)
        {
            context.write(NullWritable.get(), text);
        }
    }
}
The reducer likewise only needs to write out the strings that were already replaced in the mapper, again using NullWritable as the output key.
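Because every record shares the same NullWritable key, all of the mapper's output lands in a single reduce group, which this pass-through loop simply writes back out. As an aside, a job like this could skip the reduce phase entirely by declaring a map-only job in the driver (shown next); a minimal sketch, assuming the job object from that driver:

// With zero reduce tasks, the shuffle is skipped and the mapper's output
// is written directly to the output directory (as part-m-* files).
job.setNumReduceTasks(0);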
Finally, the driver (main program):
package idv.jk.study.hadoop.myself;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by javakid on 2015/6/3.
 */
public class StringReplace
{
    public static void main(String[] argv)
        throws IOException, ClassNotFoundException, InterruptedException
    {
        if(argv.length != 2)
        {
            System.err.println("Usage: StringReplace <input path> <output path>");
            System.exit(-1);
        }

        // Job.getInstance() replaces the Job constructor deprecated since Hadoop 2.
        Job job = Job.getInstance();
        job.setJarByClass(StringReplace.class);
        job.setJobName("String replacement");

        FileInputFormat.addInputPath(job, new Path(argv[0]));
        FileOutputFormat.setOutputPath(job, new Path(argv[1]));

        job.setMapperClass(StringReplaceMapper.class);
        job.setReducerClass(StringReplaceReducer.class);

        // Declare the types the reduce phase emits.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Exit with 0 on success, 1 on failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
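As an aside, a more idiomatic Hadoop driver implements the Tool interface and launches through ToolRunner, which parses generic options such as -D name=value before they reach the program. The plain main method above works fine; the following is only a sketch of the alternative, with the hypothetical class name StringReplaceTool:

package idv.jk.study.hadoop.myself;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StringReplaceTool extends Configured implements Tool
{
    @Override
    public int run(String[] argv) throws Exception
    {
        if(argv.length != 2)
        {
            System.err.println("Usage: StringReplaceTool <input path> <output path>");
            return -1;
        }
        // getConf() already holds any -D options parsed by ToolRunner.
        Job job = Job.getInstance(getConf(), "String replacement");
        job.setJarByClass(StringReplaceTool.class);
        FileInputFormat.addInputPath(job, new Path(argv[0]));
        FileOutputFormat.setOutputPath(job, new Path(argv[1]));
        job.setMapperClass(StringReplaceMapper.class);
        job.setReducerClass(StringReplaceReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] argv) throws Exception
    {
        System.exit(ToolRunner.run(new StringReplaceTool(), argv));
    }
}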
The setOutputKeyClass and setOutputValueClass calls in the driver declare the types that the reduce phase emits; they must match the output types the reducer actually produces.
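In this job the map output types happen to be the same as the reduce output types (NullWritable and Text), so nothing more is needed. When a mapper emits different types than the reducer, the framework also has to be told the map-side types; a minimal sketch, using a hypothetical job whose mapper emits Text keys with IntWritable values while its reducer emits Text and Text:

// Intermediate (map output) types, only needed when they differ
// from the final reduce output types.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// Final (reduce output) types.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);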
First set HADOOP_CLASSPATH so the hadoop launcher can find the compiled classes, then run the main program:
export HADOOP_CLASSPATH=/home/javakid/study/hadoop/classpath
hadoop idv.jk.study.hadoop.myself.StringReplace \
    /user/javakid/replace/replace.txt /user/javakid/replace/output

After the job completes, use the following command to verify the result:
hdfs dfs -cat /user/javakid/replace/output/part-r-00000

The expected result is:
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar
bar1234bar567bar890bar123bar

You can find the source code here.
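A small usage note: the file is named part-r-00000 because a single reduce task produced it; the map-only variant sketched earlier would write part-m-00000 instead. A glob covers both cases:

hdfs dfs -cat /user/javakid/replace/output/part-*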