Spark SQL 通过 JDBC 连接 SQL 数据库

连接配置

读数据:

val df = spark.read  
  .format("jdbc")
  .option("url", url)
  .option("username", username)
  .option("password", password)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", table)
  .load()

写数据:

df.write  
  .format("jdbc")
  .option("url", url)
  .option("username", username)
  .option("password", password)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", table)
  .save()

谓词下推

Spark 支持将查询下推到数据库只返回查询结果,而不是返回整个表,减少了加载的数据量

将 dbtable 属性替换为子查询即可,🌰:

val table = "user"

val tablePushDown = "(select * from user where sex='male') male_user"  

并发

属性 说明
numPartitions 并发读写数据库的最大分区数,同时也决定了最大 JDBC 连接并发数
partitionColumn 用于分区的列名,注意:分区列字段类型必须为 Numeric 类型,例如:INT、BIGINT、DOUBLE、DECIMAL、NUMERIC 等
lowerBound 和 upperBound lowerBound 和 upperBound 用于决定分区的步长,例如:从 1-100 分 4 个分区: 1-25、26-50、51-75、76-100

举个🌰:

val df = spark.read  
  .format("jdbc")
  .option("url", url)
  .option("username", username)
  .option("password", password)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", table)
  .option("partitionColumn", "id")
  .option("lowerBound", 1)
  .option("upperBound", 100)
  .option("numPartitions", 4)
  .load()

踩坑

MySQL TINYINT 类型字段失真

MySQL 字段类型为 TINYINT(1) 类型,Spark SQL 会将其映射为 BooleanType 类型。如果字段值非0非1,则导致数据失真的问题。

解决办法:

在 URL 末尾追加参数 tinyInt1isBit=false,这样 TINYINT(1) 就会映射为 IntegerType

MySQL Type Name Return value of GetColumnTyneName Return value of GetColumnClassName
TINYINT TINYINT java.lang.Boolean if the configuration property tinyInt1isBit is set to true (the default) and the storage size is 1, or java.lang.Integer if not.

参考:https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-type-conversions.html

参考:http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

参考:https://docs.databricks.com/spark/latest/data-sources/sql-databases.html