学习 Apache Cassandra(四):Spark SQL 读写 Cassandra

Spark 版本:2.2

Cassandra 版本:3.0.15

依赖

添加 Datastax 公司提供的 spark-cassandra-connector 依赖

编辑 pom.xml 文件:

<dependency>  
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.0.6</version>
</dependency>

配置

spark.setCassandraConf("Cassandra Cluster", CassandraConnectorConf.ConnectionHostParam.option("cassandra-1") ++ CassandraConnectorConf.ConnectionPortParam.option(9042))  

配置 Cassandra 集群,集群名与 conf/cassandra.yaml 中配置的 CLUSTER_NAME 参数保持一致,端口默认为 9042

读写

Cassandra 参数:

值类型 默认值 说明
table String N/A 连接的 Cassandra Table,必填
keyspace String N/A 连接的 Cassandra Keyspace,必填
cluster String "default" 连接的 Cassandra Cluster
pushdown Boolean true 当谓词下推可用时是否启用?
confirm.truncate Boolean false 当使用 SaveMode.overwrite 模式时,是否确认 truncate table?

其它配置项,参考:Configuration Reference

读数据:

spark  
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table" -> "trans",
    "keyspace" -> "dyingbleed"
  )).load()

写数据:

df.write  
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table" -> "trans",
    "keyspace" -> "dyingbleed"
  )).save()

优化

写优化

如果不要求强一致性,可以修改一致性级别,从而提高 Cassandra 的写入性能。

参数 spark.cassandra.output.consistency.level 用于配置写入一致性级别,默认值为 LOCAL_QUORUM。

强一致性可选择 ALL,弱一致性可选择 ANY 或 ONE,举个🌰:

df.write  
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table" -> "trans",
    "keyspace" -> "dyingbleed",
    "spark.cassandra.output.consistency.level" -> "ONE"
  )).save()

参考