Hands-On: Finding the Maximum and Minimum of a Dataset with Hadoop MapReduce


Data preparation
a.txt

102
10
39
109
200
11
3
90
28

b.txt

5
2
30
838
10005

 

Expected result

Max 10005
Min 2
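The approach is straightforward: the mapper emits every number under one shared key, so the entire dataset arrives at a single reduce group. The reducer tracks the running maximum and minimum as the values stream past and writes both out once at the end.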

Mapper code

package com.test.hadoop.eighteen;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxMinMap extends Mapper<LongWritable, Text, Text, LongWritable> {

   // Every number is emitted under the same constant key so that the
   // whole dataset ends up in a single reduce group.
   private final Text keyText = new Text("K");
   private final LongWritable val = new LongWritable(0);

   @Override
   protected void map(LongWritable key, Text value, Context context)
         throws IOException, InterruptedException {
      // Each input line holds one number; parse it and emit it as the value.
      val.set(Long.parseLong(value.toString()));
      context.write(keyText, val);
   }

}
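Because keyText is the constant "K", every record from both files is shuffled into the same reduce group. That is exactly what a global max/min needs, but it also means the whole dataset flows through one reducer; for larger inputs, a combiner (sketched after the reducer below) can shrink the shuffle to two values per map task.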

Reducer code

package com.test.hadoop.eighteen;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxMinReduce extends Reducer<Text, LongWritable, Text, Text> {

    // Running extremes, updated as values stream through reduce().
    private long max = Long.MIN_VALUE;
    private long min = Long.MAX_VALUE;

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // All numbers arrive under the single key "K"; track both extremes in one pass.
        for (LongWritable val : values) {
            long tmp = val.get();
            if (tmp > max) {
                max = tmp;
            }
            if (tmp < min) {
                min = tmp;
            }
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // cleanup() runs once after all reduce() calls, so the final
        // extremes can be written here.
        context.write(new Text("Max"), new Text(String.valueOf(max)));
        context.write(new Text("Min"), new Text(String.valueOf(min)));
    }

}
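The post ships every raw value to the reducer. A common refinement, not part of the original code, is a combiner that forwards only the local max and min of each map task: the global max is the max of the local maxes (and likewise for min), so the result is unchanged while the shuffle shrinks to two values per mapper. A minimal sketch, with MaxMinCombiner as a name introduced here:

package com.test.hadoop.eighteen;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner, not in the original post. Its output types match
// the map output (Text, LongWritable), so MaxMinReduce works unchanged.
public class MaxMinCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Local extremes only; combiners may run zero or more times,
        // so no state is kept across calls.
        long max = Long.MIN_VALUE;
        long min = Long.MAX_VALUE;
        for (LongWritable val : values) {
            long tmp = val.get();
            if (tmp > max) max = tmp;
            if (tmp < min) min = tmp;
        }
        // Forwarding just the two local extremes preserves the global result.
        context.write(key, new LongWritable(max));
        context.write(key, new LongWritable(min));
    }

}

It would be registered in the driver with job.setCombinerClass(MaxMinCombiner.class).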

Driver (main) code

package com.test.hadoop.eighteen;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() returns the configuration populated by ToolRunner,
        // including any -D options passed on the command line.
        Configuration configuration = getConf();
        Job job = Job.getInstance(configuration, "maxmin-job");
        job.setJarByClass(JobMain.class);

        job.setMapperClass(MaxMinMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MaxMinReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // One reducer guarantees a single global result; with several,
        // the idle ones would still emit sentinel extremes from cleanup().
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path output = new Path(args[1]);
        // Delete any previous output directory so the job can be rerun.
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }

        FileOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Delegate to ToolRunner so generic Hadoop options are parsed.
        int res = ToolRunner.run(conf, new JobMain(), args);
        System.exit(res);
    }

}
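To try it out, package the classes into a jar and run it against an input directory holding a.txt and b.txt. The jar name and HDFS paths below are placeholders, not from the original post:

hadoop fs -mkdir -p /input/maxmin
hadoop fs -put a.txt b.txt /input/maxmin
hadoop jar maxmin.jar com.test.hadoop.eighteen.JobMain /input/maxmin /output/maxmin
hadoop fs -cat /output/maxmin/part-r-00000

The last command should print the two lines predicted above: Max 10005 and Min 2.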

 
