1. Raw data
Employee data (emp):
empno  ename   job       mgr   hiredate    sal   comm  deptno  loc
7499   allen   salesman  7698  1981-02-20  1600  300   30
7782   clark   managers  7639  1981-06-09  2450        10
7654   martin  salesman  7698  1981-03-20  1250  1400  30      boston
7900   james   clerk     7698  1981-01-09  950         30
7788   scott   analyst   7566  1981-09-01  3000  100   20
Department data (dept):
deptno  dname       loc
30      sales       chicago
20      research    dallas
10      accounting  newyork
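Both tables end up in a single input file (source.txt, see step 9), one record per line with tab-separated fields. The exact file layout is not spelled out in the original, but it can be inferred from the mapper's parsing logic below: record types are distinguished purely by field count, and the employee deptno is read from field index 7, so an employee row with no comm value must keep the empty column (two consecutive tabs). A minimal sketch of one line of each type, with \t standing for a literal tab:

30\tsales\tchicago
7782\tclark\tmanagers\t7639\t1981-06-09\t2450\t\t10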
2. Approach
The goal is the equivalent of this SQL:

select e.empno, e.ename, d.dname, d.deptno
from emp e join dept d on e.deptno = d.deptno;

The join key is deptno. There are two possible designs:
- Option 1: emit plain Text values tagged with their source table, e.g. empno_ename_0 / deptno_dname_1.
- Option 2: emit a custom bean carrying empno/ename/deptno/dname/flag.

The implementation below takes the second approach.
3. How the reduce-side join works:
Use the join key as the map output key, which is also the reduce input key. After the shuffle, all records that share a join key land in the same reduce call's key/value list. Design one common bean for the two tables being joined, and give it a flag field that records which table each record came from. In the reduce phase the flag makes it easy to tell employee records from department records; the actual join is performed in reduce.
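For instance, with the sample data above, the reduce call for key 30 receives a value list along the lines of (7499, allen, deptno=30, flag=0), (7654, martin, deptno=30, flag=0), (7900, james, deptno=30, flag=0), (30, sales, flag=1): the reducer copies dname from the single flag=1 record onto each flag=0 record to produce the joined rows.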
4. The intermediate bean
The bean that stores the records. Since the data travels across the network it must be serializable, and Hadoop needs to group and sort it during processing, so the class implements the WritableComparable interface:

package com.wy.hadoop.join.one;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Emplyee implements WritableComparable<Emplyee> {

    private String empNo = "";
    private String empName = "";
    private String deptNo = "";
    private String deptName = "";
    private int flag = 0; // 0 = employee record, 1 = department record

    public Emplyee() {}

    public Emplyee(String empNo, String empName, String deptNo, String deptName, int flag) {
        this.empNo = empNo;
        this.empName = empName;
        this.deptNo = deptNo;
        this.deptName = deptName;
        this.flag = flag;
    }

    // Copy constructor: Hadoop reuses the value object handed to the
    // reducer, so any record we keep must be deep-copied.
    public Emplyee(Emplyee e) {
        this.empNo = e.empNo;
        this.empName = e.empName;
        this.deptNo = e.deptNo;
        this.deptName = e.deptName;
        this.flag = e.flag;
    }

    public String getEmpNo() { return empNo; }
    public void setEmpNo(String empNo) { this.empNo = empNo; }
    public String getEmpName() { return empName; }
    public void setEmpName(String empName) { this.empName = empName; }
    public String getDeptNo() { return deptNo; }
    public void setDeptNo(String deptNo) { this.deptNo = deptNo; }
    public String getDeptName() { return deptName; }
    public void setDeptName(String deptName) { this.deptName = deptName; }
    public int getFlag() { return flag; }
    public void setFlag(int flag) { this.flag = flag; }

    @Override
    public void readFields(DataInput input) throws IOException {
        // Fields must be read in exactly the order they were written.
        this.empNo = input.readUTF();
        this.empName = input.readUTF();
        this.deptNo = input.readUTF();
        this.deptName = input.readUTF();
        this.flag = input.readInt();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeUTF(this.empNo);
        output.writeUTF(this.empName);
        output.writeUTF(this.deptNo);
        output.writeUTF(this.deptName);
        output.writeInt(this.flag);
    }

    @Override
    public int compareTo(Emplyee o) {
        // The bean is only used as a map output *value*, so its ordering
        // is never consulted; a constant satisfies the interface.
        return 0;
    }

    @Override
    public String toString() {
        return this.empNo + "," + this.empName + "," + this.deptName;
    }
}
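As a quick sanity check of the Writable contract, here is a minimal round-trip sketch (the class EmplyeeRoundTrip is a hypothetical scratch helper, not part of the job): it serializes a bean through write() and reads it back through readFields().

package com.wy.hadoop.join.one;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical helper: verifies that write() and readFields() are
// symmetric by serializing a bean to bytes and deserializing it again.
public class EmplyeeRoundTrip {
    public static void main(String[] args) throws IOException {
        Emplyee in = new Emplyee("7499", "allen", "30", "", 0);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        Emplyee out = new Emplyee();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Both lines should print "7499,allen," (toString is empNo,empName,deptName).
        System.out.println(in);
        System.out.println(out);
    }
}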
5. The map code
package com.wy.hadoop.join.one;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EmpMapper extends Mapper<LongWritable, Text, LongWritable, Emplyee> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String val = value.toString();
        String[] arr = val.split("\t");
        if (arr.length <= 3) { // department record: deptno, dname, loc
            Emplyee e = new Emplyee();
            e.setDeptNo(arr[0]);
            e.setDeptName(arr[1]);
            e.setFlag(1);
            context.write(new LongWritable(Long.valueOf(e.getDeptNo())), e);
        } else { // employee record: deptno is the 8th tab-separated field
            Emplyee e = new Emplyee();
            e.setEmpNo(arr[0]);
            e.setEmpName(arr[1]);
            e.setDeptNo(arr[7]);
            e.setFlag(0);
            context.write(new LongWritable(Long.valueOf(e.getDeptNo())), e);
        }
    }
}
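A quick way to convince yourself of the field-count dispatch is a tiny standalone snippet (SplitDemo is a hypothetical scratch class, not part of the job) that applies the same split("\t") to one line of each type from the sample data:

package com.wy.hadoop.join.one;

// Hypothetical scratch class: shows how the mapper's split("\t")
// heuristic classifies a department line versus an employee line.
public class SplitDemo {
    public static void main(String[] args) {
        String deptLine = "30\tsales\tchicago";
        String empLine = "7499\tallen\tsalesman\t7698\t1981-02-20\t1600\t300\t30";

        // 3 fields -> the department branch
        System.out.println(deptLine.split("\t").length);  // prints 3
        // 8 fields -> the employee branch, deptno at index 7
        System.out.println(empLine.split("\t")[7]);       // prints 30
    }
}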
6. The reduce code
package com.wy.hadoop.join.one;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EmpReducer extends Reducer<LongWritable, Emplyee, LongWritable, Text> {

    @Override
    protected void reduce(LongWritable key, Iterable<Emplyee> iter, Context context)
            throws IOException, InterruptedException {
        Emplyee dept = null;
        List<Emplyee> list = new ArrayList<Emplyee>();
        for (Emplyee tmp : iter) {
            // Hadoop reuses tmp across iterations, so copy every record we keep.
            if (tmp.getFlag() == 0) { // employee record
                list.add(new Emplyee(tmp));
            } else {                  // department record
                dept = new Emplyee(tmp);
            }
        }
        if (dept != null) {
            for (Emplyee e : list) {
                e.setDeptName(dept.getDeptName());
                // emit the join key (deptno) alongside the joined row
                context.write(key, new Text(e.toString()));
            }
        }
    }
}
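Note that this is effectively an inner join on deptno: if no department record arrives for a key, the buffered employee records are silently dropped. Also, all employee records for one deptno are held in an in-memory list, which is fine at this scale; for large or skewed inputs, a secondary-sort design that forces the department record to arrive first would avoid the buffering.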
7. The job driver and main class
package com.wy.hadoop.join.one;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Extends Configured (not Configuration) so that getConf() returns the
// Configuration that ToolRunner injects, instead of null.
public class EmplyeeJob extends Configured implements Tool, Runnable {

    private String inputPath = null;
    private String outputPath = null;

    public EmplyeeJob() {}

    public EmplyeeJob(String inputPath, String outputPath) {
        this.inputPath = inputPath;
        this.outputPath = outputPath;
    }

    @Override
    public void run() {
        try {
            String[] args = { this.inputPath, this.outputPath };
            ToolRunner.run(new EmplyeeJob(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "emplyeejob");
        job.setJarByClass(EmplyeeJob.class);

        job.setMapperClass(EmpMapper.class);
        job.setReducerClass(EmpReducer.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Emplyee.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
package com.wy.hadoop.join.one;

public class JobMain {

    public static void main(String[] args) {
        if (args.length == 2) {
            // Kick off the job on a worker thread via EmplyeeJob's Runnable.run()
            new Thread(new EmplyeeJob(args[0], args[1])).start();
        }
    }
}
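The Thread/Runnable indirection is not required. An equivalent and more conventional driver would call ToolRunner directly and propagate the exit code; a sketch (JobMainDirect is a hypothetical name, not part of the original):

package com.wy.hadoop.join.one;

import org.apache.hadoop.util.ToolRunner;

// Alternative driver sketch: runs the same job synchronously through
// ToolRunner so the process exit code reflects job success or failure.
public class JobMainDirect {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: JobMainDirect <input path> <output path>");
            System.exit(2);
        }
        System.exit(ToolRunner.run(new EmplyeeJob(), args));
    }
}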
8. Package the code into a jar (only the corresponding source files need to be included) and upload it to the /opt/mapred/job/2 directory as join1.jar.
9. In /opt/mapred/job/2, create the file source.txt and copy the text data to be analyzed into it.
10. Run hadoop fs -put source.txt /user/root/data/2/source.txt to store the file in HDFS.
11. hadoop jar join1.jar com.wy.hadoop.join.one.JobMain /user/root/data/2/source.txt /user/root/output/2
12. View the results
That completes the run. Now let's see what results we got. Fetch the result files from HDFS: hadoop fs -get /user/root/output/2/* ./
# cat part-r-*
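With the sample data above, this should print something like the following (the key and value are tab-separated; employees within one department may appear in any order):

10      7782,clark,accounting
20      7788,scott,research
30      7499,allen,sales
30      7654,martin,sales
30      7900,james,sales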