zoukankan      html  css  js  c++  java
  • spark三种连接join

    本文主要介绍spark join相关操作。

    讲述spark连接相关的三个方法join,left-outer-join,right-outer-join,在这之前,我们用hiveSQL先跑出了结果以方便进行对比。

    我们以实例来进行说明。我的实现步骤记录如下。

    1、数据准备

    2、HSQL描述

    3、Spark描述

    1、数据准备

    我们准备两张Hive表,分别是orders(订单表)和drivers(司机表),通过driver_id字段进行关联。数据如下:

    orders

    orders表有两个字段,订单id:order_id和司机id:driver_id。司机id将作为连接键。

    通过select可以看到三条数据。

    hive (gulfstream_test)> select * from orders;
    OK
    orders.order_id orders.driver_id
    1000    5000
    1001    5001
    1002    5002
    Time taken: 0.387 seconds, Fetched: 3 row(s)

    drivers

    drivers表由两个字段,司机id:driver_id和车辆id:car_id。司机id将作为连接键。

    通过select可以看到两条数据。

    hive (gulfstream_test)> select * from drivers;
    OK
    drivers.driver_id       drivers.car_id
    5000    100
    5003    103
    Time taken: 0.036 seconds, Fetched: 2 row(s)

    2、HSQL描述

    JOIN

    自然连接,输出连接键匹配的记录。

    可以看到,通过driver_id匹配的数据只有一条。

    hive (gulfstream_test)> select * from orders t1 join drivers t2 on (t1.driver_id = t2.driver_id) ;
    OK
    t1.order_id     t1.driver_id    t2.driver_id    t2.car_id
    1000    5000    5000    100
    Time taken: 36.079 seconds, Fetched: 1 row(s)

    LEFT OUTER JOIN

    左外链接,输出连接键匹配的记录,左侧的表无论匹配与否都输出。

    可以看到,通过driver_id匹配的数据只有一条,不过所有orders表中的记录都被输出了,drivers中未能匹配的字段被置为空。

    hive (gulfstream_test)> select * from orders t1 left outer join drivers t2 on (t1.driver_id = t2.driver_id) ;
    OK
    t1.order_id     t1.driver_id    t2.driver_id    t2.car_id
    1000    5000    5000    100
    1001    5001    NULL    NULL
    1002    5002    NULL    NULL
    Time taken: 36.063 seconds, Fetched: 3 row(s)

    RIGHT OUTER JOIN

    右外连接,输出连接键匹配的记录,右侧的表无论匹配与否都输出。

    可以看到,通过driver_id匹配的数据只有一条,不过所有drivers表中的记录都被输出了,orders中未能匹配的字段被置为空。

    hive (gulfstream_test)> select * from orders t1 right outer join drivers t2 on (t1.driver_id = t2.driver_id) ;
    OK
    t1.order_id     t1.driver_id    t2.driver_id    t2.car_id
    1000    5000    5000    100
    NULL    NULL    5003    103
    Time taken: 30.089 seconds, Fetched: 2 row(s)

    3、Spark描述

    spark实现join的方式也是通过RDD的算子,spark同样提供了三个算子join,leftOuterJoin,rightOuterJoin。

    在下面给出的例子中,我们通过spark-hive读取了Hive中orders表和drivers表中的数据,这时候数据的表现形式是DataFrame,如果要使用Join操作:

    1)首先需要先将DataFrame转化成了JavaRDD。

    2)不过,JavaRDD其实是没有join算子的,下面还需要通过mapToPair算子将JavaRDD转换成JavaPairRDD,这样就可以使用Join了。 

    下面例子中给出了三种join操作的实现方式,在join之后,通过collect()函数把数据拉到Driver端本地,并通过标准输出打印。

    需要指出的是

    1)join算子(join,leftOuterJoin,rightOuterJoin)只能通过PairRDD使用;

    2)join算子操作的Tuple2<Object1, Object2>类型中,Object1是连接键,我只试过Integer和String,Object2比较灵活,甚至可以是整个Row。

    这里我们使用driver_id作为连接键。 所以在输出Tuple2的时候,我们将driver_id放在了前面。

    Join.java

    /*
    *   spark-submit --queue=root.zhiliangbu_prod_datamonitor spark-join-1.0-SNAPSHOT-jar-with-dependencies.jar
    * */
    public class Join implements Serializable {
    
        private transient JavaSparkContext javaSparkContext;
        private transient HiveContext hiveContext;
    
        /*
        *   初始化Load
        *   创建sparkContext, sqlContext, hiveContext
        * */
        public Join() {
            initSparckContext();
            initHiveContext();
        }
    
        /*
        *   创建sparkContext
        * */
        private void initSparckContext() {
            String warehouseLocation = System.getProperty("user.dir");
            SparkConf sparkConf = new SparkConf()
                    .setAppName("spark-join")
                    .set("spark.sql.warehouse.dir", warehouseLocation)
                    .setMaster("yarn-client");
            javaSparkContext = new JavaSparkContext(sparkConf);
        }
    
        /*
        *   创建hiveContext
        *   用于读取Hive中的数据
        * */
        private void initHiveContext() {
            hiveContext = new HiveContext(javaSparkContext);
        }
    
    
        public void join() {
            /*
            *   生成rdd1
            * */
            String query1 = "select * from gulfstream_test.orders";
            DataFrame rows1 = hiveContext.sql(query1).select("order_id", "driver_id");
            JavaPairRDD<String, String> rdd1 = rows1.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() {
                @Override
                public Tuple2<String, String> call(Row row) throws Exception {
                    String orderId = (String)row.get(0);
                    String driverId = (String)row.get(1);
                    return new Tuple2<String, String>(driverId, orderId);
                }
            });
            /*
            *   生成rdd2
            * */
            String query2 = "select * from gulfstream_test.drivers";
            DataFrame rows2 = hiveContext.sql(query2).select("driver_id", "car_id");
            JavaPairRDD<String, String> rdd2 = rows2.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() {
                @Override
                public Tuple2<String, String> call(Row row) throws Exception {
                    String driverId = (String)row.get(0);
                    String carId = (String)row.get(1);
                    return new Tuple2<String, String>(driverId, carId);
                }
            });
            /*
            *   join
            * */
            System.out.println(" ****************** join *******************");
            JavaPairRDD<String, Tuple2<String, String>> joinRdd = rdd1.join(rdd2);
            Iterator<Tuple2<String, Tuple2<String, String>>> it1 = joinRdd.collect().iterator();
            while (it1.hasNext()) {
                Tuple2<String, Tuple2<String, String>> item = it1.next();
                System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );
            }
    
            /*
            *   leftOuterJoin
            * */
            System.out.println(" ****************** leftOuterJoin *******************");
            JavaPairRDD<String, Tuple2<String, Optional<String>>> leftOuterJoinRdd = rdd1.leftOuterJoin(rdd2);
            Iterator<Tuple2<String, Tuple2<String, Optional<String>>>> it2 = leftOuterJoinRdd.collect().iterator();
            while (it2.hasNext()) {
                Tuple2<String, Tuple2<String, Optional<String>>> item = it2.next();
                System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );
            }
    
            /*
            *   rightOuterJoin
            * */
            System.out.println(" ****************** rightOuterJoin *******************");
            JavaPairRDD<String, Tuple2<Optional<String>, String>> rightOuterJoinRdd = rdd1.rightOuterJoin(rdd2);
            Iterator<Tuple2<String, Tuple2<Optional<String>, String>>> it3 = rightOuterJoinRdd.collect().iterator();
            while (it3.hasNext()) {
                Tuple2<String, Tuple2<Optional<String>, String>> item = it3.next();
                System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );
            }
        }
    
        public static void main(String[] args) {
            Join sj = new Join();
            sj.join();
        }
    
    }

    执行结果

    其中Optional.absent()表示的就是null,可以看到和HSQL是一致的。

    复制代码
    Application ID is application_1508228032068_2746260, trackingURL: http://10.93.21.21:4040
     ****************** join *******************
    driver_id:5000, order_id:1000, car_id:100                                       
     ****************** leftOuterJoin *******************
    driver_id:5001, order_id:1001, car_id:Optional.absent()
    driver_id:5002, order_id:1002, car_id:Optional.absent()
    driver_id:5000, order_id:1000, car_id:Optional.of(100)
     ****************** rightOuterJoin *******************
    driver_id:5003, order_id:Optional.absent(), car_id:103
    driver_id:5000, order_id:Optional.of(1000), car_id:100
    复制代码

    由于数据量不大,我没有从执行效率上进行考量。

    根据经验,一般在数据量较大的情况下,HSQL的执行效率会高一些,如果数据量较小,Spark会快。 

      

     
     
  • 相关阅读:
    virtualenvwrapper GitBash下的配置使用
    MySQL主从复制
    Nginx
    LVS
    MySQL
    Docker Swarm
    flask数据库操作
    flask 数据库迁移
    pytest.mark.parametrize 详解
    redis作为变量池在接口自动化中的应用
  • 原文地址:https://www.cnblogs.com/kangoroo/p/7778962.html
Copyright © 2011-2022 走看看