Spark 使用 BloomFilter 过滤

Spark 2.1 之后,基于 Guava 的 BloomFilter 实现了自己的布隆过滤器(BloomFilter

在吴军的《数学之美》中第二十一章中,详细介绍了布隆过滤器的原理及应用

相比较其它数据结构,布隆过滤器在时间和空间都要占优势,但是,存在一定的错误率

Demo

下面以计算连续两天同时登录用户数为例:

从访问日志 access_log 查询所有的用户 ID 创建布隆过滤器

bloomFilter 第二个参数 expectedNumItems 为期望的元素数量

bloomFilter 第三个参数 ffp 为期望的错误概率,数值取 0.0 到 1.0 之间

val yesterdayDF = spark.sql(  
  """
    |select
    |    al.user_id
    |from access_log al
    |where al.his = date_format(date_sub(current_date, 1), 'yyyy-MM-dd')
    |and al.user_id is not null
  """.stripMargin)
val bf = yesterdayDF.stat.bloomFilter("user_id", 200000, 0.01) // 布隆过滤器  

创建布隆过滤器广播变量

val bfBc = spark.sparkContext.broadcast(bf) // 广播变量  

使用布隆过滤器过滤前天访问用户 ID

spark.sql(  
  """
    |select
    |    al.user_id
    |from access_log al
    |where al.his = date_format(date_sub(current_date, 2), 'yyyy-MM-dd')
    |and up.user_id is not null
  """.stripMargin).filter((row) => {
    bfBc.value.mightContainString(row.getString(0))
}).distinct().count()