Hadoop从入门到精通36:MapReduce实战——自关联查询

案例:根据员工表信息,输出一个老板直接管理的所有员工。格式:老板姓名 员工姓名列表

自关联原理:把一张表当做两张表使用,就回到了多表关联,然后在每张表的值前面加上区分标记。

1.程序源码

//SelfJoinMapper.java
package demo.selfJoin;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SelfJoinMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

  /**
   * Emits each employee record twice so the reducer can self-join:
   * once keyed by the employee's own number (tagged "BNAME_", the "boss side"),
   * and once keyed by the manager's number (tagged "ENAME_", the "employee side").
   * Input line format: empno,ename,job,mgr,hiredate,sal,comm,deptno
   * e.g. 7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
   */
  @Override
  protected void map(LongWritable key1, Text value1, Context context)
    throws IOException, InterruptedException {
    String data = value1.toString();
    String[] words = data.split(",");
    // Boss view: key = this employee's own number.
    context.write(new LongWritable(Long.parseLong(words[0])), new Text("BNAME_" + words[1]));
    // Employee view: key = the manager's number. The top boss (e.g. KING) has an
    // empty mgr column; detect that explicitly instead of driving control flow
    // with a broad catch(Exception), which also hid unrelated errors.
    if (words.length > 3 && !words[3].isEmpty()) {
      try {
        context.write(new LongWritable(Long.parseLong(words[3])), new Text("ENAME_" + words[1]));
        return;
      } catch (NumberFormatException ex) {
        // Malformed mgr column: treat the same as "no manager" (fall through).
      }
    }
    // No valid manager: route under sentinel key -1 (never matches a BNAME_ record).
    context.write(new LongWritable(-1), new Text("ENAME_" + words[1]));
  }
}
//SelfJoinReducer.java
package demo.selfJoin;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SelfJoinReducer extends Reducer<LongWritable, Text, Text, Text> {
  
  protected void reduce(LongWritable key3, Iterable<Text> value3, Context context)
    throws IOException, InterruptedException {
    //value3中包含老板姓名(前缀:BNAME_)和员工姓名(前缀:ENAME_)
    String bname = "";
    String enameList = "";
    //分离出老板姓名和其手下的所有员工
    for(Text v:value3) {
      String name = v.toString();
      String pre = name.substring(0, 6);
      if(pre.equals("BNAME_")) {
        bname = name.substring(6);
      }else if(pre.equals("ENAME_")) {
        enameList += name.substring(6)+";";
      }else {
        continue;
      }
    }
    //只输出有效的数据
    if(bname.length()>0 && enameList.length()>0) {
      context.write(new Text(bname), new Text(enameList));
    }
  }
}
//SelfJoinMain.java
package demo.selfJoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SelfJoinMain {

  /**
   * Driver: configures and submits the self-join MapReduce job.
   * args[0] = HDFS input path, args[1] = HDFS output path (must not already exist).
   */
  public static void main(String[] args) throws Exception {
    // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      System.err.println("Usage: SelfJoinMain <input path> <output path>");
      System.exit(2);
    }
    Job job = Job.getInstance(new Configuration());
    job.setJarByClass(SelfJoinMain.class);
    // Mapper emits (empno, "BNAME_name") and (mgrno, "ENAME_name").
    job.setMapperClass(SelfJoinMapper.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    // Reducer joins both tags under the same key.
    job.setReducerClass(SelfJoinReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Propagate the job result as the process exit code; the original
    // ignored the boolean and exited 0 even when the job failed.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

2.打包执行

员工表:emp.csv(员工号,姓名,职位,老板号,入职日期,工资,奖金,部门号)

# hdfs dfs -cat /input/multi-table/emp.csv
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10

将程序打包成SelfJoin.jar,上传到服务器上执行:

# hadoop jar SelfJoin.jar /input/multi-table/emp.csv /output/selfjoin
……
18/11/18 11:21:39 INFO mapreduce.Job: map 0% reduce 0%
18/11/18 11:21:45 INFO mapreduce.Job: map 100% reduce 0%
18/11/18 11:21:49 INFO mapreduce.Job: map 100% reduce 100%
18/11/18 11:21:50 INFO mapreduce.Job: Job job_1542506318955_0008 completed successfully
……

查看结果:

# hdfs dfs -ls /output/selfjoin
Found 2 items
-rw-r--r-- 1 root supergroup 0 2018-11-18 11:21 /output/selfjoin/_SUCCESS
-rw-r--r-- 1 root supergroup 119 2018-11-18 11:21 /output/selfjoin/part-r-00000

# hdfs dfs -cat /output/selfjoin/part-r-00000
JONES FORD;SCOTT;
BLAKE JAMES;TURNER;ALLEN;MARTIN;WARD;
CLARK MILLER;
SCOTT ADAMS;
KING BLAKE;JONES;CLARK;
FORD SMITH;

QR Code
微信扫一扫,欢迎咨询~

联系我们
武汉格发信息技术有限公司
湖北省武汉市经开区科技园西路6号103孵化器
电话:155-2731-8020 座机:027-59821821
邮件:tanzw@gofarlic.com
Copyright © 2023 Gofarsoft Co.,Ltd. 保留所有权利
遇到许可问题?该如何解决!?
评估许可证实际采购量? 
不清楚软件许可证使用数据? 
收到软件厂商律师函!?  
想要少购买点许可证,节省费用? 
收到软件厂商侵权通告!?  
有正版license,但许可证不够用,需要新购? 
联系方式 155-2731-8020
预留信息,一起解决您的问题
* 姓名:
* 手机:

* 公司名称:

姓名不为空

手机不正确

公司不为空