Finding and sorting the top K values with Hadoop MapReduce


Requirements analysis

Given two input files, a.txt and b.txt, where every line has the format orderid,userid,payment,productid, find the N largest payment values across all records and output them in descending order together with their rank.

#orderid,userid,payment,productid
a.txt
1,9819,100,121
2,8918,2000,111
3,2813,1234,22
4,9100,10,1101
5,3210,490,111
6,1298,28,1211
7,1010,281,90
8,1818,9000,20
b.txt
100,3333,10,100
101,9321,1000,293
102,3881,701,20
103,6791,910,30
104,8888,11,39

Expected result (for Top N = 5):
1 9000
2 2000
3 1234
4 1000
5 910
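
Both the Mapper and the Reducer below keep a running top-N with the same trick: an int array of length N+1 is kept sorted, slot 0 is overwritten by each incoming value, and the array is re-sorted, so slots 1..N always hold the N largest values seen so far. As a quick sanity check, here is a standalone sketch (not part of the MapReduce job; the class name TopNSketch and the hard-coded payment list are only for illustration) that applies the same trick to the payments from a.txt and b.txt and reproduces the expected output:

import java.util.Arrays;

public class TopNSketch {
    public static void main(String[] args) {
        int n = 5;
        int[] top = new int[n + 1];   // all zeros initially; top[1..n] holds the current top n
        // payments copied from a.txt and b.txt above
        int[] payments = {100, 2000, 1234, 10, 490, 28, 281, 9000, 10, 1000, 701, 910, 11};
        for (int p : payments) {
            top[0] = p;               // overwrite the smallest (or scratch) slot
            Arrays.sort(top);         // ascending sort pushes the new value into place
        }
        for (int i = n; i > 0; i--) { // print rank and payment, largest first
            System.out.println((n - i + 1) + "\t" + top[i]);
        }
    }
}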

Mapper code

package com.test.hadoop.seventeen;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * orderid,userid,payment,productid
 * @author Administrator
 *
 */
public class TopNMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

    int len;    // N: how many of the largest payments to keep
    int[] top;  // top[0] is a scratch slot; top[1..len] holds the current top len payments
    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // emit this map task's local top-N once all of its input has been processed;
        // both key and value carry the payment so the reducer can rebuild its own top-N
        for (int i = 1; i <= len; i++) {
            context.write(new IntWritable(top[i]), new IntWritable(top[i]));
        }
    }

    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        String[] arr = value.toString().split(",",-1);
        if(arr.length==4){
            int x = Integer.parseInt(arr[2]); // payment column
            add(x);
        }
    }

    private void add(int val){
        top[0] = val;     // overwrite the smallest (or scratch) slot
        Arrays.sort(top); // ascending sort; top is initialized to all zeros
    }

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        len = context.getConfiguration().getInt("N", 10);
        top = new int[len+1];
    }


}
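
Note the design choice in the Mapper: instead of emitting every record, each map task keeps its own local top-N in memory and writes it only once, in cleanup(), after all of its input has been processed. The payment is written as both key and value, so the reducer can rebuild its own top-N from the values, and each map task contributes at most N records to the shuffle.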

Reducer code

package com.test.hadoop.seventeen;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReduce extends Reducer<IntWritable, IntWritable, Text, Text> {

    int len;
    int[] top;

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // walk the array from largest to smallest, writing rank and payment
        for (int i = len; i > 0; i--) {
            context.write(new Text(String.valueOf(len - i + 1)), new Text(String.valueOf(top[i])));
        }
    }

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values,Context context)
            throws IOException, InterruptedException {
        for(IntWritable val:values){
            add(val.get());
        }
    }

    private void add(int val){
        top[0] = val;     // overwrite the smallest (or scratch) slot
        Arrays.sort(top); // ascending sort; top is initialized to all zeros
    }

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        len = context.getConfiguration().getInt("N", 10);
        top = new int[len+1];
    }

}
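
One caveat: every reduce task keeps its own top array and writes it out in cleanup(), so the output is a global top-N only when the job runs with a single reduce task. The driver below does not set the number of reducers and relies on the default of one; calling job.setNumReduceTasks(1) explicitly would make that assumption visible.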

Driver code

package com.test.hadoop.seventeen;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration configuration = getConf(); // configuration handed in by ToolRunner
        configuration.setInt("N", 5);            // keep the 5 largest payments
        Job job = new Job(configuration, "topn-job");
        job.setJarByClass(JobMain.class);

        job.setMapperClass(TopNMap.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(TopNReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0])); // input directory containing a.txt and b.txt
        Path output = new Path(args[1]);                      // output directory
        FileSystem fs = FileSystem.get(configuration);
        if(fs.exists(output)){
            fs.delete(output,true);
        }

        FileOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true)?0:1;
    }

    /**
     * @param args
     */
    public static void main(String[] args)throws Exception {
        Configuration conf = new Configuration();

        int res=ToolRunner.run(conf,new JobMain(), args);
        System.exit(res);
    }

}
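
To run the job, the three classes would typically be packaged into a jar and submitted with something like hadoop jar topn.jar com.test.hadoop.seventeen.JobMain /input/dir /output/dir (the jar name and the paths are placeholders). The input directory would contain a.txt and b.txt, N is fixed at 5 inside run(), and the driver deletes the output directory up front if it already exists, so reruns do not fail on an existing output path.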
