【译】深入理解 Spark SQL 的 Catalyst 优化器

原文:Deep Dive into Spark SQL’s Catalyst Optimizer

Spark SQL 是 Spark 最新且技术最复杂的组件之一。它同时支持 SQL 查询和新的 DataFrame API。Spark SQL 的核心是 Catalyst 优化器,它以一种全新的方式利用高级语言的特性(例如:Scala 的模式匹配和 Quasiquotes ①)构建一个可扩展的查询优化器。

最近我们在 SIGMOD 2015 发表了一篇论文(合作者:Davies Liu,Joseph K. Bradley,Xiangrui Meng,Tomer Kaftan,Michael J. Franklin 和 Ali Ghodsi)。在本篇博客,我们将重新发表论文中的部分内容,为广大读者解释 Catalyst 优化器的内部原理。

为了实现 Spark SQL,我们基于 Scala 函数式编程结构,设计了一个新的可扩展的优化器 Catalyst。其可扩展设计有两个目的:

  • 首先,我们希望能够非常容易地为 Spark SQL 添加新的优化技术和特性,尤其是为了应对我们遇到的大数据中的各种问题(例如:半结构化数据和高级分析);
  • 其次,我们希望外部的开发者可以扩展优化器。例如,为数据源添加特定的规则从而使过滤或聚合操作下推到外部的存储系统,或者支持新的数据类型。Catalyst 同时支持基于规则和基于成本的优化(CBO)。

Catalyst 核心是树和操作树的规则的一个通用库。在框架的顶层,我们构建了专门用于关系型查询处理的库(例如,表达式,逻辑查询计划),以及处理查询执行的不同阶段的几组规则:分析,逻辑优化,物理计划和将部分查询编译为 Java 字节码的代码生成。对于后者,我们使用了另一个 Scala 特性 Quasiquotes,它使得在运行时从组合表达式生成代码机器变得非常简单。最后,Catalyst 提供了若干公共的扩展点,包括扩展数据源和用户自定义类型。

Catalyst 主要的数据类型是由节点对象构成的树。每个节点由一个节点类型和零到多个子节点组成。节点类型在 Scala 中被定义为 TreeNode 类的子类。这些对象是不可变的,可以使用函数式的转换对其进行操作,我们将在下一小节继续讨论。

举个简单的例子,假设我们有以下三个节点类型,可以用更简化的表达式表示为:

  • Literal(value: Int):代表常量
  • Attribute(name: String):代表输入一行数据的一个属性,例如:“x”
  • Add(left: TreeNode, right: TreeNode):对两个表达式加和

这些类构建成树;例如,表达式 x+(1+2),可以在 Scala 代码中表示为:

Catalyst Tree

规则

规则用于对树进行操作,其实际上是一个将一棵树转换为另外一棵树的方法。虽然规则可以在其输入树上运行任意的代码(假定该树只是一个 Scala 对象),但最常见的方式是使用一组模式匹配函数,找到并替换特定结构的子树。

模式匹配是许多函数式编程语言的特性,允许从代数数据类型的嵌套结构中进行值提取。在 Catalyst,树提供的转换方法可以递归地应用模式匹配函数到树的所有节点。例如,我们可以实现一个常量之间叠加操作的规则:

tree.transform {

  case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)

}

应用这条规则到树 x+(1+2) 就会产生一棵新树 x+3。这里 case 关键字是 Scala 标准模式匹配的语法,可被用于匹配对象的类型以及命名值提取(这里是 c1 和 c2)。

被传递给转换操作的模式匹配表达式是一个偏函数 ②,其只需要匹配所有可能的输入树的子集即可。Catalyst 将测试规则会应用到树的哪个部分,并自动跳过并下降到还没有匹配的子树。这样的能力意味着规则只需要对优化适用的树进行推理。因此,即使添加新的操作类型到系统中,也不需要修改规则。

规则(通常是 Scala 的模式匹配)可以在相同的转换调用中匹配多个模式,这使得一次实现多个转换操作非常的简单:

tree.transform {

  case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)

  case Add(left, Literal(0)) => left

  case Add(Literal(0), right) => right

}

实践中,规则可能需要执行多次才能完全转换一棵树。Catalyst 将规则分成批次,执行各个批次直到达到一个固定的点,即应用规则之后树不再更新为止。执行规则达到一个固定的点,意味着每条规则可以非常简单且是自包含的,但是,最终仍会在树上产生比较大的全局效果。在上面的例子中,重复地应用规则将不断折叠一棵大树,如 (x+0)+(3+3)。另一个例子,第一个批次也许分析一个表达式并将类型赋给所有属性,而第二个批次可能使用这些类型进行不断折叠。每个批次之后,开发者还可以在生成的新树上运行健全性检查(例如,查看所有的属性都指定了类型),通常这也同样通过递归匹配来编写。

最后,规则条件及其实现可以包含具体的 Scala 代码。这使得 Catalyst 比优化器 DSL 更加强大,同时保持了规则的简洁性。

根据我们的经验,对不可变树执行函数式转换操作使得整个优化器非常易于推理和调试。同时也使得优化器的转换操作可以并行化,尽管我们还没有把它利用起来。

在 Spark SQL 中使用 Catalyst

我们在四个阶段使用了 Catalyst 通用树转换操作框架,如下所示:

  1. 分析逻辑计划解析引用
  2. 逻辑计划优化
  3. 物理计划
  4. 代码生成,编译部分查询为 Java 字节码

Catalyst Optimizer

分析

Spark SQL 以一个需要计算的关系开始,其要么来自 SQL 解析器返回的抽象语法树(AST),要么来自使用 API 构造的 DataFrame 对象。在两种情况下,关系可能包含未解析的属性引用或关系:例如,在 SQL 查询 SELECT col FROM sales,col 的类型,甚至是否是一个合法的列名,在我们查询表 sales 之前都是未知的。如果我们不知道其类型或者没有匹配到输入表(或别名),那么这个属性就未被解析。Spark SQL 使用 Catalyst 规则和一个 Catalyst 对象去追踪所有数据源的表来解析这些属性。从未绑定的属性和数据类型构建一个“未解析的逻辑计划”,然后应用规则执行下面的步骤:

  • 通过名字从 Catalog 中查找关系。
  • 映射命名属性,如 col,到输入的给定操作符子项。
  • 检查哪些属性引用了相同的值给它们一个相同的 ID(之后允许针对 col = col 这样的表达式进行优化)。
  • 通过表达式传递和强制类型:举个例子,我们无法知道 1 + col 的返回类型,直到解析 col 并可能将其子表达式转换为兼容类型。

总共,分析器相关的规则大概 1000 行代码

逻辑优化

逻辑优化阶段对逻辑优化应用了标准的基于规则的优化方式。(执行基于成本的优化,通过使用规则生成多个计划并计算他们的成本。)包括:常量折叠(Constant Folding)、谓词下推(Predicate Pushdown)、投影裁剪(Projection Pruning)、空传递(Null Propagation)、布尔表达式简化(Boolean Expression Simplification)和其它规则。总的来说,我们发现为各种情形添加新的规则都极为简单。例如,当我们添加固定精度的 DECIMAL 类型到 Spark SQL 时,以低精度的方式优化对 DECIMAL 的如 SUM 和 AVG 的聚合操作;只要 12 行代码编写一条规则在 SUM 和 AVG 表达式中找到这样的 DECIMAL,然后将他们转换为 64 位的 LONG 类型进行聚合操作,最后将结果转换回来。下面是一个仅优化了 SUM 表达式的简化版本:

object DecimalAggregates extends Rule[LogicalPlan] {

  /** Maximum number of decimal digits in a Long */

  val MAX_LONG_DIGITS = 18

  def apply(plan: LogicalPlan): LogicalPlan = {

    plan transformAllExpressions {

      case Sum(e @ DecimalType.Expression(prec, scale))

          if prec + 10 <= MAX_LONG_DIGITS =>

        MakeDecimal(Sum(UnscaledValue(e)), prec + 10, scale) }

}

另外一个例子,一条 12 行的规则通过简单的正则表达式将 LIKE 表达式优化为 String.startsWith 或 String.contains 调用。在规则中使用任意 Scala 代码的自由,使得这些优化超越了模式匹配子树结构,更易于表达。

总共,逻辑优化规则大概 800 行代码

物理计划

物理计划阶段,Spark SQL 将一个逻辑计划使用匹配的 Spark 执行引擎的物理操作符生成一个或更多的物理计划。然后选择一个计划应用成本模型。此时,基于成本的优化器只用于选择连接算法:对于已知的很小的关系,Spark SQL 使用 broadcast join,使用 Spark 里可用的点对点的广播工具。框架支持更广泛的使用基于成本的优化,这是因为成本可以通过对整棵树使用规则来递归估计。所以,未来我们打算实现更丰富的基于成本的优化。

物理计划同样执行基于规则的物理优化,如在一个 Spark 的 map 操作执行流水线投影(Piplining Projection)或过滤。除此之外,还可以从逻辑计划将操作推到支持谓词或投影下推的数据源。我们将在之后的章节描述这些数据源的 API。

总共,物理计划规则大概 500 行代码

代码生成

查询优化的最后阶段涉及生成运行在各台机器上的 Java 字节码。由于 Spark SQL 通常是运行在内存数据集上,其处理受限于 CPU,因此我们希望支持代码生成来加快执行速度。然而,构建代码生成引擎非常的复杂,尤其是编译器。Catalyst 依赖于 Scala 语言特定的属性 Quasiquotes 使得代码生成更加简单。Quasiquotes 允许在 Scala 语言中使用编程的方式构建抽象语法树(ASTs),然后可以在运行时提供给 Scala 编译器生成字节码。我们使用 Calalyst 将 SQL 表达式的树转换为 Scala 代码的 AST 评估表达式,然后编译并运行生成的代码。

举一个简单的例子,回忆 4.2 节介绍的属性和字面量树节点 Add,这使得我们能够写出表达式 (x+y)+1。如果没有代码生成,这样的表达书不得不解析每一行数据,一直走到 Add 树,属性和字面量节点。

大量的分支和虚函数调用将减慢执行速度。通过代码生成,我们可以向下面,写一个函数将特定的表达式树转换为 Scala AST:

def compile(node: Node): AST = node match {

  case Literal(value) => q"$value"

  case Attribute(name) => q"row.get($name)"

  case Add(left, right) => q"${compile(left)} + ${compile(right)}"

}

以 q 开头的字符串就是 Quasiquotes,虽然长得像字符串,但是 Scala 编译器会在编译时解析它们,并表示代码中的 ASTs。Quasiquotes 支持变量或其它 ASTs 片段拼接,使用 $ 进行表示。举个例子,Literal(1) 变成了 Scala AST 中的 1,而 Attribute("x") 变成了 row.get("x")。最后,像是 Add(Literal(1), Attribute("x")) 的树变成了 Scala 表达式 AST 1+row.get("x")

Quasiquotes 会在编译时进行类型检查以确保只有合适的 ASTs 或者字面量能够被替换,这比字符串连接更有用,而且是直接生成 Scala AST 树而不是在运行时运行 Scala 解析器。此外,由于每个节点代码的生成规则不需要知晓其子节点是如何构建的,因此它们是高度可组合的。最后,如果 Catalyst 缺少表达式级别的优化,Scala 编译器会对代码进行进一步的优化。下图展示了 Quasiquotes 生成的代码性能近似于手动优化的程序性能。

Code Generation

我们发现了 Quasiquotes 可以直接用于代码生成,而且我们观察到即使是 Spark SQL 新的提交者也可以快速增加新的表达式类型规则。Quasiquotes 也与我们运行在原生 Java 对象的目标相契合:当需要访问对象中的字段时,我们通过代码生成直接访问需要的字段,而不必拷贝对象到一个 Spark SQL 的 Row 中然后使用 Row 的访问方法。最后,将代码生成评估与没有生成代码的表达式解析评估结合起来也非常便捷,因为我们编译的 Scala 代码可直接在表达式解析器中调用。

总共, Catalyst 代码生成大概 700 行代码

本篇博客覆盖了 Spark SQL 的 Catalyst 优化器的内部实现。全新的简单的设计使得 Spark 社区可以快速建立原型,实现和扩展引擎。可以通过论文中剩余的部分。如果你参加今年的 SIGMOD,请来参加我们的分享吧!

更多关于 Spark SQL 的信息:

参考