通过hadoop的map/reduce获取员工所在部门信息,输出格式要求:员工编号,员工姓名,部门名称,部门编号

  • 通过hadoop的map/reduce获取员工所在部门信息,输出格式要求:员工编号,员工姓名,部门名称,部门编号已关闭评论
  • 60 views
  • A+
所属分类:hadoop

1、原始数据

员工数据

empno ename job mgr hiredate sal comm deptno loc
7499 allen salesman 7698 1981-02-20 1600 300 30
7782 clark managers 7639 1981-06-09 2450 10
7654 martin salesman 7698 1981-03-20 1250 1400 30 boston
7900 james clerk 7698 1981-01-09 950 30
7788 scott analyst 7566 1981-09-01 3000 100 20

 

部门数据

deptno dname loc
30 sales chicago
20 research dallas
10 accounting newyork

2、思路

实现的功能类似于下面的SQL:
select e.empno,e.ename,d.dname,d.deptno from emp e join dept d on e.deptno=d.deptno;

key: deptno
第一种思路:
Text:empno_ename_0/deptno_dname_1;

第二种思路:
Custom bean: empno/ename/deptno/dname/flag

3、处理join的思路:

 将Join key 当作map的输出key, 也就是reduce的输入key , 这样只要join的key相同,shuffle过后,就会进入到同一个reduce 的key - value list 中去。
 需要为join的2张表设计一个通用的bean,并在bean中加一个flag标志属性,这样可以根据flag来区分是哪张表的数据。
 reduce 阶段根据flag来判断是员工数据还是部门数据就很容易了 。而join的真正处理是在reduce阶段。

4、实现中间bean

 存储数据的bean (由于数据要在网络上传输必须序列化,hadoop处理的时候需要分组和排序,所以要实现WritableComparable接口):
 package com.wy.hadoop.join.one;

import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Value bean shared by both sides of the reduce-side join.
 *
 * <p>One instance carries either an employee record (flag 0: {@code empNo},
 * {@code empName}, {@code deptNo}) or a department record (flag 1:
 * {@code deptNo}, {@code deptName}). Unused string fields stay as the empty
 * string so the bean can always be serialized with {@code writeUTF}.
 */
public class Emplyee implements WritableComparable<Emplyee> {

    private String empNo = "";
    private String empName = "";
    private String deptNo = "";
    private String deptName = "";
    /** 0 marks an employee record; any other value marks a department record. */
    private int flag = 0;

    /** No-arg constructor; all string fields start empty and flag is 0. */
    public Emplyee() {
    }

    /**
     * Creates a fully populated bean.
     *
     * @param empNo    employee number
     * @param empName  employee name
     * @param deptNo   department number (the join key)
     * @param deptName department name
     * @param flag     0 for an employee record, 1 for a department record
     */
    public Emplyee(String empNo, String empName, String deptNo, String deptName, int flag) {
        this.empNo = nullToEmpty(empNo);
        this.empName = nullToEmpty(empName);
        this.deptNo = nullToEmpty(deptNo);
        this.deptName = nullToEmpty(deptName);
        this.flag = flag;
    }

    /**
     * Copy constructor.
     *
     * @param e the bean to copy; must not be null
     */
    public Emplyee(Emplyee e) {
        this.empNo = e.empNo;
        this.empName = e.empName;
        this.deptNo = e.deptNo;
        this.deptName = e.deptName;
        this.flag = e.flag;
    }

    /** Maps null to the empty string because {@code DataOutput.writeUTF} rejects null. */
    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }

    public String getEmpNo() {
        return empNo;
    }

    public void setEmpNo(String empNo) {
        this.empNo = nullToEmpty(empNo);
    }

    public String getEmpName() {
        return empName;
    }

    public void setEmpName(String empName) {
        this.empName = nullToEmpty(empName);
    }

    public String getDeptNo() {
        return deptNo;
    }

    public void setDeptNo(String deptNo) {
        this.deptNo = nullToEmpty(deptNo);
    }

    public String getDeptName() {
        return deptName;
    }

    public void setDeptName(String deptName) {
        this.deptName = nullToEmpty(deptName);
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    /**
     * Restores the fields in the same order in which {@link #write} emits them.
     *
     * @param input the stream to read from
     * @throws IOException if the stream cannot be read
     */
    @Override
    public void readFields(DataInput input) throws IOException {
        this.empNo = input.readUTF();
        this.empName = input.readUTF();
        this.deptNo = input.readUTF();
        this.deptName = input.readUTF();
        this.flag = input.readInt();
    }

    /**
     * Serializes the four string fields followed by the flag.
     *
     * @param output the stream to write to
     * @throws IOException if the stream cannot be written
     */
    @Override
    public void write(DataOutput output) throws IOException {
        output.writeUTF(this.empNo);
        output.writeUTF(this.empName);
        output.writeUTF(this.deptNo);
        output.writeUTF(this.deptName);
        output.writeInt(this.flag);
    }

    /**
     * Orders beans by department number, then flag, then employee number,
     * then the two name fields. String fields are compared lexicographically.
     *
     * @param other the bean to compare with; must not be null
     * @return a negative, zero or positive value per the {@code Comparable} contract
     */
    @Override
    public int compareTo(Emplyee other) {
        int cmp = this.deptNo.compareTo(other.deptNo);
        if (cmp != 0) {
            return cmp;
        }
        if (this.flag != other.flag) {
            return this.flag < other.flag ? -1 : 1;
        }
        cmp = this.empNo.compareTo(other.empNo);
        if (cmp != 0) {
            return cmp;
        }
        cmp = this.empName.compareTo(other.empName);
        if (cmp != 0) {
            return cmp;
        }
        return this.deptName.compareTo(other.deptName);
    }

    /** Two beans are equal when all five fields match; consistent with {@link #compareTo}. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof Emplyee)) {
            return false;
        }
        return compareTo((Emplyee) obj) == 0;
    }

    @Override
    public int hashCode() {
        int result = empNo.hashCode();
        result = 31 * result + empName.hashCode();
        result = 31 * result + deptNo.hashCode();
        result = 31 * result + deptName.hashCode();
        result = 31 * result + flag;
        return result;
    }

    /**
     * Formats the bean as {@code empNo,empName,deptName,deptNo}, i.e. the
     * column order of the join result (employee number, employee name,
     * department name, department number).
     */
    @Override
    public String toString() {
        return this.empNo + "," + this.empName + "," + this.deptName + "," + this.deptNo;
    }

}

5、实现map代码编写

 package com.hadoop.join.one;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Tags each input line as a department or an employee record and emits it
 * under its department number, so both sides of the join meet in one reduce call.
 *
 * <p>Lines are split on the tab character. A line with at most three fields is
 * treated as a department row ({@code deptno, dname, ...}); any longer line is
 * treated as an employee row whose department number is the eighth field.
 * Lines that cannot be interpreted (blank lines, column-header rows, rows with
 * too few fields or a non-numeric department number) are skipped and counted
 * instead of failing the task.
 */
public class EmpMapper extends Mapper<LongWritable, Text, LongWritable, Emplyee> {

    /** Zero-based index of the deptno column in an employee row. */
    private static final int EMP_DEPTNO_INDEX = 7;

    /**
     * Emits one {@code (deptNo, Emplyee)} pair per well-formed input line.
     *
     * @param key     input key supplied by the input format; not used
     * @param value   one line of the input file
     * @param context the task context used to emit output and update counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] arr = value.toString().split("\t");

        Emplyee e = new Emplyee();
        if (arr.length <= 3) { // dept
            if (arr.length < 2) {
                // Blank line or a row without a department name.
                skip(context);
                return;
            }
            e.setDeptNo(arr[0].trim());
            e.setDeptName(arr[1]);
            e.setFlag(1);
        } else { // emp
            if (arr.length <= EMP_DEPTNO_INDEX) {
                // Too few columns to contain a department number.
                skip(context);
                return;
            }
            e.setEmpNo(arr[0]);
            e.setEmpName(arr[1]);
            e.setDeptNo(arr[EMP_DEPTNO_INDEX].trim());
            e.setFlag(0);
        }

        long deptNo;
        try {
            deptNo = Long.parseLong(e.getDeptNo());
        } catch (NumberFormatException ex) {
            // Header rows such as "deptno dname loc" end up here.
            skip(context);
            return;
        }

        context.write(new LongWritable(deptNo), e);
    }

    /** Records that an input line was ignored because it could not be parsed. */
    private void skip(Context context) {
        context.getCounter("EmpMapper", "SKIPPED_RECORDS").increment(1);
    }

}

6、实现reduce代码编写

 package com.hadoop.join.one;

import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;

import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;

import com.google.protobuf.ByteString.Output;

/**
 * Performs the actual join: for one department number it pairs every employee
 * record with the department record and writes one output line per employee.
 *
 * <p>The output value has the form {@code empNo,empName,deptName,deptNo}.
 * Employees whose department number has no department record are dropped,
 * which matches an inner join.
 */
public class EmpReducer extends
        Reducer<LongWritable, Emplyee, LongWritable, Text> {

    /**
     * Joins the records that share one department number.
     *
     * @param key     the department number
     * @param iter    all employee (flag 0) and department (other flag) records for that key
     * @param context the task context used to emit output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(LongWritable key, Iterable<Emplyee> iter,
            Context context)
            throws IOException, InterruptedException {

        Emplyee dept = null;
        List<Emplyee> employees = new ArrayList<Emplyee>();

        for (Emplyee record : iter) {
            // Copy each record: the framework reuses the value object while
            // iterating, so keeping the reference would alias every entry.
            if (record.getFlag() == 0) { // emp
                employees.add(new Emplyee(record));
            } else {
                dept = new Emplyee(record);
            }
        }

        if (dept == null) {
            return;
        }

        // Writables are serialized on write, so one instance of each can be reused.
        LongWritable outKey = new LongWritable(0);
        Text outValue = new Text();
        for (Emplyee emp : employees) {
            emp.setDeptName(dept.getDeptName());
            // Build the line explicitly so that the department number is always included.
            outValue.set(emp.getEmpNo() + "," + emp.getEmpName() + ","
                    + emp.getDeptName() + "," + dept.getDeptNo());
            context.write(outKey, outValue);
        }
    }

}

 

7、实现job调用主函数编写

 package com.hadoop.join.one;

import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;

/**
 * Driver that configures and submits the employee/department join job.
 *
 * <p>It can be started through {@link ToolRunner} (via {@link #run(String[])})
 * or as a {@link Runnable} that uses the input and output paths passed to the
 * two-argument constructor.
 */
public class EmplyeeJob extends Configuration implements Tool, Runnable {

    private String inputPath = null;
    private String outputPath = null;
    /** Configuration handed over by {@link #setConf}; null until it is called. */
    private Configuration conf;

    /**
     * @param inputPath  path of the input data
     * @param outputPath path of the output directory
     */
    public EmplyeeJob(String inputPath, String outputPath) {
        this.inputPath = inputPath;
        this.outputPath = outputPath;
    }

    public EmplyeeJob() {
    }

    /**
     * Returns the configuration set through {@link #setConf}, or this object
     * itself (it is a {@link Configuration}) when none has been set, so the
     * result is never null.
     */
    @Override
    public Configuration getConf() {
        return conf != null ? conf : this;
    }

    /** Stores the configuration to be used when the job is created. */
    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    /** Runs the job with the paths given at construction time; failures are printed to stderr. */
    @Override
    public void run() {
        try {
            String[] args = {this.inputPath, this.outputPath};
            start(args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Submits the job through ToolRunner and reports a non-zero exit code. */
    private void start(String[] args) throws Exception {
        int exitCode = ToolRunner.run(this, args);
        if (exitCode != 0) {
            System.err.println("emplyeejob failed with exit code " + exitCode);
        }
    }

    /**
     * Configures the join job and waits for it to finish.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 when the job succeeds, 1 when it fails, 2 when the arguments are invalid
     * @throws Exception if the job cannot be configured or submitted
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2 || args[0] == null || args[1] == null) {
            System.err.println("Usage: EmplyeeJob <input path> <output path>");
            return 2;
        }

        Job job = new Job(getConf(), "emplyeejob");
        job.setJarByClass(EmplyeeJob.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Emplyee.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(EmpMapper.class);
        job.setReducerClass(EmpReducer.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);

        return success ? 0 : 1;
    }

}

 

package com.hadoop.join.one;

/**
 * Command-line entry point for the employee/department join job.
 */
public class JobMain {

    /**
     * Starts the join job on a new thread.
     *
     * @param args exactly two values: the input path and the output path
     */
    public static void main(String[] args) {
        if (args.length != 2) {
            // Report the mistake instead of exiting silently with nothing done.
            System.err.println("Usage: JobMain <input path> <output path>");
            System.exit(2);
        }
        new Thread(new EmplyeeJob(args[0], args[1])).start();
    }

}

 

8、打成jar,只需要打包对应的源代码即可,上传到/opt/mapred/job/2 目录下面 join1.jar

9、/opt/mapred/job/2 下创建文件source.txt 并把需要分析的文本数据copy到该文件中(注意:map代码按制表符\t切分字段,并从员工记录的第8列读取deptno,因此字段之间必须用制表符分隔,缺失的字段也要保留空列)

10、执行 hadoop fs -put source.txt /user/root/data/2/source.txt 将文件存放在hdfs中

11、执行 hadoop jar join1.jar com.hadoop.join.one.JobMain /user/root/data/2/source.txt /user/root/output/2

12、查看结果
好,这样就完成了执行操作,然后看看我们得到的结果如何.从hadoop中取出结果文件.hadoop fs -get /user/root/output/2/* ./

# cat part-r-*

  • 安卓客户端下载
  • 微信扫一扫
  • weinxin
  • 微信公众号
  • 微信公众号扫一扫
  • weinxin