学习 Spark Streaming(一):自定义数据源

在 Spark 中,RDD(Resilient Distributed Dataset 弹性分布式数据集)为数据的抽象,DStreams(Discretized Stream 离散流)则为连续的 RDD 的抽象:

Spark DStream

Input DStreams 为从流数据源接收的输入数据流。每个 Input DStreams(除了文件流)都与一个 Receiver 对象相关联,Receiver 从数据源接收数据并存储在 Spark 内存中做进一步处理。

Spark Streaming 提供了两类内置流数据源:

  • 基础数据源:文件系统和 Socket 连接
  • 高级数据源:Kafka、Flume 和 Kinesis 等

自定义 Receiver

👇以从 MySQL 数据库获取更新数据为例:

class MySQLReceiver(url: String, username: String, password: String, sql: String) extends Receiver[Row](StorageLevel.MEMORY_AND_DISK) with Logging { // ①

  override def onStart(): Unit = { // ②
    new Thread("MySQL Receiver") {

      override def run(): Unit = { receive() }

    }.start()
  }

  override def onStop(): Unit = {
    // do nothing
  }

  private def receive(): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection(url, username, password)
    val stat = conn.createStatement()
    val rs = stat.executeQuery(sql)

    val rows = new ListBuffer[Row]
    while (rs.next()) {
      val list = new mutable.ListBuffer[Any]

      for (i <- 1 to rs.getMetaData.getColumnCount) {
        list += rs.getObject(int2Integer(i))
      }

      rows += Row.fromSeq(list)
    }

    store(rows.toIterator) // ③
  }

}

① 继承 org.apache.spark.streaming.receiver.Receiver 抽象类,指定接收的流数据存储级别,实现 onStart 和 onStop 方法

② 在 onStart 方法内启动一个线程,接收数据

③ 存储接收的数据

使用自定义 Receiver:

ssc.receiverStream(new MySQLReceiver("jdbc:mysql:@MYSQL_HOST:MYSQL_PORT", "USER", "SECRET", sql))  

参考