hadoop中map/reduce数据去重实战

  • hadoop中map/reduce数据去重实战已关闭评论
  • 30 views
  • A+
所属分类:未分类

说明

数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。我们自然而然会想到将同一个数据的所有记录都交给一台reduce机器, 无论这个数据出现多少次,只要在最终结果中输出一次就可以了。具体就是reduce的输入应该以数据作为key, 而对value-list则没有要求。当reduce接收到一个<key,value-list>时就直接将key复制到输出的key中,并将value设置成空值。

原始数据

1:file1

2018-9-1 a

2018-9-2 b

2018-9-3 c

2018-9-4 d

2018-9-5 a

2018-9-6 b

2018-9-7 c

2018-9-3 c

2:file2

2018-9-1 b

2018-9-2 a

2018-9-3 b

2018-9-4 d

2018-9-5 a

2018-9-6 c

2018-9-7 d

2018-9-3 c

预计输出结果

2018-9-1 a

2018-9-1 b

2018-9-2 a

2018-9-2 b

2018-9-3 b

2018-9-3 c

2018-9-4 d

2018-9-5 a

2018-9-6 b

2018-9-6 c

2018-9-7 c

2018-9-7 d

实例代码

 import java.io.IOException;
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
  
 public class Dedup {
  
     //map将输入中的value复制到输出数据的key上,并直接输出
     public static class Map extends Mapper<Object,Text,Text,Text>{
 private static Text line=new Text();//每行数据
        
 //实现map函数
 public void map(Object key,Text value,Context context)
 throws IOException,InterruptedException{
     line=value;
     context.write(line, new Text(""));
 }
        
     }
    
     //reduce将输入中的key复制到输出数据的key上,并直接输出
     public static class Reduce extends Reducer<Text,Text,Text,Text>{
 //实现reduce函数
 public void reduce(Text key,Iterable<Text> values,Context context)
 throws IOException,InterruptedException{
     context.write(key, new Text(""));
 }
        
     }
    
     public static void main(String[] args) throws Exception{
 Configuration conf = new Configuration();
 //这句话很关键
 conf.set("mapred.job.tracker", "192.168.1.2:9001");
        
 String[] ioArgs=new String[]{"dedup_in","dedup_out"};
      String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
      if (otherArgs.length != 2) {
      System.err.println("Usage: Data Deduplication <in> <out>");
      System.exit(2);
      }
      
      Job job = new Job(conf, "Data Deduplication");
      job.setJarByClass(Dedup.class);
      
      //设置Map、Combine和Reduce处理类
      job.setMapperClass(Map.class);
      job.setCombinerClass(Reduce.class);
      job.setReducerClass(Reduce.class);
      
      //设置输出类型
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
      
      //设置输入和输出目录
      FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
      FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
 }

 

  • 安卓客户端下载
  • 微信扫一扫
  • weinxin
  • 微信公众号
  • 微信公众号扫一扫
  • weinxin
avatar