SQL JOIN vs. Spark RDD JOIN

  • Test data
create table tableA(f1 string,f2 int);
insert into tableA values ('A_01', 21);
insert into tableA values ('A_02', 22);
insert into tableA values ('A_03', 23);
insert into tableA values ('A_04', 24);
insert into tableA values ('A_05', 25);
insert into tableA values ('A_06', 26);
insert into tableA values ('B_05', 35);
insert into tableA values ('B_06', 36);

create table tableB(f1 string,f2 int);
insert into tableB values ('B_01', 31);
insert into tableB values ('B_02', 32);
insert into tableB values ('B_03', 33);
insert into tableB values ('B_04', 34);
insert into tableB values ('B_05', 35);
insert into tableB values ('B_06', 36);
insert into tableB values ('A_05', 25);
insert into tableB values ('A_06', 26);
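
The Spark snippets below assume a SparkSession named `spark` with Hive support (for example in spark-shell), so the two tables above can be queried directly. If the Hive tables are not convenient, the same pair RDDs used in the examples can be built in memory; a minimal sketch with the same data:

// (key = f1, value = f2), matching what the spark.sql(...).rdd.map(...) lines below produce
val tableARDD = spark.sparkContext.parallelize(Seq(
  ("A_01", 21), ("A_02", 22), ("A_03", 23), ("A_04", 24),
  ("A_05", 25), ("A_06", 26), ("B_05", 35), ("B_06", 36)))
val tableBRDD = spark.sparkContext.parallelize(Seq(
  ("B_01", 31), ("B_02", 32), ("B_03", 33), ("B_04", 34),
  ("B_05", 35), ("B_06", 36), ("A_05", 25), ("A_06", 26)))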

1. LEFT JOIN (tableA as the driving table)

SQL implementation:

SELECT * FROM tableA A LEFT JOIN tableB B ON A.f1=B.f1;

Spark implementation:

// read each table as a pair RDD keyed by f1, with f2 as the value
val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))

val rdd = tableARDD.leftOuterJoin(tableBRDD)
rdd.collect().foreach(println)

Result:

+----+---+----+----+
|  f1| f2|  f1|  f2|
+----+---+----+----+
|A_01| 21|null|null|
|A_02| 22|null|null|
|A_03| 23|null|null|
|A_04| 24|null|null|
|A_05| 25|A_05|  25|
|A_06| 26|A_06|  26|
|B_05| 35|B_05|  35|
|B_06| 36|B_06|  36|
+----+---+----+----+
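
Note that collect().foreach(println) on the joined RDD prints pair tuples such as (A_01,(21,None)) or (A_05,(25,Some(25))), not the tabular output above, which is what the SQL query shows. A minimal sketch, reusing tableARDD and tableBRDD, that flattens the Option so the RDD result lines up with the SQL rows:

// leftOuterJoin returns RDD[(String, (Int, Option[Int]))]
tableARDD.leftOuterJoin(tableBRDD)
  .map { case (f1, (aF2, bOpt)) =>
    // a matched tableB row shares the join key; an unmatched one is rendered as "null"
    s"$f1\t$aF2\t${bOpt.map(_ => f1).getOrElse("null")}\t${bOpt.getOrElse("null")}"
  }
  .collect().foreach(println)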

2. LEFT JOIN (tableA rows only, excluding the intersection with tableB)

SQL implementation:

SELECT * FROM tableA A LEFT JOIN tableB B ON A.f1=B.f1 WHERE B.f1 IS NULL;

Spark implementation:

val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))

// keep only the keys with no match in tableB (the Option on the right side is None)
val rdd = tableARDD.leftOuterJoin(tableBRDD).filter(x => x._2._2.isEmpty)
rdd.collect().foreach(println)

Result:

+----+---+----+----+
|  f1| f2|  f1|  f2|
+----+---+----+----+
|A_01| 21|null|null|
|A_02| 22|null|null|
|A_03| 23|null|null|
|A_04| 24|null|null|
+----+---+----+----+ 
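
For the "keys of tableA that never appear in tableB" pattern, the pair-RDD API also offers subtractByKey, which yields the same surviving rows without building the joined pairs first. A sketch using the same RDDs:

// keep only the tableA pairs whose key does not occur in tableB
val onlyInA = tableARDD.subtractByKey(tableBRDD)
onlyInA.collect().foreach(println)   // e.g. (A_01,21), (A_02,22), (A_03,23), (A_04,24)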

3. RIGHT JOIN (tableB as the driving table)

SQL implementation:

SELECT * FROM tableA A RIGHT JOIN tableB B ON A.f1=B.f1;

Spark implementation:

val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))

val rdd = tableARDD.rightOuterJoin(tableBRDD)
rdd.collect().foreach(println)

Result:

+----+----+----+---+
|  f1|  f2|  f1| f2|
+----+----+----+---+
|null|null|B_01| 31|
|null|null|B_02| 32|
|null|null|B_03| 33|
|null|null|B_04| 34|
|B_05|  35|B_05| 35|
|B_06|  36|B_06| 36|
|A_05|  25|A_05| 25|
|A_06|  26|A_06| 26|
+----+----+----+---+
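
With rightOuterJoin the Option moves to the left side of the value pair, so it is the tableA value that may be missing. A short sketch showing the result type:

// rightOuterJoin returns RDD[(String, (Option[Int], Int))]
val rightJoined: org.apache.spark.rdd.RDD[(String, (Option[Int], Int))] =
  tableARDD.rightOuterJoin(tableBRDD)
rightJoined.collect().foreach(println)   // e.g. (B_01,(None,31)), (A_05,(Some(25),25))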

4. RIGHT JOIN (tableB rows only, excluding the intersection with tableA)

SQL implementation:

SELECT * FROM tableA A RIGHT JOIN tableB B ON A.f1=B.f1 WHERE A.f1 IS NULL;

Spark implementation:

val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))

// keep only the keys with no match in tableA (the Option on the left side is None)
val rdd = tableARDD.rightOuterJoin(tableBRDD).filter(x => x._2._1.isEmpty)
rdd.collect().foreach(println)

Result:

+----+----+----+---+
|  f1|  f2|  f1| f2|
+----+----+----+---+
|null|null|B_01| 31|
|null|null|B_02| 32|
|null|null|B_03| 33|
|null|null|B_04| 34|
+----+----+----+---+
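
As in section 2, the join-plus-filter can be replaced by a key subtraction, this time in the other direction. A sketch:

// keep only the tableB pairs whose key does not occur in tableA
tableBRDD.subtractByKey(tableARDD).collect().foreach(println)   // e.g. (B_01,31), (B_02,32), (B_03,33), (B_04,34)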

5. INNER JOIN

SQL implementation:

SELECT * FROM tableA A INNER JOIN tableB B ON A.f1=B.f1;

Spark implementation:

val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))

val rdd = tableARDD.join(tableBRDD)
rdd.collect().foreach(println)

Result:

+----+---+----+---+
|  f1| f2|  f1| f2|
+----+---+----+---+
|A_05| 25|A_05| 25|
|A_06| 26|A_06| 26|
|B_05| 35|B_05| 35|
|B_06| 36|B_06| 36|
+----+---+----+---+
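
Like SQL INNER JOIN, the RDD join emits one output pair for every matching combination of rows, so duplicate keys multiply the output. A small illustrative sketch with hypothetical RDDs (not part of the test data above):

val left  = spark.sparkContext.parallelize(Seq(("k", 1), ("k", 2)))
val right = spark.sparkContext.parallelize(Seq(("k", 10), ("k", 20)))
// the key "k" appears twice on each side, so the join yields 2 x 2 = 4 pairs:
// (k,(1,10)), (k,(1,20)), (k,(2,10)), (k,(2,20)), in some order
left.join(right).collect().foreach(println)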

6. FULL OUTER JOIN (all rows from both tables)

SQL implementation:

SELECT * FROM tableA A FULL OUTER JOIN tableB B ON A.f1=B.f1;

Spark implementation:

val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))

val rdd = tableARDD.fullOuterJoin(tableBRDD)
rdd.collect().foreach(println)

Result:

+----+----+----+----+
|  f1|  f2|  f1|  f2|
+----+----+----+----+
|A_06|  26|A_06|  26|
|A_01|  21|null|null|
|null|null|B_03|  33|
|null|null|B_01|  31|
|B_06|  36|B_06|  36|
|A_04|  24|null|null|
|A_05|  25|A_05|  25|
|null|null|B_02|  32|
|A_03|  23|null|null|
|null|null|B_04|  34|
|B_05|  35|B_05|  35|
|A_02|  22|null|null|
+----+----+----+----+
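
fullOuterJoin makes both sides optional, and as with any RDD the result carries no row order, which is why the rows above come back unordered. A short sketch of the result type:

// fullOuterJoin returns RDD[(String, (Option[Int], Option[Int]))]
val fullJoined: org.apache.spark.rdd.RDD[(String, (Option[Int], Option[Int]))] =
  tableARDD.fullOuterJoin(tableBRDD)
fullJoined.collect().foreach(println)   // e.g. (A_01,(Some(21),None)), (B_01,(None,Some(31)))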

7. FULL OUTER JOIN (all rows from both tables, excluding the intersection)

SQL implementation:

SELECT * FROM tableA A FULL OUTER JOIN tableB B ON A.f1=B.f1 WHERE A.f1 IS NULL OR B.f1 IS NULL;

Spark implementation:

val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))

// keep only the keys that appear in exactly one of the two tables
val rdd = tableARDD.fullOuterJoin(tableBRDD).filter(x => x._2._1.isEmpty || x._2._2.isEmpty)
rdd.collect().foreach(println)

Result:

+----+----+----+----+
|  f1|  f2|  f1|  f2|
+----+----+----+----+
|A_01|  21|null|null|
|null|null|B_03|  33|
|null|null|B_01|  31|
|A_04|  24|null|null|
|null|null|B_02|  32|
|A_03|  23|null|null|
|null|null|B_04|  34|
|A_02|  22|null|null|
+----+----+----+----+
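
The same symmetric difference can also be built without a join, by combining the two key subtractions from sections 2 and 4; the output is then just the surviving (f1, f2) pairs rather than joined rows. A sketch:

// rows of tableA whose key is not in tableB, plus rows of tableB whose key is not in tableA
val symmetricDiff = tableARDD.subtractByKey(tableBRDD)
  .union(tableBRDD.subtractByKey(tableARDD))
symmetricDiff.collect().foreach(println)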