案例:根据员工表信息,输出一个老板直接管理的所有员工。格式:老板姓名 员工姓名列表
自关联原理:把一张表当做两张表使用,就回到了多表关联,然后在每张表的值前面加上区分标记。
1.程序源码
//SelfJoinMapper.java
package demo.selfJoin;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Map side of the self-join: emits each employee record twice, keyed so that a
 * manager's own record and the records of the employees reporting to them meet
 * at the same reducer key.
 *
 * Input value format (CSV): empno,ename,job,mgr,hiredate,sal,comm,deptno
 * e.g. 7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
 *
 * Output:
 *   (empno, "BNAME_" + ename) — this person viewed as a potential boss
 *   (mgr,   "ENAME_" + ename) — this person viewed as an employee of mgr
 */
public class SelfJoinMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Split the CSV record into fields.
        String[] words = value1.toString().split(",");
        // "Boss table" record: key = this person's own employee number.
        context.write(new LongWritable(Long.parseLong(words[0])), new Text("BNAME_" + words[1]));
        // "Employee table" record: key = this person's manager number.
        // The president (e.g. KING) has an empty mgr field; map it to -1.
        // Only the parse is guarded — the original code wrapped context.write
        // in a broad catch(Exception), which would also swallow IOException/
        // InterruptedException from the write itself and then write again.
        long bossNo;
        try {
            bossNo = Long.parseLong(words[3]);
        } catch (NumberFormatException ex) {
            bossNo = -1; // top-level boss: no manager
        }
        context.write(new LongWritable(bossNo), new Text("ENAME_" + words[1]));
    }
}
//SelfJoinReducer.java
package demo.selfJoin;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SelfJoinReducer extends Reducer<LongWritable, Text, Text, Text> {
protected void reduce(LongWritable key3, Iterable<Text> value3, Context context)
throws IOException, InterruptedException {
//value3中包含老板姓名(前缀:BNAME_)和员工姓名(前缀:ENAME_)
String bname = "";
String enameList = "";
//分离出老板姓名和其手下的所有员工
for(Text v:value3) {
String name = v.toString();
String pre = name.substring(0, 6);
if(pre.equals("BNAME_")) {
bname = name.substring(6);
}else if(pre.equals("ENAME_")) {
enameList += name.substring(6)+";";
}else {
continue;
}
}
//只输出有效的数据
if(bname.length()>0 && enameList.length()>0) {
context.write(new Text(bname), new Text(enameList));
}
}
}
//SelfJoinMain.java
package demo.selfJoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the self-join job.
 *
 * Usage: hadoop jar SelfJoin.jar &lt;input path&gt; &lt;output path&gt;
 * Exits 0 on job success, 1 on job failure, 2 on bad arguments.
 */
public class SelfJoinMain {
    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: SelfJoinMain <input path> <output path>");
            System.exit(2);
        }
        // Create the job and point it at the jar containing this class.
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SelfJoinMain.class);
        // Mapper and its output types (must match SelfJoinMapper's generics).
        job.setMapperClass(SelfJoinMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer and the job's final output types.
        job.setReducerClass(SelfJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // HDFS input and output paths from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Propagate job success/failure as the process exit code; the original
        // discarded waitForCompletion's boolean, so failures exited with 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
2.打包执行
员工表:emp.csv(员工号,姓名,职位,老板号,入职日期,工资,奖金,部门号)
# hdfs dfs -cat /input/multi-table/emp.csv
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10
将程序打包成SelfJoin.jar,上传到服务器上执行:
# hadoop jar SelfJoin.jar /input/multi-table/emp.csv /output/selfjoin
……
18/11/18 11:21:39 INFO mapreduce.Job: map 0% reduce 0%
18/11/18 11:21:45 INFO mapreduce.Job: map 100% reduce 0%
18/11/18 11:21:49 INFO mapreduce.Job: map 100% reduce 100%
18/11/18 11:21:50 INFO mapreduce.Job: Job job_1542506318955_0008 completed successfully
……
查看结果:
# hdfs dfs -ls /output/selfjoin
Found 2 items
-rw-r--r-- 1 root supergroup 0 2018-11-18 11:21 /output/selfjoin/_SUCCESS
-rw-r--r-- 1 root supergroup 119 2018-11-18 11:21 /output/selfjoin/part-r-00000
# hdfs dfs -cat /output/selfjoin/part-r-00000
JONES FORD;SCOTT;
BLAKE JAMES;TURNER;ALLEN;MARTIN;WARD;
CLARK MILLER;
SCOTT ADAMS;
KING BLAKE;JONES;CLARK;
FORD SMITH;