- A+
所属分类:未分类
数据准备
a.txt
102
10
39
109
200
11
3
90
28
b.txt
5
2
30
838
10005
结果预测
Max 10005
Min 2
map代码
package com.test.hadoop.eighteen; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MaxMinMap extends Mapper<LongWritable, Text, Text, LongWritable> { private final Text keyText = new Text("K"); private LongWritable val = new LongWritable(0); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { val.set(Long.parseLong(value.toString())); context.write(keyText, val); } }
reduce代码
package com.test.hadoop.eighteen; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MaxMinReduce extends Reducer<Text, LongWritable, Text, Text> { private long max=Long.MIN_VALUE; private long min=Long.MAX_VALUE; @Override protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException { for(LongWritable val:values){ long tmp = val.get(); if(tmp>max){ max=tmp; } if(tmp<min){ min=tmp; } } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { context.write(new Text("Max"),new Text(max+"") ); context.write(new Text("Min"), new Text(min+"")); } }
main代码
package com.test.hadoop.eighteen; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SshFenceByTcpPort.Args; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class JobMain extends Configuration implements Tool { @Override public Configuration getConf() { // TODO Auto-generated method stub return new Configuration(); } @Override public void setConf(Configuration arg0) { // TODO Auto-generated method stub } @Override public int run(String[] arg0) throws Exception { Configuration configuration = new Configuration(); Job job = new Job(configuration,"maxmin-job"); job.setJarByClass(JobMain.class); job.setMapperClass(MaxMinMap.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setReducerClass(MaxMinReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(arg0[0])); Path output = new Path(arg0[1]); FileSystem fs = FileSystem.get(configuration); if(fs.exists(output)){ fs.delete(output,true); } FileOutputFormat.setOutputPath(job, output); return job.waitForCompletion(true)?0:1; } /** * @param args */ public static void main(String[] args)throws Exception { Configuration conf = new Configuration(); int res=ToolRunner.run(conf,new JobMain(), args); System.exit(res); } }
- 安卓客户端下载
- 微信扫一扫
- 微信公众号
- 微信公众号扫一扫