学习 Spark 2(八):集成 Drools

在日常开发过程中,偶尔会遇到业务逻辑变更频繁且变更较小的情况,每次业务逻辑变更都需要经历一次上线的过程,效率非常低下。

解决问题的一个思路,是将这部分经常变化的业务逻辑抽象出来,存储在应用外部(文件系统或者数据库),在程序运行的时候动态加载。

Drools 是基于 Java 的规则引擎,可以使用规则实现复杂的逻辑处理,也可以动态加载规则实现逻辑的动态加载。

下面,以根据商品日均销售额判断商品级别为例,演示:

依赖

编辑 pom.xml 文件,添加 Drools 依赖:

<dependency>  
    <groupId>org.drools</groupId>
    <artifactId>drools-core</artifactId>
    <version>${drools.version}</version>
</dependency>  
<dependency>  
    <groupId>org.drools</groupId>
    <artifactId>drools-compiler</artifactId>
    <version>${drools.version}</version>
</dependency>  
<dependency>  
    <groupId>org.kie</groupId>
    <artifactId>kie-api</artifactId>
    <version>${drools.version}</version>
</dependency>  

规则定义

文字描述:

  1. 日均销售额大于 ¥1000 为 A 级商品;
  2. 日均销售额大于 ¥200 小于等于 ¥1000 为 B 级商品;
  3. 日均销售额小于等于 ¥200 为 C 级商品

POJO 代码:

case class ProductTrading {

    public ProductTrading(BigDecimal amount) {
        this.amount = amount;
    }

    private BigDecimal amount; // 日均销售金额

    private String level; // 级别

    /*
     * 此处省略 getter 和 setter 方法
     */
}

DRL 规则:

package com.dyingbleed.rule;  
dialect  "mvel"

import com.dyingbleed.bean.ProductTrading  
import java.math.BigDecimal

/*
 * A 级品
 */
rule "A"  
    when
        $value: ProductTrading(amount > 800)
    then
        $value.setLevel("A");
        update($value);
end

/*
 * B 级品
 */
rule "B"  
    when
        $value: ProductTrading(amount > 200 && amount <= 800)
    then
        $value.setLevel("B");
        update($value);
end

/*
 * C 级品
 */
rule "C"  
    when
        $value: ProductTrading(amount <= 200)
    then
        $value.setLevel("C");
        update($value);
end  

动态加载规则

在 Driver 从外部(文件系统或字符串)读取 DRL 创建广播变量:

val drlBroadcast = spark.sparkContext.broadcast(drl)  

加载并执行规则:

ds.mapPartitions(partition => {  
  val kieHelper = new KieHelper
  kieHelper.addContent(drlBroadcast.value, ResourceType.DRL) // 加载规则
  val kieBase = kieHelper.build()
  val kieSession = kieBase.newKieSession

  for (i <- partition) {
    kieSession.insert(new ProductTrading(i.amount)) // 插入值
    kieSession.fireAllRules() // 出发规则执行

    // 此处省略若干代码
  }

  kieSession.destroy()

  // 此处省略若干代码
})