提交 f81d9f02 编写于 作者: A Aljoscha Krettek

[FLINK-1710] [table] Switch compile backend to Janino

This greatly reduces compile time while still supporting the same
feature set.
上级 92e0c774
......@@ -300,6 +300,7 @@ BSD-style licenses:
[3-clause BSD license]
- Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet
- D3 (http://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
- Janino (http://docs.codehaus.org/display/JANINO/Home) - Copyright (c) 2001-2010, Arno Unkrig
[BSD-like License]
- Scala Library (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
......
......@@ -190,6 +190,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-----------------------------------------------------------------------
Janino
-----------------------------------------------------------------------
Janino - An embedded Java[TM] compiler
Copyright (c) 2001-2010, Arno Unkrig
All rights reserved.
-----------------------------------------------------------------------
The Netty Project
......
......@@ -80,6 +80,12 @@ under the License.
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests</artifactId>
......
......@@ -47,6 +47,7 @@ public class JavaTableExample {
return "WC " + word + " " + count;
}
}
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
......
......@@ -216,7 +216,7 @@ case class Table(private[flink] val operation: PlanNode) {
}
/**
* Joins to [[Table]]s. Similar to an SQL join. The fields of the two joined
* Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
* operations must not overlap, use [[as]] to rename fields if necessary. You can use
* where and select clauses after a join to further specify the behaviour of the join.
*
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.table.codegen
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.common.typeutils.CompositeType
import org.slf4j.LoggerFactory
/**
* Code generator for binary predicates, i.e. a Join or CoGroup Predicate.
*/
class GenerateBinaryPredicate[L, R](
leftType: CompositeType[L],
rightType: CompositeType[R],
predicate: Expression,
cl: ClassLoader)
extends ExpressionCodeGenerator[(L, R) => Boolean](
Seq(("input0", leftType), ("input1", rightType)),
cl = cl) {
val LOG = LoggerFactory.getLogger(this.getClass)
import scala.reflect.runtime.{universe => ru}
import scala.reflect.runtime.universe._
override protected def generateInternal(): ((L, R) => Boolean) = {
val pred = generateExpression(predicate)
val in0 = newTermName("input0")
val in1 = newTermName("input1")
val leftTpe = typeTermForTypeInfo(leftType)
val rightTpe = typeTermForTypeInfo(rightType)
val code = if (nullCheck) {
q"""
($in0: $leftTpe, $in1: $rightTpe) => {
..${pred.code}
if (${pred.nullTerm}) {
false
} else {
${pred.resultTerm}
}
}
"""
} else {
q"""
($in0: $leftTpe, $in1: $rightTpe) => {
..${pred.code}
${pred.resultTerm}
}
"""
}
LOG.debug(s"""Generated binary predicate "$predicate":\n$code""")
toolBox.eval(code).asInstanceOf[(L, R) => Boolean]
}
}
......@@ -17,51 +17,66 @@
*/
package org.apache.flink.api.table.codegen
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.common.typeutils.CompositeType
import java.io.StringReader
import org.slf4j.LoggerFactory
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.table.codegen.Indenter._
import org.apache.flink.api.table.expressions.Expression
/**
* Code generator for a unary predicate, i.e. a Filter.
*/
class GenerateUnaryPredicate[T](
class GenerateFilter[T](
inputType: CompositeType[T],
predicate: Expression,
cl: ClassLoader) extends ExpressionCodeGenerator[T => Boolean](
Seq(("input0", inputType)),
cl: ClassLoader) extends ExpressionCodeGenerator[FilterFunction[T]](
Seq(("in0", inputType)),
cl = cl) {
val LOG = LoggerFactory.getLogger(this.getClass)
import scala.reflect.runtime.{universe => ru}
import scala.reflect.runtime.universe._
override protected def generateInternal(): (T => Boolean) = {
override protected def generateInternal(): FilterFunction[T] = {
val pred = generateExpression(predicate)
val tpe = typeTermForTypeInfo(inputType)
val generatedName = freshName("GeneratedFilter")
// Janino does not support generics, so we need to cast by hand
val code = if (nullCheck) {
q"""
(input0: $tpe) => {
..${pred.code}
if (${pred.nullTerm}) {
false
} else {
${pred.resultTerm}
j"""
public class $generatedName
implements org.apache.flink.api.common.functions.FilterFunction<$tpe> {
public boolean filter(Object _in0) {
$tpe in0 = ($tpe) _in0;
${pred.code}
if (${pred.nullTerm}) {
return false;
} else {
return ${pred.resultTerm};
}
}
}
"""
} else {
q"""
(input0: $tpe) => {
..${pred.code}
${pred.resultTerm}
j"""
public class $generatedName
implements org.apache.flink.api.common.functions.FilterFunction<$tpe> {
public boolean filter(Object _in0) {
$tpe in0 = ($tpe) _in0;
${pred.code}
return ${pred.resultTerm};
}
}
"""
}
LOG.debug(s"""Generated unary predicate "$predicate":\n$code""")
toolBox.eval(code).asInstanceOf[(T) => Boolean]
compiler.cook(new StringReader(code))
val clazz = compiler.getClassLoader().loadClass(generatedName)
clazz.newInstance().asInstanceOf[FilterFunction[T]]
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.table.codegen
import java.io.StringReader
import org.slf4j.LoggerFactory
import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.table.codegen.Indenter._
import org.apache.flink.api.table.expressions.{Expression, NopExpression}
/**
* Code generator for assembling the result of a binary operation.
*/
class GenerateJoin[L, R, O](
leftTypeInfo: CompositeType[L],
rightTypeInfo: CompositeType[R],
resultTypeInfo: CompositeType[O],
predicate: Expression,
outputFields: Seq[Expression],
cl: ClassLoader)
extends GenerateResultAssembler[FlatJoinFunction[L, R, O]](
Seq(("in0", leftTypeInfo), ("in1", rightTypeInfo)),
cl = cl) {
val LOG = LoggerFactory.getLogger(this.getClass)
override protected def generateInternal(): FlatJoinFunction[L, R, O] = {
val leftTpe = typeTermForTypeInfo(leftTypeInfo)
val rightTpe = typeTermForTypeInfo(rightTypeInfo)
val resultTpe = typeTermForTypeInfo(resultTypeInfo)
val resultCode = createResult(resultTypeInfo, outputFields, o => s"coll.collect($o);")
val generatedName = freshName("GeneratedJoin")
val code = predicate match {
case n: NopExpression =>
// Janino does not support generics, that's why we need
// manual casting here
if (nullCheck) {
j"""
public class $generatedName
implements org.apache.flink.api.common.functions.FlatFlatJoinFunction {
${reuseCode(resultTypeInfo)}
public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
$leftTpe in0 = ($leftTpe) _in0;
$rightTpe in1 = ($rightTpe) _in1;
$resultCode
}
}
"""
} else {
j"""
public class $generatedName
implements org.apache.flink.api.common.functions.FlatJoinFunction {
${reuseCode(resultTypeInfo)}
public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
$leftTpe in0 = ($leftTpe) _in0;
$rightTpe in1 = ($rightTpe) _in1;
$resultCode
}
}
"""
}
case _ =>
val pred = generateExpression(predicate)
// Janino does not support generics, that's why we need
// manual casting here
if (nullCheck) {
j"""
public class $generatedName
implements org.apache.flink.api.common.functions.FlatFlatJoinFunction {
${reuseCode(resultTypeInfo)}
public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
$leftTpe in0 = ($leftTpe) _in0;
$rightTpe in1 = ($rightTpe) _in1;
${pred.code}
if (${pred.nullTerm} && ${pred.resultTerm}) {
$resultCode
}
}
}
"""
} else {
j"""
public class $generatedName
implements org.apache.flink.api.common.functions.FlatJoinFunction {
${reuseCode(resultTypeInfo)}
public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
$leftTpe in0 = ($leftTpe) _in0;
$rightTpe in1 = ($rightTpe) _in1;
${pred.code}
if (${pred.resultTerm}) {
$resultCode
}
}
}
"""
}
}
LOG.debug(s"""Generated join:\n$code""")
compiler.cook(new StringReader(code))
val clazz = compiler.getClassLoader().loadClass(generatedName)
clazz.newInstance().asInstanceOf[FlatJoinFunction[L, R, O]]
}
}
......@@ -17,11 +17,11 @@
*/
package org.apache.flink.api.table.codegen
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.table.typeinfo.RowTypeInfo
import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo}
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.typeinfo.RowTypeInfo
/**
* Base class for unary and binary result assembler code generators.
......@@ -30,12 +30,24 @@ abstract class GenerateResultAssembler[R](
inputs: Seq[(String, CompositeType[_])],
cl: ClassLoader)
extends ExpressionCodeGenerator[R](inputs, cl = cl) {
import scala.reflect.runtime.{universe => ru}
import scala.reflect.runtime.universe._
def reuseCode[A](resultTypeInfo: CompositeType[A]) = {
val resultTpe = typeTermForTypeInfo(resultTypeInfo)
resultTypeInfo match {
case pj: PojoTypeInfo[_] => s"$resultTpe out = new ${pj.getTypeClass.getCanonicalName}();"
case row: RowTypeInfo =>
s"org.apache.flink.api.table.Row out =" +
s" new org.apache.flink.api.table.Row(${row.getArity});"
case _ => ""
}
}
def createResult[T](
resultTypeInfo: CompositeType[T],
outputFields: Seq[Expression]): Tree = {
outputFields: Seq[Expression],
result: String => String): String = {
val resultType = typeTermForTypeInfo(resultTypeInfo)
......@@ -43,54 +55,57 @@ abstract class GenerateResultAssembler[R](
val block = resultTypeInfo match {
case ri: RowTypeInfo =>
val resultSetters: Seq[Tree] = fieldsCode.zipWithIndex map {
val resultSetters: String = fieldsCode.zipWithIndex map {
case (fieldCode, i) =>
q"""
out.setField($i, { ..${fieldCode.code}; ${fieldCode.resultTerm} })
"""
}
s"""
|${fieldCode.code}
|out.setField($i, ${fieldCode.resultTerm});
""".stripMargin
} mkString("\n")
q"""
..$resultSetters
out
"""
s"""
|$resultSetters
|${result("out")}
""".stripMargin
case pj: PojoTypeInfo[_] =>
val resultSetters: Seq[Tree] = fieldsCode.zip(outputFields) map {
val resultSetters: String = fieldsCode.zip(outputFields) map {
case (fieldCode, expr) =>
val fieldName = newTermName(expr.name)
q"""
out.$fieldName = { ..${fieldCode.code}; ${fieldCode.resultTerm} }
"""
}
val fieldName = expr.name
s"""
|${fieldCode.code}
|out.$fieldName = ${fieldCode.resultTerm};
""".stripMargin
} mkString("\n")
q"""
..$resultSetters
out
"""
s"""
|$resultSetters
|${result("out")}
""".stripMargin
case tup: TupleTypeInfo[_] =>
val resultSetters: Seq[Tree] = fieldsCode.zip(outputFields) map {
val resultSetters: String = fieldsCode.zip(outputFields) map {
case (fieldCode, expr) =>
val fieldName = newTermName(expr.name)
q"""
out.$fieldName = { ..${fieldCode.code}; ${fieldCode.resultTerm} }
"""
}
val fieldName = expr.name
s"""
|${fieldCode.code}
|out.$fieldName = ${fieldCode.resultTerm};
""".stripMargin
} mkString("\n")
q"""
..$resultSetters
out
"""
s"""
|$resultSetters
|${result("out")}
""".stripMargin
case cc: CaseClassTypeInfo[_] =>
val resultFields: Seq[Tree] = fieldsCode map {
fieldCode =>
q"{ ..${fieldCode.code}; ${fieldCode.resultTerm}}"
}
q"""
new $resultType(..$resultFields)
"""
val fields: String = fieldsCode.map(_.code).mkString("\n")
val ctorParams: String = fieldsCode.map(_.resultTerm).mkString(",")
s"""
|$fields
|return new $resultType($ctorParams);
""".stripMargin
}
block
......
......@@ -17,41 +17,58 @@
*/
package org.apache.flink.api.table.codegen
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.common.typeutils.CompositeType
import java.io.StringReader
import org.slf4j.LoggerFactory
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.table.codegen.Indenter._
import org.apache.flink.api.table.expressions.Expression
/**
* Code generator for assembling the result of a unary operation.
* Code generator for assembling the result of a select operation.
*/
class GenerateUnaryResultAssembler[I, O](
class GenerateSelect[I, O](
inputTypeInfo: CompositeType[I],
resultTypeInfo: CompositeType[O],
outputFields: Seq[Expression],
cl: ClassLoader)
extends GenerateResultAssembler[(I, O) => O](
Seq(("input0", inputTypeInfo)),
extends GenerateResultAssembler[MapFunction[I, O]](
Seq(("in0", inputTypeInfo)),
cl = cl) {
val LOG = LoggerFactory.getLogger(this.getClass)
import scala.reflect.runtime.universe._
override protected def generateInternal(): MapFunction[I, O] = {
val inputTpe = typeTermForTypeInfo(inputTypeInfo)
val resultTpe = typeTermForTypeInfo(resultTypeInfo)
val resultCode = createResult(resultTypeInfo, outputFields, o => s"return $o;")
override protected def generateInternal(): ((I, O) => O) = {
val generatedName = freshName("GeneratedSelect")
val inputType = typeTermForTypeInfo(inputTypeInfo)
val resultType = typeTermForTypeInfo(resultTypeInfo)
// Janino does not support generics, that's why we need
// manual casting here
val code =
j"""
public class $generatedName
implements org.apache.flink.api.common.functions.MapFunction<$inputTpe, $resultTpe> {
val resultCode = createResult(resultTypeInfo, outputFields)
${reuseCode(resultTypeInfo)}
val code: Tree =
q"""
(input0: $inputType, out: $resultType) => {
..$resultCode
@Override
public Object map(Object _in0) {
$inputTpe in0 = ($inputTpe) _in0;
$resultCode
}
}
"""
LOG.debug(s"Generated unary result-assembler:\n${show(code)}")
toolBox.eval(code).asInstanceOf[(I, O) => O]
LOG.debug(s"""Generated select:\n$code""")
compiler.cook(new StringReader(code))
val clazz = compiler.getClassLoader().loadClass(generatedName)
clazz.newInstance().asInstanceOf[MapFunction[I, O]]
}
}
......@@ -17,44 +17,38 @@
*/
package org.apache.flink.api.table.codegen
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.common.typeutils.CompositeType
import org.slf4j.LoggerFactory
/**
* Code generator for assembling the result of a binary operation.
*/
class GenerateBinaryResultAssembler[L, R, O](
leftTypeInfo: CompositeType[L],
rightTypeInfo: CompositeType[R],
resultTypeInfo: CompositeType[O],
outputFields: Seq[Expression],
cl: ClassLoader)
extends GenerateResultAssembler[(L, R, O) => O](
Seq(("input0", leftTypeInfo), ("input1", rightTypeInfo)),
cl = cl) {
val LOG = LoggerFactory.getLogger(this.getClass)
import scala.reflect.runtime.universe._
override protected def generateInternal(): ((L, R, O) => O) = {
val leftType = typeTermForTypeInfo(leftTypeInfo)
val rightType = typeTermForTypeInfo(rightTypeInfo)
val resultType = typeTermForTypeInfo(resultTypeInfo)
val resultCode = createResult(resultTypeInfo, outputFields)
val code: Tree =
q"""
(input0: $leftType, input1: $rightType, out: $resultType) => {
..$resultCode
}
"""
class IndentStringContext(sc: StringContext) {
def j(args: Any*):String = {
val sb = new StringBuilder()
for ((s, a) <- sc.parts zip args) {
sb append s
val ind = getindent(s)
if (ind.size > 0) {
sb append a.toString().replaceAll("\n", "\n" + ind)
} else {
sb append a.toString()
}
}
if (sc.parts.size > args.size) {
sb append sc.parts.last
}
sb.toString()
}
LOG.debug(s"Generated binary result-assembler:\n$code")
toolBox.eval(code).asInstanceOf[(L, R, O) => O]
// get white indent after the last new line, if any
def getindent(str: String): String = {
val lastnl = str.lastIndexOf("\n")
if (lastnl == -1) ""
else {
val ind = str.substring(lastnl + 1)
if (ind.trim.isEmpty) ind // ind is all whitespace. Use this
else ""
}
}
}
object Indenter {
implicit def toISC(sc: StringContext) = new IndentStringContext(sc)
}
......@@ -17,31 +17,31 @@
*/
package org.apache.flink.api.table.runtime
import org.apache.flink.api.table.codegen.GenerateUnaryPredicate
import org.apache.flink.api.table.expressions.{NopExpression, Expression}
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.table.codegen.GenerateFilter
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.configuration.Configuration
/**
* Proxy function that takes an expression predicate. This is compiled
* upon runtime and calls to [[filter()]] are forwarded to the compiled code.
*/
class ExpressionFilterFunction[T](
predicate: Expression,
inputType: CompositeType[T]) extends RichFilterFunction[T] {
var compiledPredicate: (T) => Boolean = null
var compiledFilter: FilterFunction[T] = null
override def open(config: Configuration): Unit = {
if (compiledPredicate == null) {
compiledPredicate = predicate match {
case n: NopExpression => null
case _ =>
val codegen = new GenerateUnaryPredicate[T](
inputType,
predicate,
getRuntimeContext.getUserCodeClassLoader)
codegen.generate()
}
if (compiledFilter == null) {
val codegen = new GenerateFilter[T](
inputType,
predicate,
getRuntimeContext.getUserCodeClassLoader)
compiledFilter = codegen.generate()
}
}
override def filter(in: T) = compiledPredicate(in)
override def filter(in: T) = compiledFilter.filter(in)
}
......@@ -17,14 +17,17 @@
*/
package org.apache.flink.api.table.runtime
import org.apache.flink.api.table.expressions.{NopExpression, Expression}
import org.apache.flink.api.common.functions.RichFlatJoinFunction
import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.table.codegen.{GenerateBinaryResultAssembler,
GenerateBinaryPredicate}
import org.apache.flink.api.table.codegen.GenerateJoin
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
/**
* Proxy function that takes an expression predicate and output fields. These are compiled
* upon runtime and calls to [[join()]] are forwarded to the compiled code.
*/
class ExpressionJoinFunction[L, R, O](
predicate: Expression,
leftType: CompositeType[L],
......@@ -32,45 +35,20 @@ class ExpressionJoinFunction[L, R, O](
resultType: CompositeType[O],
outputFields: Seq[Expression]) extends RichFlatJoinFunction[L, R, O] {
var compiledPredicate: (L, R) => Boolean = null
var resultAssembler: (L, R, O) => O = null
var result: O = null.asInstanceOf[O]
var compiledJoin: FlatJoinFunction[L, R, O] = null
override def open(config: Configuration): Unit = {
result = resultType.createSerializer(getRuntimeContext.getExecutionConfig).createInstance()
if (compiledPredicate == null) {
compiledPredicate = predicate match {
case n: NopExpression => null
case _ =>
val codegen = new GenerateBinaryPredicate[L, R](
leftType,
rightType,
predicate,
getRuntimeContext.getUserCodeClassLoader)
codegen.generate()
}
}
if (resultAssembler == null) {
val resultCodegen = new GenerateBinaryResultAssembler[L, R, O](
leftType,
rightType,
resultType,
outputFields,
getRuntimeContext.getUserCodeClassLoader)
resultAssembler = resultCodegen.generate()
}
val codegen = new GenerateJoin[L, R, O](
leftType,
rightType,
resultType,
predicate,
outputFields,
getRuntimeContext.getUserCodeClassLoader)
compiledJoin = codegen.generate()
}
def join(left: L, right: R, out: Collector[O]) = {
if (compiledPredicate == null) {
result = resultAssembler(left, right, result)
out.collect(result)
} else {
if (compiledPredicate(left, right)) {
result = resultAssembler(left, right, result)
out.collect(result) }
}
compiledJoin.join(left, right, out)
}
}
......@@ -18,34 +18,36 @@
package org.apache.flink.api.table.runtime
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.table.codegen.GenerateUnaryResultAssembler
import org.apache.flink.api.table.codegen.GenerateSelect
import org.apache.flink.configuration.Configuration
/**
* Proxy function that takes expressions. These are compiled
* upon runtime and calls to [[map()]] are forwarded to the compiled code.
*/
class ExpressionSelectFunction[I, O](
inputType: CompositeType[I],
resultType: CompositeType[O],
outputFields: Seq[Expression]) extends RichMapFunction[I, O] {
var resultAssembler: (I, O) => O = null
var result: O = null.asInstanceOf[O]
var compiledSelect: MapFunction[I, O] = null
override def open(config: Configuration): Unit = {
result = resultType.createSerializer(getRuntimeContext.getExecutionConfig).createInstance()
if (resultAssembler == null) {
val resultCodegen = new GenerateUnaryResultAssembler[I, O](
if (compiledSelect == null) {
val resultCodegen = new GenerateSelect[I, O](
inputType,
resultType,
outputFields,
getRuntimeContext.getUserCodeClassLoader)
resultAssembler = resultCodegen.generate()
compiledSelect = resultCodegen.generate()
}
}
def map(in: I): O = {
resultAssembler(in, result)
compiledSelect.map(in)
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册