这有一个reduce join的问题,输入文件是一个表,表的内容如下(分割符是Tab键),分别是child和parent的名字
child parent
Jone Lucy
Jone Jack
Tom Lucy
Tom Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
要求输出grandchild grandparent表。
我的想法是将这个表写成两份,一个是左表,加标志位“2”表示。另一个是右表,加标志位“1”表示。
在map阶段,左表用parent的名字作为key,右表用child的名字作为key。在map输出后的shuffle阶段,
会将相同key的value的值连接在一个List集合中,而两表join发生在reduce中,join的原则是:对于同一个key,
如果同时存在于左右表中,即可合并。
例如:对于左表“2 Jone Lucy”和右表“1 Lucy Mary”,
因为key=Lucy,同时存在于两个表中,
即Lucy是Jone的parent,还是Mary的child,自然Jone是Mary是grandchild,所以这两个表符合join的原则。
把这个grandchild和grandparent名字存储起来,最后做卡迪尔积
程序如下:
Mapper函数:
public class STJoinMapper extends Mapper{
//对child parent字段不处理
private LongWritable begin=new LongWritable(0);
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
if(key!=begin){
String line=value.toString();
StringTokenizer strtoken=new StringTokenizer(line, "\t");
String childName=new String();
String parentName=new String();
if(strtoken.hasMoreTokens()){
childName=strtoken.nextToken();
parentName=strtoken.nextToken();
//分表
context.write(new Text(childName), new Text("1"+" "+childName+" "+parentName));
context.write(new Text(parentName),new Text("2"+" "+childName+" "+parentName));
}
}
}
}
Reducer函数:
public class STJoinReducer extends Reducer{
private boolean flag=false;
@Override
protected void reduce(Text key, Iterable valueList,Context context) throws IOException,
InterruptedException {
//输出表头:grandchild grandparent
if(flag==false){
context.write(new Text("grandchild"), new Text("grandparent"));
flag=true;
}
ArrayList grandchildList=new ArrayList();
ArrayList grandparentList=new ArrayList();
for(Text name:valueList){
String namestr=name.toString();
String[] value=namestr.split(" ");
if(value[0].equals("1")){
grandchildList.add(value[1]);
continue;
}
if(value[0].equals("2")){
grandparentList.add(value[2]);
continue;
}
}
//如果只有左表或者只有右表,则不join,否则就求卡迪尔积,输出grandchild grandparent表
if((!grandchildList.isEmpty())&&(!grandparentList.isEmpty())){
for(String child:grandchildList){
for(String parent:grandparentList){
context.write(new Text(child), new Text(parent));
}
}
}
}
}