基于Catalyst实现DSL
本文主要基于Spark 1.2.0
从SparkSQL的角度来说,Catalyst是SparkSQL的优化引擎。准确来说,Catalyst其实是一个抽象语法树(Abstract Syntax Tree)的优化引擎。Catalyst最核心的功能其实就两个:
- AST的表示。为了完成一个操作,定义操作对象的接口是很自然的
- AST的变换。优化就是是基于变换完成的
非常典型的数据结构,而且是immutable的。
Catalyst作为一个相对通用的优化引擎,它可以很方便地应用于其它的DSL(Domain-specific language)。
本文的主要目的也是在说明应该如何利用Catalyst实现一个External-DSL。本文不会过多的关注SparkSQL的细节。
为什么选择Catalyst
- Spark平台的集成。比如你可以先用SQL做处理,然后把特定的任务交给DSL处理,处理之后再丢给SparkSQL或者其它的Spark任务。
- 享受SparkSQL的大量基础设施。从头实现一个相对复杂的DSL需要相当的工作量,而现在,大部分的基础性的工作Catalyst或者SparkSQL都已经帮你完成了(尤其是Expression方面的工作,这部分的工作基本是可以完全复用的)
- Scala Combinator Parser。Catalyst基于Scala实现,很容易接Scala内置的Combinator Parser,这样前置的词法和语法分析就不用费心了。
简而言之,我们只要很少的工作,就能实现一个功能强大的DSL了。
DSL的组成部分
- 词法和语法分析。词法分析主要是提取关键字等token,语法分析生成抽象语法树(AST)
- 语义分析。语义分析主要对AST做各种变换和检查。
- AST的执行。在目标系统上执行最终的AST
基于Catalyst的实现
词法和语法分析
这个工作其实和Catalyst并不直接相关,只不过这个过程的结果(AST)需要被Catalyst所使用。
因为Spark和Catalyst都是由Scala实现的,我们很自然的选择Scala内建的Parser Combinators。
Parser Combinators是Scala实现的一个Internal-DSL,正因为这种内嵌的形式,使得它可以直接调用Scala各种强大的API,异常方便。
//E-BNF samples
p1 ~ p2 // sequencing: must match p1 followed by p2
p1 | p2 // alternation: must match either p1 or p2, with preference given to p1
p1.? // optionality: may match p1 or not
p1.* // repetition: matches any number of repetitions of p1这里不打算展开来讲,具体可以参见Parser Combinators的文档,个人建议先看下scala.util.parsing.combinator.RegexParsers的例子。之后可以看下SparkSQL的org.apache.spark.sql.catalyst.SqlParser,我们只要依葫芦画瓢,我们就能完成语法分析,生成AST了。
AST的结构
看起来生成AST的工作好像很简单,写写产生式,再翻译生成AST的节点就可以了。然而,这是建立在你已经知道需要生成什么样的AST的基础之上的。而且,你必须足够了解Catalyst,这样才能使用Catalyst完成你的工作(而不是自己重新造一套轮子)。
在大致了解Catalyst的一些概念之后,可以看下org.apache.spark.sql.catalyst.analysis的实现。这样就基本能了解为了让一个AST能够跑起来需要做哪些工作了。
TreeNode
org.apache.spark.sql.catalyst.trees.TreeNode类是最基本的AST节点, Catalyst的所有AST节点都是它的子类。需要注意的是,TreeNode虽然叫Node,但实际上它表示了一颗树,当你持有一个节点的时候,你同时也持有了以该节点为根的子树。所以,很自然的,AST的变换被定义在了Node上的。
作为Node,拥有children的信息。作为Tree,拥有若干变换操作(transform)。
children: 子节点transformUp: 根据规则自底向上的对整棵树做变换transformDown: 同上,不过是自顶向下的变换
AST的变换
/**
* Returns a copy of this node where `rule` has been recursively applied to it and all of its
* children (pre-order). When `rule` does not apply to a given node it is left unchanged.
* @param rule the function used to transform this nodes children
*/
def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
val afterRule = rule.applyOrElse(this, identity[BaseType])
// Check if unchanged and then possibly return old copy to avoid gc churn.
if (this fastEquals afterRule) {
transformChildrenDown(rule)
} else {
afterRule.transformChildrenDown(rule)
}
}
/**
* Returns a copy of this node where `rule` has been recursively applied first to all of its
* children and then itself (post-order). When `rule` does not apply to a given node, it is left
* unchanged.
* @param rule the function use to transform this nodes children
*/
def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
val afterRuleOnChildren = transformChildrenUp(rule);
if (this fastEquals afterRuleOnChildren) {
rule.applyOrElse(this, identity[BaseType])
} else {
rule.applyOrElse(afterRuleOnChildren, identity[BaseType])
}
}- rule:
TreeNode => TreeNode类型的PartialFunction - PartialFunction: 对某些输入未定义行为的函数,比如
sqrt = (x: Double) => Double只对非负的Double有定义 - BaseType: 子类的类型通过泛型的类型参数表达,BaseType是TreeNode的子类
必须强调一点:TreeNode是不可变的,所以,为了改变一颗AST,你必须应用规则生成一颗新的AST,而不能考虑去修改原来的树。
虽然变换的规则参数命名是rule,但实际上它本身是节点的置换函数,rule实际上指的时这个置换函数内部的匹配规则,PartialFunction充当了规则的载体。这些规则表示在什么条件下应用对应的变换,PartialFunction只会在这些条件下定义。默认行为是identity,也就是不变。
plan transform {
//满足如下条件的节点才会被置换
case Project(projectList, child) if containsAggregates(projectList) =>
Aggregate(Nil, projectList, child)
}考虑到持有一个节点就相当于持有了整棵子树,所以这个transform能做的事情其实是非常多的,而transform内部可以继续调用别的transform。这是一套相当优秀的api:极简的形式,极强的表达力。
主要的几类TreeNode
Expression: 最基本的表达式,作用于数据集的一行。比如a + b / 3这类AggregateExpression: 聚合操作,作用于多行,返回一个值。比如SUM(a+b)AggregateFunction: 实际执行aggregate操作的节点,因为aggregate是有状态的,每次都需要生成新的节点NamedExpression: 这一支主要用于变量的生成和引用等,变量绑定到变量名就是由这些类的实例完成的QueryPlan: 用于表示数据集级别的操作,比如JOIN,SELECT等操作。Expression通常是作为它们的属性存在的LogicalPlan: Catalyst部分的主要QueryPlan子类,基本上应该是对Spark没有依赖的PhysicalPlan: SparkSQL部分的QueryPlan,需要负责具体执行的操作,和Spark紧耦合
这些TreeNode的分支,并不是表面上看起来那么简单的。这其中有着很复杂的关系,并且Catalyst目前并没有足够的文档。
这里吐个槽:AggregateExpression这支相当复杂,因为AggregateExpression实际上没有实现Expression接口,它根本没有Expression的行为(实际上根本就不应该作为Expression的子类)。而AggregateFunction同时实现了Expression和AggregateExpression的接口。更麻烦的是,因为AggregateFunction是有状态的,所以每次都得生成新的节点来执行。当aggregate之间产生依赖的时候,我们不得不在运行期间动态地做transform,因为包含AggregateExpression的Expression节点是不能执行的。
abstract class AggregateExpression extends Expression {
self: Product =>
/**
* Creates a new instance that can be used to compute this aggregate expression for a group
* of input rows/
*/
def newInstance(): AggregateFunction
/**
* [[AggregateExpression.eval]] should never be invoked because [[AggregateExpression]]'s are
* replaced with a physical aggregate operator at runtime.
*/
override def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}上下文传递
所谓的上下文,在这里主要也就是变量(Reference)和表(Relation,SQL里面的table)。
- Relation: 这个在Catalyst里面基本上可以认为是全局的,Analyzer包含了Catalog,用于查找Relation。
- Reference: Catalyst的所有变量都是通过children传递的,子节点会把自己输出的变量暴露给父节点。
abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
self: PlanType with Product =>
def output: Seq[Attribute]
/**
* Returns the set of attributes that are output by this node.
*/
def outputSet: AttributeSet = AttributeSet(output)
/**
* All Attributes that appear in expressions from this operator. Note that this set does not
* include attributes that are implicitly referenced by being passed through to the output tuple.
*/
def references: AttributeSet = AttributeSet(expressions.flatMap(_.references))
//...
}AST的执行
这个过程实际上已经在Catalyst之外了,可以参照SparkSQL的PhysicalPlan部分。当然,如果平台不是Spark的话,就另当别论了:)
这个部分并不容易,上面提到的AggregateExpression的各种问题就是在这层暴露的。很多时候,在执行的过程中,你依然需要对一些表达式做transform。
总结
以上都是我在看Catalyst的时候关注的点,因为我觉得实现一个DSL,我需要知道这些东西。而其它的细节,基本都可以在实现的过程中慢慢再看。其实是写不下去了:)(笑)