栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark sql并行读取实践

spark sql并行读取实践

spark sql 并行查询 第一种使用指定分区列的方式

http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
partitionColumn must be a numeric, date, or timestamp column from the table in question.

partitionColumn, lowerBound, upperBound These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.

numPartitions The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.

fetchsize The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). This option applies only to reading.

batchsize The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. This option applies only to writing. It defaults to 1000.

Dataset pgJdbcDF2 = spark.read()
    .format("jdbc")
    .option("url", "jdbc:postgresql://ip:15400/postgres?gssEncMode=disable")
    .option("dbtable", "gaussdb.hotelevent")
    .option("user", "gaussdb")
    .option("password", "******")
    .option("driver", "org.postgresql.Driver")
    .option("numPartitions", 100)
    .option("partitionColumn", "rzsj" )
    .option("lowerBound", 1000)
    .option("upperBound", 2000)
    .option("fetchSize", 5000)
    .load();

dbtable:表名,可以是真实存在的关系表,也可以是通过查询语句 AS 出来的表。其实只要是在 SQL 语句里,FROM 后面能跟的语句用在 dbtable 属性都合法,其原理就是拼接 SQL 语句,dbtable 会填在 FROM 后面。
numPartitions:读、写的最大分区数,也决定了开启数据库连接的数目。使用 numPartitions 有一点点限制, 如果指定了 numPartitions 大于1的值,但是没有指定分区规则,仍只有一个 task 去执行查询。
partitionColumn, lowerBound, upperBound:指定读数据时的分区规则。要使用这三个参数,必须定义 numPartitions,而且这三个参数不能单独出现,要用就必须全部指定。而且 lowerBound, upperBound 不是过滤条件,只是用于决定分区跨度。在分区的时候,会根据s=(upperBound - lowerBound)/numPartitions,每个分区s条数据,最后超过upperBound 的分区的数据,单独作为一个分区 ,然后并行去执行查询。

第二种方式是使用自定义查询的方式

构造多个查询语句,并行多个查询,最后reduce成一个RDD,这种方式和spark自己提供的指定分区列主要区别在于这种并行是多个RDD,每个RDD一个分区,指定分区列的方式是一个RDD,有多个分区。使用这种方式时需要注意最后只有一个RDD,分区也只有一个,所以计算上就没有并行,spark的并行是根据分区数来决定,所以添加了重分区功能,但是这会导致shuffle,会有一定性能消耗,最后的结果并行写逻辑也要注意limit的使用,假如在计算的最后添加了limit截取写的数据量会导致RDD多个分区转换为一个分区[1]从而写的任务没有并行,所以在limit后面进行了重分区,这也会导致shuffle,消耗一定的性能。对于应用中调试节点计算,调研了下spark的缓存使用,spark的缓存不能跨应用使用,除非缓存在外部存储。调试节点,多个节点同时调试执行时可以在一个应用中执行,每个节点分别保存一部分调试的数据,一个应用多个job,job之间是串行执行,因为要保存同时调试的节点数据,所以一个节点就会生成最少一个job。

参考文献

  1. https://github.com/apache/spark/pull/7334
 Optional> datasetOptional = partitionConfig.getSplitCondition().stream().map(v -> {
                            DataframeReader dataframeReader = jdbcConfigData.getTableIdMapDataframeReader().get(modelNode.getSrcTableId());
                            String dbtable = "";
                            if (DatabaseTypeEnum.ORACLE.getName().equalsIgnoreCase(partitionConfig.getDbType())) {
                                String[] data = v.split("and");
                                String first = data[0];
                                String second;
                                if (data.length == 1) {
                                    second = data[0];
                                } else {
                                    second = data[1];
                                }
                                dbtable = String.format(partitionConfig.getDbTableSql(), second, first);
                            } else {
                                dbtable = String.format(partitionConfig.getDbTableSql(), v);
                            }
                            return dataframeReader.option("dbtable", dbtable).load();
                        }).reduce(Dataset::union);
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/349915.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号