提交 a0b78146 编写于 作者: T twalthr

[FLINK-6237] [table] Performance improvements and code clean up

上级 6230cc55
......@@ -1386,6 +1386,50 @@ PI()
</td>
</tr>
<tr>
<td>
{% highlight text %}
RAND()
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).</p>
</td>
</tr>
<tr>
<td>
{% highlight text %}
RAND(seed integer)
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an initial seed. Two RAND functions will return identical sequences of numbers if they have the same initial seed.</p>
</td>
</tr>
<tr>
<td>
{% highlight text %}
RAND_INTEGER(bound integer)
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive).</p>
</td>
</tr>
<tr>
<td>
{% highlight text %}
RAND_INTEGER(seed integer, bound integer)
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive) with an initial seed. Two RAND_INTEGER functions will return identical sequences of numbers if they have the same initial seed and the same bound.</p>
</td>
</tr>
</tbody>
</table>
......
......@@ -1411,12 +1411,8 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
// Bounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
<<<<<<< HEAD:docs/dev/table/tableApi.md
// Bounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
=======
A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICTS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, 
FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, 
SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE, RAND, RAND_INTEGER
>>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation:docs/dev/table_api.md
// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
......@@ -2071,6 +2067,50 @@ pi()
<p>Returns a value that is closer than any other value to pi.</p>
</td>
</tr>
<tr>
<td>
{% highlight java %}
rand()
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).</p>
</td>
</tr>
<tr>
<td>
{% highlight java %}
rand(seed integer)
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an initial seed. Two rand functions will return identical sequences of numbers if they have the same initial seed.</p>
</td>
</tr>
<tr>
<td>
{% highlight java %}
randInteger(bound integer)
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive).</p>
</td>
</tr>
<tr>
<td>
{% highlight java %}
randInteger(seed integer, bound integer)
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive) with an initial seed. Two randInteger functions will return identical sequences of numbers if they have the same initial seed and the same bound.</p>
</td>
</tr>
</tbody>
</table>
......@@ -3280,6 +3320,50 @@ pi()
</td>
</tr>
<tr>
<td>
{% highlight scala %}
rand()
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).</p>
</td>
</tr>
<tr>
<td>
{% highlight scala %}
rand(seed integer)
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an initial seed. Two rand functions will return identical sequences of numbers if they have the same initial seed.</p>
</td>
</tr>
<tr>
<td>
{% highlight scala %}
randInteger(bound integer)
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive).</p>
</td>
</tr>
<tr>
<td>
{% highlight scala %}
randInteger(seed integer, bound integer)
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive) with an initial seed. Two randInteger functions will return identical sequences of numbers if they have the same initial seed and the same bound.</p>
</td>
</tr>
</tbody>
</table>
......@@ -3938,246 +4022,6 @@ ARRAY.element()
</tbody>
</table>
<<<<<<< HEAD:docs/dev/table/tableApi.md
=======
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 40%">Random functions</th>
<th class="text-center">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>
{% highlight text %}
RAND()
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).</p>
</td>
</tr>
<tr>
<td>
{% highlight text %}
RAND(seed integer)
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an initial seed. Two RAND functions will get identical sequences of numbers if they have the same initial seed.</p>
</td>
</tr>
<tr>
<td>
{% highlight text %}
RAND_INTEGER(bound integer)
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom int value between 0 (inclusive) and the specified value (exclusive).</p>
</td>
</tr>
<tr>
<td>
{% highlight text %}
RAND_INTEGER(seed integer, bound integer)
{% endhighlight %}
</td>
<td>
<p>Returns a pseudorandom int value between 0 (inclusive) and the specified value (exclusive) with an initial seed. Two RAND_INTEGER functions will get identical sequences of numbers if they have the same initial seed and the same bound.</p>
</td>
</tr>
</tbody>
</table>
</div>
</div>
{% top %}
User-defined Functions
----------------
### User-defined Scalar Functions
If a required scalar function is not contained in the built-in functions, it is possible to define custom, user-defined scalar functions for both the Table API and SQL. A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value.
In order to define a scalar function one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named `eval`.
The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
/**
 * Scalar function that multiplies the hash code of a string by a configurable factor.
 */
public class HashCode extends ScalarFunction {
  // multiplier applied to String#hashCode(); always overwritten by the constructor
  private int factor = 12;

  public HashCode(int factor) {
    this.factor = factor;
  }

  /** Evaluation method; it is declared public and named "eval" as required for scalar functions. */
  public int eval(String s) {
    return s.hashCode() * factor;
  }
}

BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL API
tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
// must be defined in static/object context
class HashCode(factor: Int) extends ScalarFunction {
  /** Multiplies the hash code of `s` by the factor passed to the constructor. */
  def eval(s: String): Int = s.hashCode() * factor
}

val tableEnv = TableEnvironment.getTableEnvironment(env)

// Scala Table API: instantiate the function and apply it to a field directly
val hashCode = new HashCode(10)
myTable.select('string, hashCode('string))

// SQL: register the function under a name, then call it in the query
tableEnv.registerFunction("hashCode", new HashCode(10))
tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable")
{% endhighlight %}
</div>
</div>
By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.
Internally, the Table API and SQL code generation works with primitive values as much as possible. If a user-defined scalar function should not introduce much overhead through object creation/casting during runtime, it is recommended to declare parameters and result types as primitive types instead of their boxed classes. `Types.DATE` and `Types.TIME` can also be represented as `int`. `Types.TIMESTAMP` can be represented as `long`.
The following example shows an advanced example which takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding `ScalarFunction#getResultType()` we define that the returned long value should be interpreted as a `Types.TIMESTAMP` by the code generation.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public static class TimestampModifier extends ScalarFunction {
public long eval(long t) {
return t % 1000;
}
public TypeInformation<?> getResultType(signature: Class<?>[]) {
return Types.TIMESTAMP;
}
}
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
object TimestampModifier extends ScalarFunction {
  /** Operates on the internal (long) timestamp representation. */
  def eval(t: Long): Long = t % 1000

  /** Makes the code generation interpret the returned long as a `Types.TIMESTAMP`. */
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = Types.TIMESTAMP
}
{% endhighlight %}
</div>
</div>
### User-defined Table Functions
Similar to a user-defined scalar function, a user-defined table function takes zero, one, or multiple scalar values as input parameters. However in contrast to a scalar function, it can return an arbitrary number of rows as output instead of a single value. The returned rows may consist of one or more columns.
In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table function. The type of the returned table is determined by the generic type of `TableFunction`. Evaluation methods emit output rows using the protected `collect(T)` method.
In the Table API, a table function is used with `.join(Expression)` or `.leftOuterJoin(Expression)` for Scala users and `.join(String)` or `.leftOuterJoin(String)` for Java users. The `join` operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `leftOuterJoin` operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator) and preserves outer rows for which the table function returns an empty table. In SQL use `LATERAL TABLE(<TableFunction>)` with CROSS JOIN and LEFT JOIN with an ON TRUE join condition (see examples below).
The following example shows how to define a table-valued function, register it in the TableEnvironment, and call it in a query. Note that you can configure your table function via a constructor before it is registered:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
public class Split extends TableFunction<Tuple2<String, Integer>> {
  // delimiter handed to String#split; always supplied through the constructor
  private final String separator;

  public Split(String separator) {
    this.separator = separator;
  }

  /** Emits one (token, token length) row for every token of the input string. */
  public void eval(String str) {
    String[] tokens = str.split(separator);
    for (String token : tokens) {
      // use collect(...) to emit a row
      collect(new Tuple2<String, Integer>(token, token.length()));
    }
  }
}

BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ... // table schema: [a: String]

// Register the function.
tableEnv.registerFunction("split", new Split("#"));

// Java Table API: "as" names the fields of the table produced by the function.
myTable.join("split(a) as (word, length)").select("a, word, length");
myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");

// SQL: the table function is referenced with the LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
// The generic type "(String, Int)" determines the schema of the returned table as (String, Integer).
class Split(separator: String) extends TableFunction[(String, Int)] {
  /** Splits `str` around `separator` and emits one (token, token length) row per token. */
  def eval(str: String): Unit = {
    // use collect(...) to emit a row.
    str.split(separator).foreach(x => collect((x, x.length)))
  }
}

val tableEnv = TableEnvironment.getTableEnvironment(env)
val myTable = ... // table schema: [a: String]

// Use the table function in the Scala Table API (Note: No registration required in Scala Table API).
val split = new Split("#")
// "as" specifies the field names of the generated table.
myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length)
myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length)

// Register the table function to use it in SQL queries.
tableEnv.registerFunction("split", new Split("#"))

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API)
tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
{% endhighlight %}
**IMPORTANT:** Do not implement TableFunction as a Scala object. A Scala object is a singleton and will cause concurrency issues.
</div>
</div>
Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using `AS`.
>>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation:docs/dev/table_api.md
<table class="table table-bordered">
......
......@@ -22,18 +22,11 @@ import java.sql.{Date, Time, Timestamp}
import org.apache.calcite.avatica.util.DateTimeUtils._
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
<<<<<<< HEAD
import org.apache.flink.table.api.{TableException, CurrentRow, CurrentRange, UnboundedRow, UnboundedRange}
import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.AggregateFunction
=======
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
import org.apache.flink.table.expressions._
>>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation
import scala.language.implicitConversions
......@@ -917,7 +910,6 @@ object array {
}
}
<<<<<<< HEAD
/**
* Returns a value that is closer than any other value to pi.
*/
......@@ -928,44 +920,52 @@ object pi {
*/
def apply(): Expression = {
Pi()
=======
object rand {
def apply(expr: Expression*): Expression = {
expr.size match {
case 0 => new Rand()
case 1 => Rand(expr.head)
case _ => throw new ValidationException("Invalid arguments for rand([seed]).")
}
}
}
object rand_integer {
/**
* Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
*/
object rand {
def apply(expr: Expression*): Expression = {
expr.size match {
case 1 => new RandInteger(expr.head)
case 2 => RandInteger(expr.head, expr(1))
case _ =>
throw new ValidationException("Invalid arguments for rand_integer([seed, ] bound).")
}
<<<<<<< HEAD
>>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation
=======
/**
* Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
*/
def apply(): Expression = {
new Rand()
}
/**
* Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an
* initial seed. Two rand() functions will return identical sequences of numbers if they
* have the same initial seed.
*/
def apply(seed: Expression): Expression = {
Rand(seed)
}
}
/**
* Returns a value that is closer than any other value to pi.
* Returns a pseudorandom integer value between 0 (inclusive) and the specified
* value (exclusive).
*/
object pi {
object randInteger {
/**
* Returns a value that is closer than any other value to pi.
* Returns a pseudorandom integer value between 0 (inclusive) and the specified
* value (exclusive).
*/
def apply(): Expression = {
Pi()
>>>>>>> fix merging error
def apply(bound: Expression): Expression = {
new RandInteger(bound)
}
/**
* Returns a pseudorandom integer value between 0 (inclusive) and the specified value
* (exclusive) with an initial seed. Two randInteger() functions will return identical
* sequences of numbers if they have the same initial seed and the same bound.
*/
def apply(seed: Expression, bound: Expression): Expression = {
RandInteger(seed, bound)
}
}
......
......@@ -43,15 +43,11 @@ import org.apache.flink.table.codegen.Indenter.toISC
import org.apache.flink.table.codegen.calls.FunctionGenerator
import org.apache.flink.table.codegen.calls.ScalarOperators._
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
<<<<<<< HEAD
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString}
import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction}
=======
import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
>>>>>>> [FLINK-6237] [table] support RAND and RAND_INTEGER on SQL
import org.apache.flink.table.runtime.TableFunctionCollector
import org.apache.flink.table.typeutils.TypeCheckUtils._
import org.apache.flink.types.Row
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString}
import scala.collection.JavaConversions._
import scala.collection.mutable
......@@ -1730,7 +1726,7 @@ class CodeGenerator(
|$resultTypeTerm $resultTerm = $defaultValue;
|boolean $nullTerm = true;
|""".stripMargin
GeneratedExpression(resultTerm, nullTerm, wrappedCode, resultType)
GeneratedExpression(resultTerm, nullTerm, wrappedCode, resultType, literal = true)
} else {
throw new CodeGenException("Null literals are not allowed if nullCheck is disabled.")
}
......@@ -1755,7 +1751,7 @@ class CodeGenerator(
|""".stripMargin
}
GeneratedExpression(resultTerm, nullTerm, resultCode, literalType)
GeneratedExpression(resultTerm, nullTerm, resultCode, literalType, literal = true)
}
private[flink] def generateSymbol(enum: Enum[_]): GeneratedExpression = {
......@@ -1965,34 +1961,37 @@ class CodeGenerator(
/**
* Adds a reusable [[java.util.Random]] to the member area of the generated [[Function]].
*
* The seed parameter must be a literal/constant expression.
*
* @return member variable term
*/
def addReusableRandom(seedExpr: GeneratedExpression): String = {
def addReusableRandom(seedExpr: Option[GeneratedExpression]): String = {
val fieldTerm = newName("random")
val field =
s"""
|final java.util.Random $fieldTerm;
|transient java.util.Random $fieldTerm;
|""".stripMargin
reusableMemberStatements.add(field)
val fieldInit = if (seedExpr != null && nullCheck) {
s"""
|${seedExpr.code}
|if(!${seedExpr.nullTerm}) {
| $fieldTerm = new java.util.Random(${seedExpr.resultTerm});
val fieldInit = seedExpr match {
case Some(s) if nullCheck =>
s"""
|${s.code}
|if(!${s.nullTerm}) {
| $fieldTerm = new java.util.Random(${s.resultTerm});
|}
|else {
| $fieldTerm = new java.util.Random();
|}
|""".stripMargin
} else if (seedExpr != null) {
s"""
|${seedExpr.code}
|$fieldTerm = new java.util.Random(${seedExpr.resultTerm});
case Some(s) =>
s"""
|${s.code}
|$fieldTerm = new java.util.Random(${s.resultTerm});
|""".stripMargin
} else {
s"""
case _ =>
s"""
|$fieldTerm = new java.util.Random();
|""".stripMargin
}
......
......@@ -375,6 +375,26 @@ object FunctionGenerator {
Seq(),
new ConstantCallGen(DOUBLE_TYPE_INFO, Math.PI.toString))
addSqlFunction(
RAND,
Seq(),
new RandCallGen(isRandInteger = false, hasSeed = false))
addSqlFunction(
RAND,
Seq(INT_TYPE_INFO),
new RandCallGen(isRandInteger = false, hasSeed = true))
addSqlFunction(
RAND_INTEGER,
Seq(INT_TYPE_INFO),
new RandCallGen(isRandInteger = true, hasSeed = false))
addSqlFunction(
RAND_INTEGER,
Seq(INT_TYPE_INFO, INT_TYPE_INFO),
new RandCallGen(isRandInteger = true, hasSeed = true))
// ----------------------------------------------------------------------------------------------
// Temporal functions
// ----------------------------------------------------------------------------------------------
......@@ -458,26 +478,6 @@ object FunctionGenerator {
Seq(),
new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
addSqlFunction(
RAND,
Seq(),
new RandCallGen(BuiltInMethod.RAND))
addSqlFunction(
RAND,
Seq(INT_TYPE_INFO),
new RandCallGen(BuiltInMethod.RAND_SEED))
addSqlFunction(
RAND_INTEGER,
Seq(INT_TYPE_INFO),
new RandCallGen(BuiltInMethod.RAND_INTEGER))
addSqlFunction(
RAND_INTEGER,
Seq(INT_TYPE_INFO, INT_TYPE_INFO),
new RandCallGen(BuiltInMethod.RAND_INTEGER_SEED))
// ----------------------------------------------------------------------------------------------
/**
......
......@@ -18,82 +18,40 @@
package org.apache.flink.table.codegen.calls
import org.apache.calcite.util.BuiltInMethod
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
/**
* Generates a rand/rand_integer function call.
* Supports: RAND([seed]) and RAND_INTEGER([seed, ] bound)
* Generates a random function call.
* Supports: RAND([seed]) and RAND_INTEGER([seed, ] bound)
*/
class RandCallGen(method: BuiltInMethod) extends CallGenerator {
class RandCallGen(isRandInteger: Boolean, hasSeed: Boolean) extends CallGenerator {
override def generate(
codeGenerator: CodeGenerator,
operands: Seq[GeneratedExpression]): GeneratedExpression = {
codeGenerator: CodeGenerator,
operands: Seq[GeneratedExpression])
: GeneratedExpression = {
method match {
case BuiltInMethod.RAND =>
assert(operands.isEmpty)
val randField = codeGenerator.addReusableRandom(null)
generateCallIfArgsNotNull(codeGenerator.nullCheck, DOUBLE_TYPE_INFO, operands) {
_ => s"""$randField.nextDouble()""".stripMargin
}
case BuiltInMethod.RAND_SEED =>
assert(operands.size == 1)
val (randField, newOperands) = if (operands.head.code.isEmpty) {
val (randField, initRandomExpr) = genInitRandomExpression(operands.head)
(randField, Seq(initRandomExpr))
} else {
val randField = codeGenerator.addReusableRandom(operands.head)
(randField, Seq.empty)
}
generateCallIfArgsNotNull(codeGenerator.nullCheck, DOUBLE_TYPE_INFO, newOperands) {
_ => s"""$randField.nextDouble()""".stripMargin
}
case BuiltInMethod.RAND_INTEGER =>
assert(operands.size == 1)
val randField = codeGenerator.addReusableRandom(null)
generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
(terms) => s"""$randField.nextInt(${terms.head})""".stripMargin
}
case BuiltInMethod.RAND_INTEGER_SEED =>
assert(operands.size == 2)
val (randField, newOperands) = if (operands.head.code.isEmpty) {
val (randField, initRandomExpr) = genInitRandomExpression(operands.head)
(randField, Seq(initRandomExpr, operands(1)))
} else {
val randField = codeGenerator.addReusableRandom(operands.head)
(randField, Seq(operands(1)))
}
generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, newOperands) {
(terms) => s"""$randField.nextInt(${terms.last})""".stripMargin
}
val randField = if (hasSeed) {
if (operands.head.literal) {
codeGenerator.addReusableRandom(Some(operands.head))
} else {
s"(new java.util.Random(${operands.head.resultTerm}))"
}
} else {
codeGenerator.addReusableRandom(None)
}
}
private def genInitRandomExpression(
seedExpr: GeneratedExpression): (String, GeneratedExpression) = {
val randField = newName("random")
val initRandomCode =
s"""
|java.util.Random $randField;
|if(!${seedExpr.nullTerm}) {
| $randField = new java.util.Random(${seedExpr.resultTerm});
|}
|else {
| $randField = new java.util.Random();
|}
|""".stripMargin
val initRandomExpr = GeneratedExpression(
randField,
GeneratedExpression.NEVER_NULL,
initRandomCode,
TypeInformation.of(classOf[java.util.Random]))
(randField, initRandomExpr)
if (isRandInteger) {
generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) { terms =>
s"$randField.nextInt(${terms.last})"
}
} else {
generateCallIfArgsNotNull(codeGenerator.nullCheck, DOUBLE_TYPE_INFO, operands) { _ =>
s"$randField.nextDouble()"
}
}
}
}
......@@ -934,8 +934,6 @@ object ScalarOperators {
}
}
<<<<<<< HEAD
<<<<<<< HEAD
def generateMapGet(
codeGenerator: CodeGenerator,
map: GeneratedExpression,
......@@ -964,68 +962,8 @@ object ScalarOperators {
|""".stripMargin
}
GeneratedExpression(resultTerm, nullTerm, accessCode, resultType)
=======
def generateRand(
randField: String,
seedExpr: GeneratedExpression,
resultType: TypeInformation[_])
: GeneratedExpression = {
val resultTerm = newName("result")
val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
val randCode = if (seedExpr != null) {
s"""
|if ($randField == null) {
| ${seedExpr.code}
| $randField = new java.util.Random(${seedExpr.resultTerm});
|}
|$resultTypeTerm $resultTerm = $randField.nextDouble();
""".stripMargin
} else {
s"""
|if ($randField == null) {
| $randField = new java.util.Random();
|}
|$resultTypeTerm $resultTerm = $randField.nextDouble();
""".stripMargin
}
GeneratedExpression(resultTerm, GeneratedExpression.NEVER_NULL, randCode, resultType)
}
/**
 * Generates the code for a RAND_INTEGER call that draws an int from a reusable
 * `java.util.Random` term.
 *
 * The emitted code creates the generator while `randField` is still `null`
 * (seeded when `seedExpr` is given, unseeded otherwise), then evaluates the
 * bound expression and calls `nextInt(bound)`.
 *
 * NOTE(review): an unresolved merge-conflict marker sits directly before the
 * closing brace of this method; it has to be resolved before this compiles.
 *
 * @param randField  name of the term holding the `java.util.Random` instance
 * @param seedExpr   expression producing the seed, or `null` for an unseeded generator
 * @param boundExpr  expression producing the exclusive upper bound; must not be `null`
 * @param resultType type of the generated result term
 * @return expression whose result term holds the random value and is declared never null
 */
def generateRandInteger(
randField: String,
seedExpr: GeneratedExpression,
boundExpr: GeneratedExpression,
resultType: TypeInformation[_])
: GeneratedExpression = {
// the bound is mandatory, only the seed is optional
assert(boundExpr != null)
val resultTerm = newName("result")
val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
// seeded variant: the seed code is emitted inside the null check, so it only
// runs when the generator is created
val randCode = if (seedExpr != null) {
s"""
|if ($randField == null) {
| ${seedExpr.code}
| $randField = new java.util.Random(${seedExpr.resultTerm});
|}
|${boundExpr.code}
|$resultTypeTerm $resultTerm = $randField.nextInt(${boundExpr.resultTerm});
""".stripMargin
} else {
// unseeded variant: the bound code is still emitted on every evaluation
s"""
|if ($randField == null) {
| $randField = new java.util.Random();
|}
|${boundExpr.code}
|$resultTypeTerm $resultTerm = $randField.nextInt(${boundExpr.resultTerm});
""".stripMargin
}
GeneratedExpression(resultTerm, GeneratedExpression.NEVER_NULL, randCode, resultType)
>>>>>>> [FLINK-6237] [table] support RAND and RAND_INTEGER on SQL
}
=======
>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
// ----------------------------------------------------------------------------------------------
private def generateUnaryOperatorIfNotNull(
......@@ -1114,7 +1052,7 @@ object ScalarOperators {
case "*" => "multiply"
case "/" => "divide"
case "%" => "remainder"
case _ => throw new CodeGenException(s"Unsupported decimal arithmetic operator: '${operator}'")
case _ => throw new CodeGenException(s"Unsupported decimal arithmetic operator: '$operator'")
}
private def numericCasting(
......@@ -1129,7 +1067,7 @@ object ScalarOperators {
case LONG_TYPE_INFO => "longValueExact"
case FLOAT_TYPE_INFO => "floatValue"
case DOUBLE_TYPE_INFO => "doubleValue"
case _ => throw new CodeGenException(s"Unsupported decimal casting type: '${targetType}'")
case _ => throw new CodeGenException(s"Unsupported decimal casting type: '$targetType'")
}
val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
......
......@@ -30,12 +30,16 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
* @param nullTerm boolean term that indicates if expression is null
* @param code code necessary to produce resultTerm and nullTerm
* @param resultType type of the resultTerm
* @param literal flag indicating a constant expression that does not reference input and can
* thus be used in the member area (e.g. as constructor parameter of a reusable
* instance)
*/
case class GeneratedExpression(
resultTerm: String,
nullTerm: String,
code: String,
resultType: TypeInformation[_])
resultTerm: String,
nullTerm: String,
code: String,
resultType: TypeInformation[_],
literal: Boolean = false)
object GeneratedExpression {
val ALWAYS_NULL = "true"
......
......@@ -20,12 +20,13 @@ package org.apache.flink.table.expressions
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.table.typeutils.TypeCheckUtils
import org.apache.flink.table.validate._
import scala.collection.JavaConversions._
case class Abs(child: Expression) extends UnaryExpression {
override private[flink] def resultType: TypeInformation[_] = child.resultType
......@@ -285,3 +286,61 @@ case class Pi() extends LeafExpression {
relBuilder.call(SqlStdOperatorTable.PI)
}
}
/**
 * Expression for a pseudorandom double, translated to Calcite's `RAND` operator.
 *
 * @param seed optional seed expression; `null` stands for "no seed" (see the
 *             no-argument constructor)
 */
case class Rand(seed: Expression) extends Expression with InputTypeSpec {

  /** Creates an unseeded `rand()` expression. */
  def this() = this(null)

  // the seed is the only possible operand
  override private[flink] def children: Seq[Expression] = Option(seed).toList

  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.DOUBLE_TYPE_INFO

  // a seed, if present, must be an integer
  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
    Option(seed) match {
      case Some(_) => INT_TYPE_INFO :: Nil
      case None => Nil
    }

  override def toString: String = Option(seed).fold("rand()")(s => s"rand($s)")

  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
    relBuilder.call(SqlStdOperatorTable.RAND, children.map(_.toRexNode))
}
/**
 * Expression for a pseudorandom integer, translated to Calcite's `RAND_INTEGER` operator.
 *
 * @param seed  optional seed expression; `null` stands for "no seed" (see the
 *              single-argument constructor)
 * @param bound expression for the bound passed to `RAND_INTEGER`
 */
case class RandInteger(seed: Expression, bound: Expression)
  extends Expression with InputTypeSpec {

  /** Creates an unseeded `randInteger(bound)` expression. */
  def this(bound: Expression) = this(null, bound)

  // operands are (seed, bound) when seeded, otherwise only (bound)
  override private[flink] def children: Seq[Expression] =
    Option(seed).toList ::: (bound :: Nil)

  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.INT_TYPE_INFO

  // every operand that is present must be an integer
  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
    Option(seed) match {
      case Some(_) => INT_TYPE_INFO :: INT_TYPE_INFO :: Nil
      case None => INT_TYPE_INFO :: Nil
    }

  override def toString: String =
    Option(seed).fold(s"randInteger($bound)")(s => s"randInteger($s, $bound)")

  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
    relBuilder.call(SqlStdOperatorTable.RAND_INTEGER, children.map(_.toRexNode))
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.expressions
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import scala.collection.JavaConversions._
/**
 * Pseudorandom double expression backed by Calcite's `RAND` operator.
 *
 * @param seed seed expression, or `null` when the call carries no seed
 */
case class Rand(seed: Expression) extends Expression with InputTypeSpec {

  /** Unseeded variant: `rand()`. */
  def this() = this(null)

  override private[flink] def children: Seq[Expression] =
    if (seed == null) Nil else seed :: Nil

  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.DOUBLE_TYPE_INFO

  // only a present seed is type checked, and it has to be an integer
  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
    if (seed == null) Nil else INT_TYPE_INFO :: Nil

  override def toString: String =
    if (seed == null) "rand()" else s"rand($seed)"

  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
    relBuilder.call(SqlStdOperatorTable.RAND, children.map(_.toRexNode))
}
/**
 * Pseudorandom integer expression backed by Calcite's `RAND_INTEGER` operator.
 *
 * @param seed  seed expression, or `null` when the call carries no seed
 * @param bound expression for the bound passed to `RAND_INTEGER`
 */
case class RandInteger(seed: Expression, bound: Expression) extends Expression with InputTypeSpec {

  /** Unseeded variant: `randInteger(bound)`. */
  def this(bound: Expression) = this(null, bound)

  // operands are (seed, bound) when seeded, otherwise only (bound)
  override private[flink] def children: Seq[Expression] = if (seed != null) {
    seed :: bound :: Nil
  } else {
    bound :: Nil
  }

  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.INT_TYPE_INFO

  // every operand that is present must be an integer
  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = if (seed != null) {
    INT_TYPE_INFO :: INT_TYPE_INFO :: Nil
  } else {
    INT_TYPE_INFO :: Nil
  }

  // Render under this expression's own name. The previous "rand(...)" text made a
  // RandInteger indistinguishable from a Rand in plans and error messages.
  override def toString: String = if (seed != null) {
    s"randInteger($seed, $bound)"
  } else {
    s"randInteger($bound)"
  }

  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    relBuilder.call(SqlStdOperatorTable.RAND_INTEGER, children.map(_.toRexNode))
  }
}
......@@ -222,6 +222,8 @@ object FunctionCatalog {
"sign" -> classOf[Sign],
"round" -> classOf[Round],
"pi" -> classOf[Pi],
"rand" -> classOf[Rand],
"randInteger" -> classOf[RandInteger],
// temporal functions
"extract" -> classOf[Extract],
......@@ -244,18 +246,9 @@ object FunctionCatalog {
"start" -> classOf[WindowStart],
"end" -> classOf[WindowEnd],
<<<<<<< HEAD
// ordering
"asc" -> classOf[Asc],
"desc" -> classOf[Desc]
=======
// extensions to support streaming query
"rowtime" -> classOf[RowTime],
"proctime" -> classOf[ProcTime],
"rand" -> classOf[Rand],
"rand_integer" -> classOf[RandInteger]
>>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation
)
/**
......@@ -384,7 +377,6 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.QUARTER,
SqlStdOperatorTable.SCALAR_QUERY,
SqlStdOperatorTable.EXISTS,
<<<<<<< HEAD
SqlStdOperatorTable.SIN,
SqlStdOperatorTable.COS,
SqlStdOperatorTable.TAN,
......@@ -397,10 +389,8 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.SIGN,
SqlStdOperatorTable.ROUND,
SqlStdOperatorTable.PI,
=======
SqlStdOperatorTable.RAND,
SqlStdOperatorTable.RAND_INTEGER,
>>>>>>> [FLINK-6237] [table] support RAND and RAND_INTEGER on SQL
// EXTENSIONS
SqlStdOperatorTable.TUMBLE,
SqlStdOperatorTable.TUMBLE_START,
......
......@@ -18,39 +18,22 @@
package org.apache.flink.table.api.java.batch.sql;
import com.google.common.collect.Lists;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
<<<<<<< HEAD
<<<<<<< HEAD
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
=======
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
=======
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>>>>>>> support non-constant field arguments for rand and rand_integer
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.types.Row;
<<<<<<< HEAD
=======
>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
......@@ -58,14 +41,7 @@ import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
<<<<<<< HEAD
import java.util.Map;
=======
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
/**
* Integration tests for batch SQL.
......@@ -175,106 +151,6 @@ public class SqlITCase extends TableProgramsCollectionTestBase {
compareResultAsText(results, expected);
}
@Test
<<<<<<< HEAD
public void testMap() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
List<Tuple2<Integer, Map<String, String>>> rows = new ArrayList<>();
rows.add(new Tuple2<>(1, Collections.singletonMap("foo", "bar")));
rows.add(new Tuple2<>(2, Collections.singletonMap("foo", "spam")));
TypeInformation<Tuple2<Integer, Map<String, String>>> ty = new TupleTypeInfo<>(
BasicTypeInfo.INT_TYPE_INFO,
new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
DataSet<Tuple2<Integer, Map<String, String>>> ds1 = env.fromCollection(rows, ty);
tableEnv.registerDataSet("t1", ds1, "a, b");
String sqlQuery = "SELECT b['foo'] FROM t1";
=======
public void testRandAndRandInteger() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
String sqlQuery = "SELECT i, RAND() AS r1, RAND(1) AS r2, " +
"RAND_INTEGER(10) AS r3, RAND_INTEGER(3, 10) AS r4 " +
"FROM (VALUES 1, 2, 3, 4, 5) AS t(i)";
>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
Table result = tableEnv.sql(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
<<<<<<< HEAD
String expected = "bar\n" + "spam\n";
compareResultAsText(results, expected);
=======
Random expectedRandom1 = new Random(1);
Random expectedRandom2 = new Random(3);
assertEquals(5, results.size());
for (int i = 0; i < 5; ++i) {
Row row = results.get(i);
assertEquals(i + 1, row.getField(0));
double r1 = (double) row.getField(1);
double r2 = (double) row.getField(2);
int r3 = (int) row.getField(3);
int r4 = (int) row.getField(4);
assertTrue(0 <= r1 && r1 < 1);
assertEquals("" + expectedRandom1.nextDouble(), "" + r2);
assertTrue(0 <= r3 && r3 < 10);
assertEquals("" + expectedRandom2.nextInt(10), "" + r4);
}
<<<<<<< HEAD
>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
=======
}
/**
 * Runs RAND / RAND_INTEGER with seeds and bounds taken from table fields and
 * compares each row against values drawn from local {@link Random} instances.
 */
@Test
public void testRandAndRandIntegerWithField() throws Exception {
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

	final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.getSmall3TupleDataSet(env);
	tableEnv.registerDataSet("t", input, "a, b, c");

	final String sqlQuery = "SELECT a, b, RAND(a) AS r1, " +
		"RAND_INTEGER(a) AS r2, RAND_INTEGER(a, 10) AS r3, RAND_INTEGER(4, a) AS r4, " +
		"RAND_INTEGER(a, CAST(b AS INT)) AS r5 FROM t ORDER BY a";

	final Table table = tableEnv.sql(sqlQuery);
	final List<Row> rows = tableEnv.toDataSet(table, Row.class).collect();

	// expected contents of columns a and b, in ORDER BY a order
	final List<Integer> expectedA = Lists.newArrayList(1, 2, 3);
	final List<Long> expectedB = Lists.newArrayList(1L, 2L, 2L);

	// r4 has the literal seed 4: the expectation is one generator that advances once per row
	final Random literalSeedRandom = new Random(4);

	assertEquals(3, rows.size());

	int rowIdx = 0;
	for (Row row : rows) {
		final int a = expectedA.get(rowIdx);
		final Long b = expectedB.get(rowIdx);
		rowIdx++;

		assertEquals(a, row.getField(0));
		assertEquals(b, row.getField(1));

		// field-seeded calls are expected to match a fresh generator seeded with the row's value
		final double wantR1 = new Random(a).nextDouble();
		final double gotR1 = (double) row.getField(2);

		final int gotR2 = (int) row.getField(3);

		final int wantR3 = new Random(a).nextInt(10);
		final int gotR3 = (int) row.getField(4);

		final int wantR4 = literalSeedRandom.nextInt(a);
		final int gotR4 = (int) row.getField(5);

		final int wantR5 = new Random(a).nextInt(b.intValue());
		final int gotR5 = (int) row.getField(6);

		assertEquals("" + wantR1, "" + gotR1);
		// r2 is unseeded, so only its range can be checked
		assertTrue(0 <= gotR2 && gotR2 < a);
		assertEquals(wantR3, gotR3);
		assertEquals(wantR4, gotR4);
		assertEquals(wantR5, gotR5);
	}
}
@Test
public void testMap() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
......@@ -298,6 +174,5 @@ public class SqlITCase extends TableProgramsCollectionTestBase {
List<Row> results = resultSet.collect();
String expected = "bar\n" + "spam\n";
compareResultAsText(results, expected);
>>>>>>> support non-constant field arguments for rand and rand_integer
}
}
......@@ -18,16 +18,11 @@
package org.apache.flink.table.api.java.stream.sql;
<<<<<<< HEAD
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
=======
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
......@@ -37,18 +32,11 @@ import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.java.stream.utils.StreamTestData;
import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
import org.apache.flink.types.Row;
<<<<<<< HEAD
=======
>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Integration tests for streaming SQL.
......@@ -176,38 +164,4 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
StreamITCase.compareWithList(expected);
}
/**
 * Runs RAND / RAND_INTEGER with literal arguments on a streaming VALUES table and
 * checks the collected string rows: seeded columns against local {@link Random}
 * sequences, unseeded columns against their value range.
 */
@Test
public void testRandAndRandInteger() throws Exception {
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
	StreamITCase.clear();

	final String sqlQuery = "SELECT i, RAND() AS r1, RAND(1) AS r2, " +
		"RAND_INTEGER(10) AS r3, RAND_INTEGER(3, 10) AS r4 " +
		"FROM (VALUES 1, 2, 3, 4, 5) AS t(i)";

	final Table table = tableEnv.sql(sqlQuery);
	tableEnv.toDataStream(table, Row.class).addSink(new StreamITCase.StringSink());
	env.execute();

	// reference generators for the seeded columns r2 (seed 1) and r4 (seed 3)
	final Random seed1 = new Random(1);
	final Random seed3 = new Random(3);

	assertEquals(5, StreamITCase.testResults().size());
	for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
		// each collected row is a comma-separated string
		final String[] cols = StreamITCase.testResults().get(rowIdx).get().split(",");

		assertEquals(Integer.valueOf(rowIdx + 1), Integer.valueOf(cols[0]));

		final double r1 = Double.parseDouble(cols[1]);
		final double r2 = Double.parseDouble(cols[2]);
		final int r3 = Integer.parseInt(cols[3]);
		final int r4 = Integer.parseInt(cols[4]);

		assertTrue(0 <= r1 && r1 < 1);
		assertEquals("" + seed1.nextDouble(), "" + r2);
		assertTrue(0 <= r3 && r3 < 10);
		assertEquals("" + seed3.nextInt(10), "" + r4);
	}
}
}
......@@ -1116,6 +1116,37 @@ class ScalarFunctionsTest extends ExpressionTestBase {
math.Pi.toString)
}
/**
 * Checks the first value of seeded rand / randInteger calls in all three APIs
 * against a `java.util.Random` created with the same seed.
 */
@Test
def testRandAndRandInteger(): Unit = {
  // literal seed
  val expectedRand = new java.util.Random(1).nextDouble().toString
  testAllApis(rand(1), "rand(1)", "RAND(1)", expectedRand)

  // seed taken from field f7 (the expectation uses seed 3, presumably f7's test value)
  val expectedFieldRand = new java.util.Random(3).nextDouble().toString
  testAllApis(rand('f7), "rand(f7)", "RAND(f7)", expectedFieldRand)

  // literal seed and bound
  val expectedRandInt = new java.util.Random(1).nextInt(10).toString
  testAllApis(
    randInteger(1, 10),
    "randInteger(1, 10)",
    "RAND_INTEGER(1, 10)",
    expectedRandInt)

  // seed and bound from fields (the expectation uses bound 44, presumably f4 cast to INT)
  val expectedFieldRandInt = new java.util.Random(3).nextInt(44).toString
  testAllApis(
    randInteger('f7, 'f4.cast(Types.INT)),
    "randInteger(f7, f4.cast(INT))",
    "RAND_INTEGER(f7, CAST(f4 AS INT))",
    expectedFieldRandInt)
}
// ----------------------------------------------------------------------------------------------
// Temporal functions
// ----------------------------------------------------------------------------------------------
......@@ -1520,100 +1551,6 @@ class ScalarFunctionsTest extends ExpressionTestBase {
"true")
}
/**
 * Scala Table API: calling `rand` with a Double seed (2.0) is expected to raise a
 * ValidationException.
 */
@Test(expected = classOf[ValidationException])
def testInvalidTableApiRand1(): Unit = {
// Must fail. Parameter of rand must be an Integer not a Double.
testTableApi(rand(2.0.toExpr), "FAIL", "FAIL")
}
/**
 * Scala Table API: calling `rand` with two arguments is expected to raise a
 * ValidationException.
 */
@Test(expected = classOf[ValidationException])
def testInvalidTableApiRand2(): Unit = {
// Must fail. rand accepts at most one parameter (the seed).
testTableApi(rand(1, 10), "FAIL", "FAIL")
}
/**
 * String expression API: `rand(2.0)` with a Double seed is expected to raise a
 * ValidationException.
 */
@Test(expected = classOf[ValidationException])
def testInvalidTableApiStringExpressionRand1(): Unit = {
// Must fail. Parameter of rand must be an Integer not a Double.
testTableApi("FAIL", "rand(2.0)", "FAIL")
}
/**
 * String expression API: `rand(1, 10)` with two arguments is expected to raise a
 * ValidationException.
 */
@Test(expected = classOf[ValidationException])
def testInvalidTableApiStringExpressionRand2(): Unit = {
// Must fail. rand accepts at most one parameter (the seed).
testTableApi("FAIL", "rand(1, 10)", "FAIL")
}
/**
 * SQL API: `RAND(1, 10)` with two arguments is expected to raise a
 * ValidationException.
 */
@Test(expected = classOf[ValidationException])
def testInvalidSqlRand(): Unit = {
// Must fail. RAND accepts at most one parameter (the seed).
testSqlApi("RAND(1, 10)", "FAIL")
}
/**
 * Checks the first value of a seeded rand call in all three APIs against a
 * `java.util.Random` created with the same seed.
 */
@Test
def testRand(): Unit = {
  // literal seed
  val expectedLiteralSeed = new java.util.Random(1).nextDouble().toString
  testAllApis(rand(1), "rand(1)", "RAND(1)", expectedLiteralSeed)

  // seed taken from field f7 (the expectation uses seed 3, presumably f7's test value)
  val expectedFieldSeed = new java.util.Random(3).nextDouble().toString
  testAllApis(rand('f7), "rand(f7)", "RAND(f7)", expectedFieldSeed)
}
/**
 * Scala Table API: calling `rand_integer` with a Double argument (2.0) is expected
 * to raise a ValidationException.
 */
@Test(expected = classOf[ValidationException])
def testInvalidTableApiRandInteger1(): Unit = {
// Must fail. Parameter of rand_integer must be an Integer not a Double.
testTableApi(rand_integer(2.0.toExpr), "FAIL", "FAIL")
}
/**
 * Scala Table API: calling `rand_integer` without arguments is expected to raise a
 * ValidationException.
 */
@Test(expected = classOf[ValidationException])
def testInvalidTableApiRandInteger2(): Unit = {
// Must fail. rand_integer takes one or two parameters.
testTableApi(rand_integer(), "FAIL", "FAIL")
}
/**
 * String expression API: `rand_integer(2.0)` with a Double argument is expected to
 * raise a ValidationException.
 */
@Test(expected = classOf[ValidationException])
def testInvalidTableApiStringExpressionRandInteger1(): Unit = {
// Must fail. Parameter of rand_integer must be an Integer not a Double.
testTableApi("FAIL", "rand_integer(2.0)", "FAIL")
}
/**
 * String expression API: `rand_integer()` without arguments is expected to raise a
 * ValidationException.
 */
@Test(expected = classOf[ValidationException])
def testInvalidTableApiStringExpressionRandInteger2(): Unit = {
// Must fail. rand_integer takes one or two parameters.
testTableApi("FAIL", "rand_integer()", "FAIL")
}
/**
 * SQL API: `RAND_INTEGER()` without arguments is expected to raise a
 * ValidationException.
 */
@Test(expected = classOf[ValidationException])
def testInvalidSqlRandInteger(): Unit = {
// Must fail. RAND_INTEGER takes one or two parameters.
testSqlApi("RAND_INTEGER()", "FAIL")
}
/**
 * Checks the first value of a seeded rand_integer call in all three APIs against
 * a `java.util.Random` created with the same seed and bound.
 */
@Test
def testRandInteger(): Unit = {
  // literal seed and bound
  val expectedLiteralArgs = new java.util.Random(1).nextInt(10).toString
  testAllApis(
    rand_integer(1, 10),
    "rand_integer(1, 10)",
    "RAND_INTEGER(1, 10)",
    expectedLiteralArgs)

  // seed and bound from fields (the expectation uses seed 3 and bound 44,
  // presumably the test values of f7 and f4)
  val expectedFieldArgs = new java.util.Random(3).nextInt(44).toString
  testAllApis(
    rand_integer('f7, 'f4.cast(Types.INT)),
    "rand_integer(f7, f4.cast(INT))",
    "RAND_INTEGER(f7, CAST(f4 AS INT))",
    expectedFieldArgs)
}
// ----------------------------------------------------------------------------------------------
def testData = {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册