Learning Spark Streaming (Part 2): Integrating Drools to Implement CEP

CEP

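CEP (Complex Event Processing) is an approach to event-stream processing that combines events from multiple data sources to infer higher-level events or patterns, and so detects situations that no single event reveals on its own.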

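One way to implement CEP is stream processing plus a rule engine. In this post, Spark Streaming consumes the events from Kafka and Drools evaluates rules against them.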
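To make "infer a higher-level event from a pattern" concrete, here is a minimal plain-Scala sketch with no Spark or Drools involved. The event type, the `LOGIN_FAILED` kind and the "three failures within a time window" rule are all hypothetical; they only illustrate the kind of pattern a Drools rule would express in the real job.

```scala
// Hypothetical event: who did what, and when (epoch millis)
final case class Event(user: String, kind: String, ts: Long)

// Hypothetical derived ("complex") event emitted when the pattern matches
final case class Alert(user: String, firstTs: Long, lastTs: Long)

object LoginCep {
  /** Emits an Alert whenever `threshold` consecutive LOGIN_FAILED events of the
    * same user span at most `windowMs` milliseconds. Input must be sorted by ts. */
  def detect(events: Seq[Event], threshold: Int, windowMs: Long): Seq[Alert] = {
    val failures = events.filter(_.kind == "LOGIN_FAILED")
    // groupBy keeps the original (time) order inside each group
    failures.groupBy(_.user).toSeq.sortBy(_._1).flatMap { case (user, evs) =>
      // slide a window of `threshold` consecutive failures over this user's events
      evs.sliding(threshold).collect {
        case w if w.size == threshold && w.last.ts - w.head.ts <= windowMs =>
          Alert(user, w.head.ts, w.last.ts)
      }
    }
  }
}
```

No single `LOGIN_FAILED` event is alarming; only the combination of several, correlated by user and time, produces the `Alert`. In the Spark + Drools setup, that correlation logic lives in DRL rules instead of hand-written Scala.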

Add dependencies

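Spark dependencies (the _2.11 suffix is the Scala binary version and must match the project's Scala version):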

<dependency>  
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.0</version>
</dependency>  
<dependency>  
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
</dependency>  

Drools dependencies (all four artifacts use the same version):

<dependency>  
    <groupId>org.drools</groupId>
    <artifactId>drools-core</artifactId>
    <version>7.7.0.Final</version>
</dependency>  
<dependency>  
    <groupId>org.drools</groupId>
    <artifactId>drools-compiler</artifactId>
    <version>7.7.0.Final</version>
</dependency>  
<dependency>  
    <groupId>org.kie</groupId>
    <artifactId>kie-api</artifactId>
    <version>7.7.0.Final</version>
</dependency>  
<dependency>  
    <groupId>org.kie</groupId>
    <artifactId>kie-ci</artifactId>
    <version>7.7.0.Final</version>
</dependency>  

Spark Streaming + Drools

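Load the DRL rules and share them with the executors through a broadcast variable, created lazily on the driver with double-checked locking: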

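import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Members of the driver-side job object (synchronized locks on that object).
// getDrls(): Seq[String] loads the DRL rule sources; it is not shown in this post.
@volatile
private var drlBroadcast: Broadcast[Seq[String]] = null

// Double-checked locking: broadcast the rules only once, on first use
def getDrlBroadcast(sc: SparkContext): Broadcast[Seq[String]] = {
  if (drlBroadcast == null) {
    synchronized {
      if (drlBroadcast == null) {
        drlBroadcast = sc.broadcast(getDrls())
      }
    }
  }
  drlBroadcast
}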
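The post does not show `getDrls()`. The sketch below is a hypothetical stand-in that returns one inline DRL rule; the package name `rules`, the rule name and the `startsWith("ERROR")` condition are illustrative assumptions. Broadcasting plain source strings keeps the broadcast value trivially serializable, and each executor compiles them itself.

```scala
object Drls {
  // Hypothetical rule: fires for every inserted String that starts with "ERROR".
  // In the Kafka job the inserted facts are ConsumerRecord objects, so a real rule
  // would constrain that type (or the job would insert record values instead).
  val errorRule: String =
    """package rules
      |
      |rule "log-error-lines"
      |when
      |    $line : String( this.startsWith("ERROR") )
      |then
      |    System.out.println("matched: " + $line);
      |end
      |""".stripMargin

  // Hypothetical stand-in for the post's getDrls(): the rules are inlined here,
  // but they could equally be read from files, a database, or a config service.
  def getDrls(): Seq[String] = Seq(errorRule)
}
```

Because `getDrlBroadcast` caches the broadcast forever, rule changes made after the first batch are not picked up by this design; reloading would need the broadcast to be replaced explicitly.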

Create a KieSession by compiling the DRL sources:

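import org.kie.api.KieServices
import org.kie.api.io.ResourceType
import org.kie.api.runtime.KieSession
import org.kie.internal.utils.KieHelper

// Compiles the DRL sources into a KieBase and opens a new stateful session on it.
// Drools sliding windows need STREAM mode (EventProcessingOption.STREAM) set on
// the KieBaseConfiguration; the default mode is CLOUD.
def newKieSession(drls: Seq[String]): KieSession = {
  val ks = KieServices.Factory.get

  val helper = new KieHelper
  for (drl <- drls) helper.addContent(drl, ResourceType.DRL)
  val base = helper.build(ks.newKieBaseConfiguration)

  base.newKieSession
}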
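As written, `newKieSession` recompiles every DRL string for every partition of every micro-batch, and Drools treats building a KieBase as the expensive step compared with opening a session on it. One possible refinement (not part of the original post) is to cache the compiled result per executor JVM and keep only session creation per partition. The class `CompiledRuleCache` below is hypothetical and kept generic so it runs without Drools; in the real job `B` would be `org.kie.api.KieBase`, `compile` would wrap the `KieHelper` build step, and the cache instance would live in a Scala `object` so each executor has exactly one.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

// Hypothetical helper: caches one compiled artifact per distinct rule set
final class CompiledRuleCache[B](compile: Seq[String] => B) {
  private val cache = TrieMap.empty[Seq[String], B]

  def get(drls: Seq[String]): B = cache.get(drls) match {
    case Some(built) => built
    case None =>
      // Two threads may both compile on a cold miss; putIfAbsent ensures that
      // all of them return whichever result was stored first.
      val built = compile(drls)
      cache.putIfAbsent(drls, built).getOrElse(built)
  }
}

// Hypothetical usage with a stand-in "compiler" that counts its invocations
object CompiledRuleCacheDemo {
  val compilations = new AtomicInteger(0)
  val cache = new CompiledRuleCache[Int](drls => {
    compilations.incrementAndGet()
    drls.map(_.length).sum
  })
}
```

With such a cache, the body of `newKieSession` would reduce to fetching the cached KieBase and calling `newKieSession` on it; sessions stay per partition, as in the original, so each task keeps its own working memory.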

Process the Kafka event stream, with one KieSession per partition:

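// kafkaStreams: the input DStream of ConsumerRecords created with
// KafkaUtils.createDirectStream (not shown in this post)
kafkaStreams.foreachRDD(rdd => {
  // Runs on the driver: fetch (or lazily create) the broadcast DRL sources
  val drls = getDrlBroadcast(rdd.sparkContext)

  rdd.foreachPartition(p => {
    // Runs on an executor: a fresh session for each partition of each batch
    val session = newKieSession(drls.value)
    try {
      // Each element is a ConsumerRecord, so the rules must match that type
      // (or insert i.value() if the rules are written against the payload)
      for (i <- p) {
        session.insert(i)
      }
      session.fireAllRules()
    } finally {
      // Release the session even if rule evaluation throws
      session.destroy()
    }
  })
})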
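Because a new session is created for each partition of each micro-batch and destroyed right after `fireAllRules`, a rule only ever sees events that arrive in the same partition of the same batch. The plain-Scala simulation below illustrates this with a hypothetical "three failed logins per user" rule (`Evt`, `fireRules` and `perPartition` are illustrative names, not Drools or Spark APIs): the same events raise an alert when evaluated together, but not when split across two partitions.

```scala
// Hypothetical event: only the fields the rule needs
final case class Evt(user: String, kind: String)

object PartitionScope {
  // Hypothetical rule: alert for each user with at least 3 LOGIN_FAILED events
  // present in the same "session"
  def fireRules(session: Seq[Evt]): Set[String] =
    session.filter(_.kind == "LOGIN_FAILED")
      .groupBy(_.user)
      .collect { case (user, evs) if evs.size >= 3 => user }
      .toSet

  // Mirrors rdd.foreachPartition: each partition gets its own, empty session
  def perPartition(partitions: Seq[Seq[Evt]]): Set[String] =
    partitions.flatMap(p => fireRules(p)).toSet
}
```

With the direct Kafka stream, Spark partitions correspond 1:1 to Kafka partitions, so producing records keyed by the correlation attribute (here, the user) keeps related events in one partition within a batch. Patterns that span batches would still need state kept outside the per-batch session, which this design does not provide.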
