SQL JOIN与Spark RDD JOIN
- 测试数据
-- Two small (f1 STRING, f2 INT) tables; keys A_05, A_06, B_05, B_06 exist in both.
CREATE TABLE tableA (f1 STRING, f2 INT);
INSERT INTO tableA VALUES
  ('A_01', 21), ('A_02', 22), ('A_03', 23), ('A_04', 24),
  ('A_05', 25), ('A_06', 26), ('B_05', 35), ('B_06', 36);
CREATE TABLE tableB (f1 STRING, f2 INT);
INSERT INTO tableB VALUES
  ('B_01', 31), ('B_02', 32), ('B_03', 33), ('B_04', 34),
  ('B_05', 35), ('B_06', 36), ('A_05', 25), ('A_06', 26);
1.LEFT JOIN(A表为主)
SQL实现:
SELECT *
FROM tableA AS A
LEFT OUTER JOIN tableB AS B ON A.f1 = B.f1;
spark实现:
// Key both tables by f1 as (f1, f2) pairs.
val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
// SQL `A.f1 = B.f1` never matches NULL keys, but RDD joins compare keys with
// equals, so null keys would pair up. Dropping null keys from B keeps every A
// row (null-keyed ones come back with None), matching LEFT JOIN semantics.
val rdd = tableARDD.leftOuterJoin(tableBRDD.filter(_._1 != null))
// Each element is (f1, (A.f2, Option[B.f2])).
rdd.collect().foreach(println)
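执行结果(下表为 SQL 查询的输出;RDD 版本打印的是 (f1,(A.f2,Option[B.f2])) 形式的元组,例如 (A_01,(21,None))、(A_05,(25,Some(25))),数据内容与下表一致,但行顺序不保证):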
+----+---+----+----+
| f1| f2| f1| f2|
+----+---+----+----+
|A_01| 21|null|null|
|A_02| 22|null|null|
|A_03| 23|null|null|
|A_04| 24|null|null|
|A_05| 25|A_05| 25|
|A_06| 26|A_06| 26|
|B_05| 35|B_05| 35|
|B_06| 36|B_06| 36|
+----+---+----+----+
2.LEFT JOIN(取A表中在B表里没有匹配的数据,即排除两表的交集)
SQL实现:
SELECT *
FROM tableA AS A
LEFT OUTER JOIN tableB AS B ON A.f1 = B.f1
WHERE B.f1 IS NULL;
spark实现:
// Key both tables by f1 as (f1, f2) pairs.
val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
// Left anti join: keep only A rows with no B match (Option[B.f2] is None).
// Null keys are dropped from B first because SQL `A.f1 = B.f1` never matches
// NULL, whereas RDD joins would pair null keys with each other.
val rdd = tableARDD
  .leftOuterJoin(tableBRDD.filter(_._1 != null))
  .filter(x => x._2._2.isEmpty)
rdd.collect().foreach(println)
执行结果(下表为 SQL 查询的输出;RDD 版本打印形如 (A_01,(21,None)) 的元组,行顺序不保证):
+----+---+----+----+
| f1| f2| f1| f2|
+----+---+----+----+
|A_01| 21|null|null|
|A_02| 22|null|null|
|A_03| 23|null|null|
|A_04| 24|null|null|
+----+---+----+----+
3.RIGHT JOIN(B表为主)
SQL实现:
SELECT *
FROM tableA AS A
RIGHT OUTER JOIN tableB AS B ON A.f1 = B.f1;
spark实现:
// Key both tables by f1 as (f1, f2) pairs.
val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
// SQL `A.f1 = B.f1` never matches NULL keys, but RDD joins compare keys with
// equals, so null keys would pair up. Dropping null keys from A keeps every B
// row (null-keyed ones come back with None), matching RIGHT JOIN semantics.
val rdd = tableARDD.filter(_._1 != null).rightOuterJoin(tableBRDD)
// Each element is (f1, (Option[A.f2], B.f2)).
rdd.collect().foreach(println)
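执行结果(下表为 SQL 查询的输出;RDD 版本打印的是 (f1,(Option[A.f2],B.f2)) 形式的元组,例如 (B_01,(None,31))、(A_05,(Some(25),25)),行顺序不保证):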
+----+----+----+---+
| f1| f2| f1| f2|
+----+----+----+---+
|null|null|B_01| 31|
|null|null|B_02| 32|
|null|null|B_03| 33|
|null|null|B_04| 34|
|B_05| 35|B_05| 35|
|B_06| 36|B_06| 36|
|A_05| 25|A_05| 25|
|A_06| 26|A_06| 26|
+----+----+----+---+
4.RIGHT JOIN(取B表中在A表里没有匹配的数据,即排除两表的交集)
SQL实现:
SELECT *
FROM tableA AS A
RIGHT OUTER JOIN tableB AS B ON A.f1 = B.f1
WHERE A.f1 IS NULL;
spark实现:
// Key both tables by f1 as (f1, f2) pairs.
val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
// Right anti join: keep only B rows with no A match (Option[A.f2] is None).
// Null keys are dropped from A first because SQL `A.f1 = B.f1` never matches
// NULL, whereas RDD joins would pair null keys with each other.
val rdd = tableARDD
  .filter(_._1 != null)
  .rightOuterJoin(tableBRDD)
  .filter(x => x._2._1.isEmpty)
rdd.collect().foreach(println)
执行结果(下表为 SQL 查询的输出;RDD 版本打印形如 (B_01,(None,31)) 的元组,行顺序不保证):
+----+----+----+---+
| f1| f2| f1| f2|
+----+----+----+---+
|null|null|B_01| 31|
|null|null|B_02| 32|
|null|null|B_03| 33|
|null|null|B_04| 34|
+----+----+----+---+
5.INNER JOIN
SQL实现:
SELECT *
FROM tableA AS A
INNER JOIN tableB AS B ON A.f1 = B.f1;
spark实现:
// Key both tables by f1 as (f1, f2) pairs.
val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
// SQL `A.f1 = B.f1` never matches NULL keys, but RDD joins compare keys with
// equals, so null keys would pair up. Removing null keys from one side is
// enough to rule out those spurious matches.
val rdd = tableARDD.join(tableBRDD.filter(_._1 != null))
// Each element is (f1, (A.f2, B.f2)).
rdd.collect().foreach(println)
执行结果(下表为 SQL 查询的输出;RDD 版本打印的是 (f1,(A.f2,B.f2)) 形式的元组,例如 (A_05,(25,25)),行顺序不保证):
+----+---+----+---+
| f1| f2| f1| f2|
+----+---+----+---+
|A_05| 25|A_05| 25|
|A_06| 26|A_06| 26|
|B_05| 35|B_05| 35|
|B_06| 36|B_06| 36|
+----+---+----+---+
6.FULL OUTER JOIN(AB表合并)
SQL实现:
SELECT *
FROM tableA AS A
FULL OUTER JOIN tableB AS B ON A.f1 = B.f1;
spark实现:
// Key both tables by f1 as (f1, f2) pairs.
val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
// SQL `A.f1 = B.f1` never matches NULL keys, but RDD joins compare keys with
// equals, so null keys would pair up. Join only the non-null keys, then add
// the null-keyed rows of each side back as unmatched rows, as FULL OUTER JOIN does.
val matched = tableARDD.filter(_._1 != null).fullOuterJoin(tableBRDD.filter(_._1 != null))
val nullKeyedA = tableARDD.filter(_._1 == null)
  .mapValues(v => (Option(v), Option.empty[Int]))
val nullKeyedB = tableBRDD.filter(_._1 == null)
  .mapValues(v => (Option.empty[Int], Option(v)))
val rdd = matched.union(nullKeyedA).union(nullKeyedB)
// Each element is (f1, (Option[A.f2], Option[B.f2])).
rdd.collect().foreach(println)
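执行结果(下表为 SQL 查询的输出;RDD 版本打印的是 (f1,(Option[A.f2],Option[B.f2])) 形式的元组,例如 (A_01,(Some(21),None))、(B_01,(None,Some(31))),行顺序不保证):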
+----+----+----+----+
| f1| f2| f1| f2|
+----+----+----+----+
|A_06| 26|A_06| 26|
|A_01| 21|null|null|
|null|null|B_03| 33|
|null|null|B_01| 31|
|B_06| 36|B_06| 36|
|A_04| 24|null|null|
|A_05| 25|A_05| 25|
|null|null|B_02| 32|
|A_03| 23|null|null|
|null|null|B_04| 34|
|B_05| 35|B_05| 35|
|A_02| 22|null|null|
+----+----+----+----+
7.FULL OUTER JOIN(AB表合并,去掉交集部分)
SQL实现:
SELECT * FROM tableA A FULL OUTER JOIN tableB B ON A.f1=B.f1 WHERE A.f1 IS NULL OR B.f1 IS NULL;
spark实现:
// Key both tables by f1 as (f1, f2) pairs.
val tableARDD = spark.sql("SELECT * FROM tableA").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
val tableBRDD = spark.sql("SELECT * FROM tableB").rdd
  .map(x => (x.getAs[String]("f1"), x.getAs[Int]("f2")))
// SQL `A.f1 = B.f1` never matches NULL keys, but RDD joins compare keys with
// equals, so null keys would pair up. Join only the non-null keys, then add
// the null-keyed rows of each side back as unmatched rows, as FULL OUTER JOIN does.
val matched = tableARDD.filter(_._1 != null).fullOuterJoin(tableBRDD.filter(_._1 != null))
val nullKeyedA = tableARDD.filter(_._1 == null)
  .mapValues(v => (Option(v), Option.empty[Int]))
val nullKeyedB = tableBRDD.filter(_._1 == null)
  .mapValues(v => (Option.empty[Int], Option(v)))
// Keep only rows missing on one side, i.e. drop the intersection.
val rdd = matched.union(nullKeyedA).union(nullKeyedB)
  .filter(x => x._2._1.isEmpty || x._2._2.isEmpty)
rdd.collect().foreach(println)
执行结果(下表为 SQL 查询的输出;RDD 版本打印形如 (A_01,(Some(21),None))、(B_01,(None,Some(31))) 的元组,行顺序不保证):
+----+----+----+----+
| f1| f2| f1| f2|
+----+----+----+----+
|A_01| 21|null|null|
|null|null|B_03| 33|
|null|null|B_01| 31|
|A_04| 24|null|null|
|null|null|B_02| 32|
|A_03| 23|null|null|
|null|null|B_04| 34|
|A_02| 22|null|null|
+----+----+----+----+
版权属于:版权归 bbmax.cc 所有,转载请注明出处
本文链接:https://www.bbmax.cc/index.php/archives/34/
转载时须注明出处及本声明