Requirement analysis

Two input files, a.txt and b.txt, hold order records with the schema below. The task is to find the N largest payment values across both files.
#orderid,userid,payment,productid
a.txt
1,9819,100,121
2,8918,2000,111
3,2813,1234,22
4,9100,10,1101
5,3210,490,111
6,1298,28,1211
7,1010,281,90
8,1818,9000,20
b.txt
100,3333,10,100
101,9321,1000,293
102,3881,701,20
103,6791,910,30
104,8888,11,39
Expected result (for Top N = 5, as rank / payment pairs):
1 9000
2 2000
3 1234
4 1000
5 910
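Before the MapReduce code, here is a minimal standalone sketch of the fixed-size sorted-array trick that both the mapper and the reducer below rely on: keep an array of N+1 slots, overwrite slot 0 with each new candidate, and re-sort ascending so the smallest tracked value always falls back into slot 0, where the next candidate evicts it. The class name TopNDemo is a hypothetical illustration, not part of the job; the payments are hard-coded from a.txt.

import java.util.Arrays;

// Hypothetical standalone demo of the Top-N array trick (not part of the job).
public class TopNDemo {
    public static void main(String[] args) {
        int n = 5;
        int[] top = new int[n + 1];              // all slots start at 0; top[0] is a scratch slot
        int[] payments = {100, 2000, 1234, 10, 490, 28, 281, 9000};  // payments from a.txt
        for (int p : payments) {
            top[0] = p;                          // candidate overwrites the current minimum
            Arrays.sort(top);                    // ascending sort pushes the smallest into top[0]
        }
        // top[1..n] now holds the five largest payments, in ascending order.
        System.out.println(Arrays.toString(top));  // prints [100, 281, 490, 1234, 2000, 9000]
    }
}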
Mapper code
package com.test.hadoop.seventeen;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Input schema: orderid,userid,payment,productid
 * Tracks this mapper's local top N payments and emits them in cleanup().
 */
public class TopNMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

    int len;    // N, read from the job configuration
    int[] top;  // top[1..len] holds the largest payments seen; top[0] is a scratch slot

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        len = context.getConfiguration().getInt("N", 10);
        top = new int[len + 1];  // int arrays are zero-initialized
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] arr = value.toString().split(",", -1);
        if (arr.length == 4) {
            int payment = Integer.parseInt(arr[2]);
            add(payment);
        }
    }

    private void add(int val) {
        top[0] = val;      // candidate replaces the current minimum
        Arrays.sort(top);  // ascending sort pushes the smallest value into top[0]
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit only the local top N; the payment is duplicated as the key so the
        // shuffle delivers the candidates to the reducer sorted by payment.
        for (int i = 1; i < len + 1; i++) {
            context.write(new IntWritable(top[i]), new IntWritable(top[i]));
        }
    }
}
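Note the design: map() writes nothing per record. Each mapper only maintains its own local top N and emits those N candidates once, in cleanup(), so at most N values per mapper cross the network during the shuffle instead of every payment in the input.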
Reducer code
package com.test.hadoop.seventeen;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReduce extends Reducer<IntWritable, IntWritable, Text, Text> {

    int len;    // N, read from the job configuration
    int[] top;  // top[1..len] holds the largest payments seen; top[0] is a scratch slot

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        len = context.getConfiguration().getInt("N", 10);
        top = new int[len + 1];  // int arrays are zero-initialized
    }

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Merge the local top-N candidates from all mappers with the same array trick.
        for (IntWritable val : values) {
            add(val.get());
        }
    }

    private void add(int val) {
        top[0] = val;      // candidate replaces the current minimum
        Arrays.sort(top);  // ascending sort pushes the smallest value into top[0]
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Walk the array from largest to smallest, writing "rank  payment".
        for (int i = len; i > 0; i--) {
            context.write(new Text(String.valueOf(len - i + 1)),
                          new Text(String.valueOf(top[i])));
        }
    }
}
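Because the reducer accumulates a single in-memory array across all reduce() calls and only writes results in cleanup(), the global ranking is correct only if every candidate reaches the same reducer instance. The job must therefore run with exactly one reduce task, which is Hadoop's default and is made explicit in the driver below.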
Driver code (main)
package com.test.hadoop.seventeen;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration configuration = getConf();
        configuration.setInt("N", 5);  // read by the mapper and reducer in setup()

        Job job = Job.getInstance(configuration, "topn-job");
        job.setJarByClass(JobMain.class);

        job.setMapperClass(TopNMap.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(TopNReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // A global top N requires all candidates to reach a single reducer;
        // one reduce task is the default, but we state it explicitly.
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path output = new Path(args[1]);
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(output)) {
            fs.delete(output, true);  // remove stale output so the job can be rerun
        }
        FileOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new JobMain(), args);
        System.exit(res);
    }
}
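To run the job, a sketch assuming the classes are packaged into a jar named topn.jar (hypothetical name) and that a.txt and b.txt have been uploaded to an HDFS input directory:

hadoop jar topn.jar com.test.hadoop.seventeen.JobMain /input/topn /output/topn
hdfs dfs -cat /output/topn/part-r-00000

The second command prints the rank/payment pairs shown in the expected result above.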