学习 Spark 2(七):去重计数

去重计数(count distinct)通常用于计算集合中不重复元素的个数。例如:统计交易的商品数、网站的 UV 等。

HyperLogLog

一般情况下,去重计数的做法是维护元素集合 S,对于一个新的元素 e,如果 S 中包含元素 e 则加入集合 S,否则不加入,集合 S 的元素数量就是计数值。

然而,👆 的做法存在两个问题:

  1. 数据基数越大,维护集合需要的存储就越大(耗内存)
  2. 集合越大,判断是否加入集合的成本就越大(耗 CPU)

在 Spark 2.0 之后,Apache Spark SQL 提供了基于 HyperLogLog 算法的 approx_count_distinct 方法。

以统计页面的 PV 和 UV 为例,如果 UV 的值准确性要求不那么高,那么优化后的代码:

// accessLogDF 页面的访问日志
accessLogDF.groupBy(accessLogDF("page_id")).agg(  
    count("user_id") as "pv",
    approx_count_distinct("user_id") as "uv"
)

参考:https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html

上卷精确去重计数

OLAP 分析对数据进行上卷(Roll-up)操作,然而,这将导致粒度降低,无法精确去重计数。

参考 Apache Kylin 的实现(原文:https://blog.bcmeng.com/post/kylin-distinct-count-global-dict.html),基于 Spark 实现了上卷精确去重计数:

第一步:构建字典

如果字段类型非 INT 型,需要根据去重计数列构建字典:

def createDict(s: Set[String]): Map[String, Int] = {  
  val bimap = HashBiMap.create[Int, String](elements.length)

  var i = 0
  for (ele <- s) {
    bimap.put(i, ele)
    i += 1
  }

  bimap.inverse().toMap
}

这里使用了 Google Guava 提供的 BiMap 集合,保证了键和值是一对一的关系。

第二步:广播字典

val dBoradcast = spark.sparkContext.broadcast(d)  

对字典广播之后,在接下来的上卷过程中就使用字典

第三步:上卷过程,使用 BitMap 压缩:

对字段明细压缩存储:

val bitmap = new RoaringBitmap() // BitMap

val d = dBroadcast.value // 字典

// 遍历
for (v <- vs) {  
  bitmap.add(d(v.user_id))
}

val data = BitmapUtils.bitmapSerialize(bitmap)  

第四步:BitMap 合并去重计数:

var data: Array[Byte]  = null

// 遍历
for (v <- vs) {  
  if (data == null) {
    data = v.user_id
} else {
    data = BitmapUtils.bitmapMerge(data, v.user_id)
}

BitmapUtils.bitmapDeserialize(data).getCardinality  

工具类:

object BitmapUtils {

  def bitmapSerialize(bitmap: RoaringBitmap): Array[Byte] = {
    val bos = new ByteArrayOutputStream
    val dos = new DataOutputStream(bos)
    bitmap.serialize(dos)
    dos.close()
    bos.toByteArray
  }

  def bitmapDeserialize(data: Array[Byte]): RoaringBitmap = {
    val bis = new ByteArrayInputStream(data)
    val dis = new DataInputStream(bis)
    val bitmap = new RoaringBitmap()
    bitmap.deserialize(dis)
    bis.close()
    bitmap
  }

  def bitmapMerge(data1: Array[Byte], data2: Array[Byte]): Array[Byte] = {
    val x1 = BitmapUtils.bitmapDeserialize(data1)
    val x2 = BitmapUtils.bitmapDeserialize(data2)

    val r = RoaringBitmap.or(x1, x2)
    r.runOptimize()

    bitmapSerialize(r)
  }

}

参考:RoaringBitmap