基于Catalyst实现DSL

本文主要基于Spark 1.2.0

从SparkSQL的角度来说,Catalyst是SparkSQL的优化引擎。准确来说,Catalyst其实是一个抽象语法树(Abstract Syntax Tree)的优化引擎。Catalyst最核心的功能其实就两个:

  • AST的表示。为了完成一个操作,定义操作对象的接口是很自然的
  • AST的变换。优化就是是基于变换完成的

非常典型的数据结构,而且是immutable的。

Catalyst作为一个相对通用的优化引擎,它可以很方便地应用于其它的DSL(Domain-specific language)。

本文的主要目的也是在说明应该如何利用Catalyst实现一个External-DSL。本文不会过多的关注SparkSQL的细节。

为什么选择Catalyst

  • Spark平台的集成。比如你可以先用SQL做处理,然后把特定的任务交给DSL处理,处理之后再丢给SparkSQL或者其它的Spark任务。
  • 享受SparkSQL的大量基础设施。从头实现一个相对复杂的DSL需要相当的工作量,而现在,大部分的基础性的工作Catalyst或者SparkSQL都已经帮你完成了(尤其是Expression方面的工作,这部分的工作基本是可以完全复用的)
  • Scala Combinator Parser。Catalyst基于Scala实现,很容易接Scala内置的Combinator Parser,这样前置的词法和语法分析就不用费心了。

简而言之,我们只要很少的工作,就能实现一个功能强大的DSL了。

DSL的组成部分

  • 词法和语法分析。词法分析主要是提取关键字等token,语法分析生成抽象语法树(AST)
  • 语义分析。语义分析主要对AST做各种变换和检查。
  • AST的执行。在目标系统上执行最终的AST

基于Catalyst的实现

词法和语法分析

这个工作其实和Catalyst并不直接相关,只不过这个过程的结果(AST)需要被Catalyst所使用。

因为Spark和Catalyst都是由Scala实现的,我们很自然的选择Scala内建的Parser Combinators。

Parser Combinators是Scala实现的一个Internal-DSL,正因为这种内嵌的形式,使得它可以直接调用Scala各种强大的API,异常方便。

//E-BNF samples
p1 ~ p2 // sequencing: must match p1 followed by p2
p1 | p2 // alternation: must match either p1 or p2, with preference given to p1
p1.?    // optionality: may match p1 or not
p1.*    // repetition: matches any number of repetitions of p1

这里不打算展开来讲,具体可以参见Parser Combinators的文档,个人建议先看下scala.util.parsing.combinator.RegexParsers的例子。之后可以看下SparkSQL的org.apache.spark.sql.catalyst.SqlParser,我们只要依葫芦画瓢,我们就能完成语法分析,生成AST了。

AST的结构

看起来生成AST的工作好像很简单,写写产生式,再翻译生成AST的节点就可以了。然而,这是建立在你已经知道需要生成什么样的AST的基础之上的。而且,你必须足够了解Catalyst,这样才能使用Catalyst完成你的工作(而不是自己重新造一套轮子)。

在大致了解Catalyst的一些概念之后,可以看下org.apache.spark.sql.catalyst.analysis的实现。这样就基本能了解为了让一个AST能够跑起来需要做哪些工作了。

TreeNode

org.apache.spark.sql.catalyst.trees.TreeNode类是最基本的AST节点, Catalyst的所有AST节点都是它的子类。需要注意的是,TreeNode虽然叫Node,但实际上它表示了一颗树,当你持有一个节点的时候,你同时也持有了以该节点为根的子树。所以,很自然的,AST的变换被定义在了Node上的。

作为Node,拥有children的信息。作为Tree,拥有若干变换操作(transform)。

  • children: 子节点
  • transformUp: 根据规则自底向上的对整棵树做变换
  • transformDown: 同上,不过是自顶向下的变换
AST的变换
/**
 * Returns a copy of this node where `rule` has been recursively applied to it and all of its
 * children (pre-order). When `rule` does not apply to a given node it is left unchanged.
 * @param rule the function used to transform this nodes children
 */
def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  val afterRule = rule.applyOrElse(this, identity[BaseType])
  // Check if unchanged and then possibly return old copy to avoid gc churn.
  if (this fastEquals afterRule) {
    transformChildrenDown(rule)
  } else {
    afterRule.transformChildrenDown(rule)
  }
}

/**
 * Returns a copy of this node where `rule` has been recursively applied first to all of its
 * children and then itself (post-order). When `rule` does not apply to a given node, it is left
 * unchanged.
 * @param rule the function use to transform this nodes children
 */
def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  val afterRuleOnChildren = transformChildrenUp(rule);
  if (this fastEquals afterRuleOnChildren) {
    rule.applyOrElse(this, identity[BaseType])
  } else {
    rule.applyOrElse(afterRuleOnChildren, identity[BaseType])
  }
}
  • rule: TreeNode => TreeNode类型的PartialFunction
  • PartialFunction: 对某些输入未定义行为的函数,比如sqrt = (x: Double) => Double只对非负的Double有定义
  • BaseType: 子类的类型通过泛型的类型参数表达,BaseType是TreeNode的子类

必须强调一点:TreeNode是不可变的,所以,为了改变一颗AST,你必须应用规则生成一颗新的AST,而不能考虑去修改原来的树。

虽然变换的规则参数命名是rule,但实际上它本身是节点的置换函数,rule实际上指的时这个置换函数内部的匹配规则,PartialFunction充当了规则的载体。这些规则表示在什么条件下应用对应的变换,PartialFunction只会在这些条件下定义。默认行为是identity,也就是不变。

plan transform {
  //满足如下条件的节点才会被置换
  case Project(projectList, child) if containsAggregates(projectList) =>
    Aggregate(Nil, projectList, child)
}

考虑到持有一个节点就相当于持有了整棵子树,所以这个transform能做的事情其实是非常多的,而transform内部可以继续调用别的transform。这是一套相当优秀的api:极简的形式,极强的表达力

主要的几类TreeNode
  • Expression: 最基本的表达式,作用于数据集的一行。比如a + b / 3这类
  • AggregateExpression: 聚合操作,作用于多行,返回一个值。比如SUM(a+b)
  • AggregateFunction: 实际执行aggregate操作的节点,因为aggregate是有状态的,每次都需要生成新的节点
  • NamedExpression: 这一支主要用于变量的生成和引用等,变量绑定到变量名就是由这些类的实例完成的
  • QueryPlan: 用于表示数据集级别的操作,比如JOINSELECT等操作。Expression通常是作为它们的属性存在的
  • LogicalPlan: Catalyst部分的主要QueryPlan子类,基本上应该是对Spark没有依赖的
  • PhysicalPlan: SparkSQL部分的QueryPlan,需要负责具体执行的操作,和Spark紧耦合

这些TreeNode的分支,并不是表面上看起来那么简单的。这其中有着很复杂的关系,并且Catalyst目前并没有足够的文档。

这里吐个槽:AggregateExpression这支相当复杂,因为AggregateExpression实际上没有实现Expression接口,它根本没有Expression的行为(实际上根本就不应该作为Expression的子类)。而AggregateFunction同时实现了ExpressionAggregateExpression的接口。更麻烦的是,因为AggregateFunction是有状态的,所以每次都得生成新的节点来执行。当aggregate之间产生依赖的时候,我们不得不在运行期间动态地做transform,因为包含AggregateExpressionExpression节点是不能执行的。

abstract class AggregateExpression extends Expression {
  self: Product =>

  /**
   * Creates a new instance that can be used to compute this aggregate expression for a group
   * of input rows/
   */
  def newInstance(): AggregateFunction

  /**
   * [[AggregateExpression.eval]] should never be invoked because [[AggregateExpression]]'s are
   * replaced with a physical aggregate operator at runtime.
   */
  override def eval(input: Row = null): EvaluatedType =
    throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}

上下文传递

所谓的上下文,在这里主要也就是变量(Reference)和表(Relation,SQL里面的table)。

  • Relation: 这个在Catalyst里面基本上可以认为是全局的,Analyzer包含了Catalog,用于查找Relation。
  • Reference: Catalyst的所有变量都是通过children传递的,子节点会把自己输出的变量暴露给父节点。
abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
  self: PlanType with Product =>

  def output: Seq[Attribute]

  /**
   * Returns the set of attributes that are output by this node.
   */
  def outputSet: AttributeSet = AttributeSet(output)

  /**
   * All Attributes that appear in expressions from this operator.  Note that this set does not
   * include attributes that are implicitly referenced by being passed through to the output tuple.
   */
  def references: AttributeSet = AttributeSet(expressions.flatMap(_.references))

//...
}

AST的执行

这个过程实际上已经在Catalyst之外了,可以参照SparkSQL的PhysicalPlan部分。当然,如果平台不是Spark的话,就另当别论了:)

这个部分并不容易,上面提到的AggregateExpression的各种问题就是在这层暴露的。很多时候,在执行的过程中,你依然需要对一些表达式做transform。

总结

以上都是我在看Catalyst的时候关注的点,因为我觉得实现一个DSL,我需要知道这些东西。而其它的细节,基本都可以在实现的过程中慢慢再看。其实是写不下去了:)(笑)