学习 Avro(二):Spark Streaming 使用 Avro

思路

生产者使用 Avro 对数据进行序列化,发送数据到 Kafka 的指定 Topic 队列

消费者订阅 Kafka 指定 Topic 订阅,使用 Avro 对数据进行反序列化

生产者和消费者使用相同的 Avro Schema,保存在外部存储中,例如:Redis 等

序列化与反序列化

Bijection 是 Twitter 开源的用于在不同类型间转换的库,其中 bijection-avro 模块提供了 Avro 序列化和反序列化工具函数

编辑 pom.xml 文件,添加依赖:

<dependency>  
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.11</artifactId>
    <version>0.9.6</version>
</dependency>  

👇是基于 bijection-avro 的 AvroSerde 工具类:

object AvroSerde {

  def serialize(record: GenericRecord, schema: Schema): Array[Byte] = {
    val codecs = GenericAvroCodecs.toBinary[GenericRecord](schema)
    codecs.apply(record)
  }

  def deserialize(data: Array[Byte], schema: Schema): GenericRecord = {
    val codecs = GenericAvroCodecs.toBinary[GenericRecord](schema)
    codecs.invert(data).get
  }

}

注意:Bijection 使用的二进制数据不包含 Avro Schema

消费者

ds.foreachRDD(rdd => {  
  val schemaBroadcast = getSchemaBroadcast(rdd.sparkContext) // ①

  rdd.foreachPartition(partition => {
    for (i <- partition) {
      val data = i._2
      val schema = Schema.Parser().parse(schemaBroadcast.value) // ②
      val record = AvroSerde.deserialize(data, schema) // ③
      // TODO
    }
  })
})

① 获取 AVSC 广播变量

② 解析为 Avro Schema 对象

③ 反序列化为 GenericRecord 对象