提交 6979d060 编写于 作者: T twalthr

[FLINK-3580] [table] Reintroduce Date/Time and implement scalar functions for it

This closes #2150.
上级 e64237d8
......@@ -752,7 +752,7 @@ suffixed = cast | as | aggregation | nullCheck | evaluate | functionCall ;
cast = composite , ".cast(" , dataType , ")" ;
dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOL" | "BOOLEAN" | "STRING" | "DATE" | "DECIMAL";
dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOL" | "BOOLEAN" | "STRING" | "DECIMAL" | "DATE" | "TIME" | "TIMESTAMP";
as = composite , ".as(" , fieldReference , ")" ;
......@@ -775,6 +775,8 @@ column names and function names follow Java identifier syntax. Expressions speci
If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API decimals can be defined by `BigDecimal("123456")` and in Java by appending a "p" for precise e.g. `123456p`.
In order to work with temporal values the Table API supports Java SQL's Date, Time, and Timestamp types. In the Scala Table API literals can be defined by using `java.sql.Date.valueOf("2016-06-27")`, `java.sql.Time.valueOf("10:10:42")`, or `java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")`. The Java and Scala Table API also support calling `"2016-06-27".toDate()`, `"10:10:42".toTime()`, and `"2016-06-27 10:10:42.123".toTimestamp()` for converting Strings into temporal types. *Note:* Since Java's temporal SQL types are time zone dependent, please make sure that the Flink Client and all TaskManagers use the same time zone.
{% top %}
......@@ -826,7 +828,9 @@ The current version supports selection (filter), projection, inner equi-joins, g
Among others, the following SQL features are not supported, yet:
- Time data types (`DATE`, `TIME`, `TIMESTAMP`, `INTERVAL`) and `DECIMAL` types
- Time interval data type (`INTERVAL`)
- Timestamps are limited to milliseconds precision
- Advanced types such as generic types, composite types (e.g. POJOs), and arrays within rows
- Distinct aggregates (e.g., `COUNT(DISTINCT name)`)
- Non-equi joins and Cartesian products
- Result selection by order position (`ORDER BY OFFSET FETCH`)
......
......@@ -17,9 +17,11 @@
*/
package org.apache.flink.api.scala.table
import java.sql.{Timestamp, Time, Date}
import scala.language.implicitConversions
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.table.expressions._
/**
......@@ -201,6 +203,21 @@ trait ImplicitExpressionOperations {
* e.g. "A+" matches all Strings that consist of at least one A
*/
def similar(pattern: Expression) = Similar(expr, pattern)
/**
* Parses a date String in the form "yy-mm-dd" to a SQL Date.
*/
def toDate = Cast(expr, SqlTimeTypeInfo.DATE)
/**
* Parses a time String in the form "hh:mm:ss" to a SQL Time.
*/
def toTime = Cast(expr, SqlTimeTypeInfo.TIME)
/**
* Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp.
*/
def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP)
}
/**
......@@ -258,6 +275,19 @@ trait ImplicitExpressionConversions {
def expr = Literal(scalaDecimal.bigDecimal)
}
implicit class LiteralSqlDateExpression(sqlDate: Date) extends ImplicitExpressionOperations {
def expr = Literal(sqlDate)
}
implicit class LiteralSqlTimeExpression(sqlTime: Time) extends ImplicitExpressionOperations {
def expr = Literal(sqlTime)
}
implicit class LiteralSqlTimestampExpression(sqlTimestamp: Timestamp)
extends ImplicitExpressionOperations {
def expr = Literal(sqlTimestamp)
}
implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name)
implicit def byte2Literal(b: Byte): Expression = Literal(b)
implicit def short2Literal(s: Short): Expression = Literal(s)
......@@ -270,4 +300,7 @@ trait ImplicitExpressionConversions {
implicit def javaDec2Literal(javaDec: java.math.BigDecimal): Expression = Literal(javaDec)
implicit def scalaDec2Literal(scalaDec: scala.math.BigDecimal): Expression =
Literal(scalaDec.bigDecimal)
implicit def sqlDate2Literal(sqlDate: Date): Expression = Literal(sqlDate)
implicit def sqlTime2Literal(sqlTime: Time): Expression = Literal(sqlTime)
implicit def sqlTimestamp2Literal(sqlTimestamp: Timestamp): Expression = Literal(sqlTimestamp)
}
......@@ -18,12 +18,13 @@
package org.apache.flink.api.table.codegen
import java.lang.reflect.Field
import java.lang.reflect.{Field, Method}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.util.BuiltInMethod
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
import org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, TypeExtractor}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
......@@ -64,6 +65,11 @@ object CodeGenUtils {
case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
// internal primitive representation of Date/Time/Timestamp
case SqlTimeTypeInfo.DATE => "int"
case SqlTimeTypeInfo.TIME => "int"
case SqlTimeTypeInfo.TIMESTAMP => "long"
case _ =>
tpe.getTypeClass.getCanonicalName
}
......@@ -94,6 +100,8 @@ object CodeGenUtils {
case BOOLEAN_TYPE_INFO => "false"
case STRING_TYPE_INFO => "\"\""
case CHAR_TYPE_INFO => "'\\0'"
case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP => "-1"
case _ => "null"
}
......@@ -102,6 +110,29 @@ object CodeGenUtils {
case _ => "long"
}
def qualifyMethod(method: Method): String =
method.getDeclaringClass.getCanonicalName + "." + method.getName
def internalToTemporalCode(resultType: TypeInformation[_], resultTerm: String) =
resultType match {
case SqlTimeTypeInfo.DATE =>
s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_DATE.method)}($resultTerm)"
case SqlTimeTypeInfo.TIME =>
s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIME.method)}($resultTerm)"
case SqlTimeTypeInfo.TIMESTAMP =>
s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIMESTAMP.method)}($resultTerm)"
}
def temporalToInternalCode(resultType: TypeInformation[_], resultTerm: String) =
resultType match {
case SqlTimeTypeInfo.DATE =>
s"${qualifyMethod(BuiltInMethod.DATE_TO_INT.method)}($resultTerm)"
case SqlTimeTypeInfo.TIME =>
s"${qualifyMethod(BuiltInMethod.TIME_TO_INT.method)}($resultTerm)"
case SqlTimeTypeInfo.TIMESTAMP =>
s"${qualifyMethod(BuiltInMethod.TIMESTAMP_TO_LONG.method)}($resultTerm)"
}
// ----------------------------------------------------------------------------------------------
def requireNumeric(genExpr: GeneratedExpression) =
......
......@@ -25,7 +25,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
import org.apache.calcite.sql.{SqlLiteral, SqlOperator}
import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction, Function, MapFunction}
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
......@@ -35,7 +35,7 @@ import org.apache.flink.api.table.codegen.Indenter.toISC
import org.apache.flink.api.table.codegen.calls.ScalarFunctions
import org.apache.flink.api.table.codegen.calls.ScalarOperators._
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.apache.flink.api.table.typeutils.TypeCheckUtils.{isNumeric, isString}
import org.apache.flink.api.table.typeutils.TypeCheckUtils.{isNumeric, isString, isTemporal}
import org.apache.flink.api.table.typeutils.TypeConverter.sqlTypeToTypeInfo
import scala.collection.JavaConversions._
......@@ -335,12 +335,13 @@ class CodeGenerator(
}
val returnTypeTerm = boxedTypeTermForTypeInfo(returnType)
val boxedFieldExprs = fieldExprs.map(generateOutputBoxing)
// generate result expression
returnType match {
case ri: RowTypeInfo =>
addReusableOutRecord(ri)
val resultSetters: String = fieldExprs.zipWithIndex map {
val resultSetters: String = boxedFieldExprs.zipWithIndex map {
case (fieldExpr, i) =>
if (nullCheck) {
s"""
......@@ -365,7 +366,7 @@ class CodeGenerator(
case pt: PojoTypeInfo[_] =>
addReusableOutRecord(pt)
val resultSetters: String = fieldExprs.zip(resultFieldNames) map {
val resultSetters: String = boxedFieldExprs.zip(resultFieldNames) map {
case (fieldExpr, fieldName) =>
val accessor = getFieldAccessor(pt.getTypeClass, fieldName)
......@@ -436,7 +437,7 @@ class CodeGenerator(
case tup: TupleTypeInfo[_] =>
addReusableOutRecord(tup)
val resultSetters: String = fieldExprs.zipWithIndex map {
val resultSetters: String = boxedFieldExprs.zipWithIndex map {
case (fieldExpr, i) =>
val fieldName = "f" + i
if (nullCheck) {
......@@ -461,12 +462,12 @@ class CodeGenerator(
GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
case cc: CaseClassTypeInfo[_] =>
val fieldCodes: String = fieldExprs.map(_.code).mkString("\n")
val constructorParams: String = fieldExprs.map(_.resultTerm).mkString(", ")
val fieldCodes: String = boxedFieldExprs.map(_.code).mkString("\n")
val constructorParams: String = boxedFieldExprs.map(_.resultTerm).mkString(", ")
val resultTerm = newName(outRecordTerm)
val nullCheckCode = if (nullCheck) {
fieldExprs map { (fieldExpr) =>
boxedFieldExprs map { (fieldExpr) =>
s"""
|if (${fieldExpr.nullTerm}) {
| throw new NullPointerException("Null result cannot be stored in a Case Class.");
......@@ -487,7 +488,7 @@ class CodeGenerator(
GeneratedExpression(resultTerm, "false", resultCode, returnType)
case a: AtomicType[_] =>
val fieldExpr = fieldExprs.head
val fieldExpr = boxedFieldExprs.head
val nullCheckCode = if (nullCheck) {
s"""
|if (${fieldExpr.nullTerm}) {
......@@ -611,7 +612,16 @@ class CodeGenerator(
}
generateNonNullLiteral(resultType, symbolOrdinal.toString)
case _ => ??? // TODO more types
case DATE =>
generateNonNullLiteral(resultType, value.toString)
case TIME =>
generateNonNullLiteral(resultType, value.toString)
case TIMESTAMP =>
generateNonNullLiteral(resultType, value.toString + "L")
case t@_ =>
throw new CodeGenException(s"Type not supported: $t")
}
}
......@@ -795,11 +805,11 @@ class CodeGenerator(
// if input has been used before, we can reuse the code that
// has already been generated
val inputExpr = reusableInputUnboxingExprs.get((inputTerm, index)) match {
// input access and boxing has already been generated
// input access and unboxing has already been generated
case Some(expr) =>
expr
// generate input access and boxing if necessary
// generate input access and unboxing if necessary
case None =>
val expr = if (nullableInput) {
generateNullableInputFieldAccess(inputType, inputTerm, index)
......@@ -936,9 +946,16 @@ class CodeGenerator(
val resultTypeTerm = primitiveTypeTermForTypeInfo(literalType)
val defaultValue = primitiveDefaultValue(literalType)
// explicit unboxing
val unboxedLiteralCode = if (isTemporal(literalType)) {
temporalToInternalCode(literalType, literalCode)
} else {
literalCode
}
val wrappedCode = if (nullCheck && !isReference(literalType)) {
s"""
|$tmpTypeTerm $tmpTerm = $literalCode;
|$tmpTypeTerm $tmpTerm = $unboxedLiteralCode;
|boolean $nullTerm = $tmpTerm == null;
|$resultTypeTerm $resultTerm;
|if ($nullTerm) {
......@@ -950,12 +967,12 @@ class CodeGenerator(
|""".stripMargin
} else if (nullCheck) {
s"""
|$resultTypeTerm $resultTerm = $literalCode;
|$resultTypeTerm $resultTerm = $unboxedLiteralCode;
|boolean $nullTerm = $literalCode == null;
|""".stripMargin
} else {
s"""
|$resultTypeTerm $resultTerm = $literalCode;
|$resultTypeTerm $resultTerm = $unboxedLiteralCode;
|""".stripMargin
}
......@@ -1001,6 +1018,39 @@ class CodeGenerator(
}
}
private def generateOutputBoxing(expr: GeneratedExpression): GeneratedExpression = {
expr.resultType match {
// convert internal date/time/timestamp to java.sql.* objects
case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP =>
val resultTerm = newName("result")
val resultTypeTerm = boxedTypeTermForTypeInfo(expr.resultType)
val convMethod = internalToTemporalCode(expr.resultType, expr.resultTerm)
val resultCode = if (nullCheck) {
s"""
|${expr.code}
|$resultTypeTerm $resultTerm;
|if (${expr.nullTerm}) {
| $resultTerm = null;
|}
|else {
| $resultTerm = $convMethod;
|}
|""".stripMargin
} else {
s"""
|${expr.code}
|$resultTypeTerm $resultTerm = $convMethod;
|""".stripMargin
}
GeneratedExpression(resultTerm, expr.nullTerm, resultCode, expr.resultType)
// other types are autoboxed or need no boxing
case _ => expr
}
}
// ----------------------------------------------------------------------------------------------
// Reusable code snippets
// ----------------------------------------------------------------------------------------------
......
......@@ -17,11 +17,13 @@
*/
package org.apache.flink.api.table.codegen.calls
import org.apache.calcite.avatica.util.DateTimeUtils
import org.apache.calcite.util.BuiltInMethod
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.table.codegen.CodeGenUtils._
import org.apache.flink.api.table.codegen.{CodeGenException, GeneratedExpression}
import org.apache.flink.api.table.typeutils.TypeCheckUtils.{isBoolean, isComparable, isDecimal, isNumeric}
import org.apache.flink.api.table.typeutils.TypeCheckUtils._
object ScalarOperators {
......@@ -85,6 +87,10 @@ object ScalarOperators {
if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
generateComparison("==", nullCheck, left, right)
}
// temporal types
else if (isTemporal(left.resultType) && left.resultType == right.resultType) {
generateComparison("==", nullCheck, left, right)
}
// comparable types of same type
else if (isComparable(left.resultType) && left.resultType == right.resultType) {
generateComparison("==", nullCheck, left, right)
......@@ -115,11 +121,15 @@ object ScalarOperators {
if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
generateComparison("!=", nullCheck, left, right)
}
// temporal types
else if (isTemporal(left.resultType) && left.resultType == right.resultType) {
generateComparison("!=", nullCheck, left, right)
}
// comparable types
else if (isComparable(left.resultType) && left.resultType == right.resultType) {
generateComparison("!=", nullCheck, left, right)
}
// non comparable types
// non-comparable types
else {
generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
if (isReference(left)) {
......@@ -164,6 +174,10 @@ object ScalarOperators {
else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
(leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
}
// both sides are temporal of same type
else if (isTemporal(left.resultType) && left.resultType == right.resultType) {
(leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
}
// both sides are boolean
else if (isBoolean(left.resultType) && left.resultType == right.resultType) {
operator match {
......@@ -379,7 +393,13 @@ object ScalarOperators {
case (fromTp, toTp) if fromTp == toTp =>
operand
// * -> String
// Date/Time/Timestamp -> String
case (dtt: SqlTimeTypeInfo[_], STRING_TYPE_INFO) =>
generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
(operandTerm) => s"""${internalToTemporalCode(dtt, operandTerm)}.toString()"""
}
// * (not Date/Time/Timestamp) -> String
case (_, STRING_TYPE_INFO) =>
generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
(operandTerm) => s""" "" + $operandTerm"""
......@@ -404,6 +424,25 @@ object ScalarOperators {
(operandTerm) => s"new $wrapperClass($operandTerm)"
}
// String -> Date
case (STRING_TYPE_INFO, SqlTimeTypeInfo.DATE) =>
generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
(operandTerm) => s"${qualifyMethod(BuiltInMethod.STRING_TO_DATE.method)}($operandTerm)"
}
// String -> Time
case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIME) =>
generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
(operandTerm) => s"${qualifyMethod(BuiltInMethod.STRING_TO_TIME.method)}($operandTerm)"
}
// String -> Timestamp
case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) =>
generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
(operandTerm) => s"${qualifyMethod(BuiltInMethod.STRING_TO_TIMESTAMP.method)}" +
s"($operandTerm)"
}
// Boolean -> NUMERIC TYPE
case (BOOLEAN_TYPE_INFO, nti: NumericTypeInfo[_]) =>
val targetTypeTerm = primitiveTypeTermForTypeInfo(nti)
......@@ -438,6 +477,57 @@ object ScalarOperators {
(operandTerm) => s"${operandCasting(operandTerm)}"
}
// Date -> Timestamp
case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
(operandTerm) =>
s"$operandTerm * ${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY"
}
// Timestamp -> Date
case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
val targetTypeTerm = primitiveTypeTermForTypeInfo(targetType)
generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
(operandTerm) =>
s"($targetTypeTerm) ($operandTerm / " +
s"${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY)"
}
// Time -> Timestamp
case (SqlTimeTypeInfo.TIME, SqlTimeTypeInfo.TIMESTAMP) =>
generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
(operandTerm) => s"$operandTerm"
}
// Timestamp -> Time
case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.TIME) =>
val targetTypeTerm = primitiveTypeTermForTypeInfo(targetType)
generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
(operandTerm) =>
s"($targetTypeTerm) ($operandTerm % " +
s"${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY)"
}
// Date -> Integer, Time -> Integer
case (SqlTimeTypeInfo.DATE, INT_TYPE_INFO) | (SqlTimeTypeInfo.TIME, INT_TYPE_INFO) =>
internalExprCasting(operand, INT_TYPE_INFO)
// Timestamp -> Long
case (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) =>
internalExprCasting(operand, LONG_TYPE_INFO)
// Integer -> Date
case (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) =>
internalExprCasting(operand, SqlTimeTypeInfo.DATE)
// Integer -> Time
case (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) =>
internalExprCasting(operand, SqlTimeTypeInfo.TIME)
// Long -> Timestamp
case (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) =>
internalExprCasting(operand, SqlTimeTypeInfo.TIMESTAMP)
case (from, to) =>
throw new CodeGenException(s"Unsupported cast from '$from' to '$to'.")
}
......@@ -576,6 +666,13 @@ object ScalarOperators {
GeneratedExpression(resultTerm, nullTerm, resultCode, resultType)
}
private def internalExprCasting(
expr: GeneratedExpression,
typeInfo: TypeInformation[_])
: GeneratedExpression = {
GeneratedExpression(expr.resultTerm, expr.nullTerm, expr.code, typeInfo)
}
private def arithOpToDecMethod(operator: String): String = operator match {
case "+" => "add"
case "-" => "subtract"
......
......@@ -17,7 +17,7 @@
*/
package org.apache.flink.api.table.expressions
import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation, BasicTypeInfo}
import org.apache.flink.api.table.ExpressionParserException
import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
......@@ -54,6 +54,9 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val EVAL: Keyword = Keyword("eval")
lazy val ASC: Keyword = Keyword("asc")
lazy val DESC: Keyword = Keyword("desc")
lazy val TO_DATE: Keyword = Keyword("toDate")
lazy val TO_TIME: Keyword = Keyword("toTime")
lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
def functionIdent: ExpressionParser.Parser[String] =
not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
......@@ -71,7 +74,9 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
"DOUBLE" ^^ { ti => BasicTypeInfo.DOUBLE_TYPE_INFO } |
("BOOL" | "BOOLEAN" ) ^^ { ti => BasicTypeInfo.BOOLEAN_TYPE_INFO } |
"STRING" ^^ { ti => BasicTypeInfo.STRING_TYPE_INFO } |
"DATE" ^^ { ti => BasicTypeInfo.DATE_TYPE_INFO } |
"DATE" ^^ { ti => SqlTimeTypeInfo.DATE.asInstanceOf[TypeInformation[_]] } |
"TIMESTAMP" ^^ { ti => SqlTimeTypeInfo.TIMESTAMP } |
"TIME" ^^ { ti => SqlTimeTypeInfo.TIME } |
"DECIMAL" ^^ { ti => BasicTypeInfo.BIG_DEC_TYPE_INFO }
// Literals
......@@ -127,13 +132,6 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val suffixIsNotNull: PackratParser[Expression] =
composite <~ "." ~ IS_NOT_NULL ~ opt("()") ^^ { e => IsNotNull(e) }
lazy val suffixAsc : PackratParser[Expression] =
(atom <~ ".asc" ^^ { e => Asc(e) }) | (atom <~ ASC ^^ { e => Asc(e) })
lazy val suffixDesc : PackratParser[Expression] =
(atom <~ ".desc" ^^ { e => Desc(e) }) | (atom <~ DESC ^^ { e => Desc(e) })
lazy val suffixSum: PackratParser[Expression] =
composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }
......@@ -159,16 +157,6 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
case e ~ _ ~ _ ~ _ ~ target ~ _ => Alias(e, target.name)
}
lazy val suffixEval: PackratParser[Expression] =
composite ~ "." ~ EVAL ~ "(" ~ expression ~ "," ~ expression ~ ")" ^^ {
case condition ~ _ ~ _ ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => Eval(condition, ifTrue, ifFalse)
}
lazy val suffixFunctionCall =
composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
}
lazy val suffixTrim = composite ~ ".trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~
expression ~ ")" ^^ {
case operand ~ _ ~ trimType ~ _ ~ trimCharacter ~ _ =>
......@@ -185,10 +173,35 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
Trim(TrimConstants.TRIM_BOTH, TrimConstants.TRIM_DEFAULT_CHAR, e)
}
lazy val suffixEval: PackratParser[Expression] =
composite ~ "." ~ EVAL ~ "(" ~ expression ~ "," ~ expression ~ ")" ^^ {
case condition ~ _ ~ _ ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => Eval(condition, ifTrue, ifFalse)
}
lazy val suffixFunctionCall =
composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
}
lazy val suffixAsc : PackratParser[Expression] =
atom <~ "." ~ ASC ~ opt("()") ^^ { e => Asc(e) }
lazy val suffixDesc : PackratParser[Expression] =
atom <~ "." ~ DESC ~ opt("()") ^^ { e => Desc(e) }
lazy val suffixToDate: PackratParser[Expression] =
composite <~ "." ~ TO_DATE ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.DATE) }
lazy val suffixToTimestamp: PackratParser[Expression] =
composite <~ "." ~ TO_TIMESTAMP ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.TIMESTAMP) }
lazy val suffixToTime: PackratParser[Expression] =
composite <~ "." ~ TO_TIME ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.TIME) }
lazy val suffixed: PackratParser[Expression] =
suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax | suffixCount | suffixAvg |
suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixEval | suffixFunctionCall |
suffixAsc | suffixDesc
suffixAsc | suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime
// prefix operators
......
......@@ -42,8 +42,8 @@ abstract class BinaryComparison extends BinaryExpression {
case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess
case (lType, rType) =>
ValidationFailure(
s"Comparison is only supported for numeric types and comparable types of same type," +
s"got $lType and $rType")
s"Comparison is only supported for numeric types and " +
s"comparable types of same type, got $lType and $rType")
}
}
......
......@@ -17,12 +17,13 @@
*/
package org.apache.flink.api.table.expressions
import java.util.Date
import java.sql.{Timestamp, Time, Date}
import java.util.{TimeZone, Calendar}
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, BasicTypeInfo, TypeInformation}
import org.apache.flink.api.table.typeutils.TypeConverter
object Literal {
......@@ -35,10 +36,12 @@ object Literal {
case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
case date: Date => Literal(date, BasicTypeInfo.DATE_TYPE_INFO)
case javaDec: java.math.BigDecimal => Literal(javaDec, BasicTypeInfo.BIG_DEC_TYPE_INFO)
case scalaDec: scala.math.BigDecimal =>
Literal(scalaDec.bigDecimal, BasicTypeInfo.BIG_DEC_TYPE_INFO)
case sqlDate: Date => Literal(sqlDate, SqlTimeTypeInfo.DATE)
case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
}
}
......@@ -51,9 +54,27 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
val bigDecValue = value.asInstanceOf[java.math.BigDecimal]
val decType = relBuilder.getTypeFactory.createSqlType(SqlTypeName.DECIMAL)
relBuilder.getRexBuilder.makeExactLiteral(bigDecValue, decType)
// date/time
case SqlTimeTypeInfo.DATE =>
relBuilder.getRexBuilder.makeDateLiteral(dateToCalendar)
case SqlTimeTypeInfo.TIME =>
relBuilder.getRexBuilder.makeTimeLiteral(dateToCalendar, 0)
case SqlTimeTypeInfo.TIMESTAMP =>
relBuilder.getRexBuilder.makeTimestampLiteral(dateToCalendar, 3)
case _ => relBuilder.literal(value)
}
}
private def dateToCalendar: Calendar = {
val date = value.asInstanceOf[java.util.Date]
val cal = Calendar.getInstance()
val t = date.getTime
// according to Calcite's SqlFunctions.internalToXXX methods
cal.setTimeInMillis(t + TimeZone.getDefault.getOffset(t))
cal
}
}
case class Null(resultType: TypeInformation[_]) extends LeafExpression {
......
......@@ -18,6 +18,7 @@
package org.apache.flink.api.table.runtime
import org.apache.flink.api.common.InvalidProgramException
import org.apache.flink.api.common.functions.Function
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.SimpleCompiler
......@@ -29,7 +30,13 @@ trait FunctionCompiler[T <: Function] {
require(cl != null, "Classloader must not be null.")
val compiler = new SimpleCompiler()
compiler.setParentClassLoader(cl)
compiler.cook(code)
try {
compiler.cook(code)
} catch {
case e: CompileException =>
throw new InvalidProgramException("Table program cannot be compiled. " +
"This is a bug. Please file an issue.", e)
}
compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
}
}
......@@ -18,7 +18,7 @@
package org.apache.flink.api.table.typeutils
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO, STRING_TYPE_INFO}
import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, NumericTypeInfo, TypeInformation}
import org.apache.flink.api.table.validate._
object TypeCheckUtils {
......@@ -29,6 +29,8 @@ object TypeCheckUtils {
case _ => false
}
def isTemporal(dataType: TypeInformation[_]): Boolean = dataType.isInstanceOf[SqlTimeTypeInfo[_]]
def isString(dataType: TypeInformation[_]): Boolean = dataType == STRING_TYPE_INFO
def isBoolean(dataType: TypeInformation[_]): Boolean = dataType == BOOLEAN_TYPE_INFO
......
......@@ -19,7 +19,7 @@
package org.apache.flink.api.table.typeutils
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, NumericTypeInfo, TypeInformation}
/**
* Utilities for type conversions.
......@@ -85,6 +85,9 @@ object TypeCoercion {
case (STRING_TYPE_INFO, _: NumericTypeInfo[_]) => true
case (STRING_TYPE_INFO, BOOLEAN_TYPE_INFO) => true
case (STRING_TYPE_INFO, BIG_DEC_TYPE_INFO) => true
case (STRING_TYPE_INFO, SqlTimeTypeInfo.DATE) => true
case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIME) => true
case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) => true
case (BOOLEAN_TYPE_INFO, _: NumericTypeInfo[_]) => true
case (BOOLEAN_TYPE_INFO, BIG_DEC_TYPE_INFO) => true
......@@ -94,6 +97,16 @@ object TypeCoercion {
case (_: NumericTypeInfo[_], _: NumericTypeInfo[_]) => true
case (BIG_DEC_TYPE_INFO, _: NumericTypeInfo[_]) => true
case (_: NumericTypeInfo[_], BIG_DEC_TYPE_INFO) => true
case (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) => true
case (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) => true
case (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) => true
case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIME) => false
case (SqlTimeTypeInfo.TIME, SqlTimeTypeInfo.DATE) => false
case (_: SqlTimeTypeInfo[_], _: SqlTimeTypeInfo[_]) => true
case (SqlTimeTypeInfo.DATE, INT_TYPE_INFO) => true
case (SqlTimeTypeInfo.TIME, INT_TYPE_INFO) => true
case (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) => true
case _ => false
}
......
......@@ -24,7 +24,7 @@ import org.apache.calcite.rel.core.JoinRelType._
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, AtomicType, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.operators.join.JoinType
import org.apache.flink.api.java.tuple.Tuple
......@@ -40,24 +40,20 @@ object TypeConverter {
def typeInfoToSqlType(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match {
case BOOLEAN_TYPE_INFO => BOOLEAN
case BOOLEAN_VALUE_TYPE_INFO => BOOLEAN
case BYTE_TYPE_INFO => TINYINT
case BYTE_VALUE_TYPE_INFO => TINYINT
case SHORT_TYPE_INFO => SMALLINT
case SHORT_VALUE_TYPE_INFO => SMALLINT
case INT_TYPE_INFO => INTEGER
case INT_VALUE_TYPE_INFO => INTEGER
case LONG_TYPE_INFO => BIGINT
case LONG_VALUE_TYPE_INFO => BIGINT
case FLOAT_TYPE_INFO => FLOAT
case FLOAT_VALUE_TYPE_INFO => FLOAT
case DOUBLE_TYPE_INFO => DOUBLE
case DOUBLE_VALUE_TYPE_INFO => DOUBLE
case STRING_TYPE_INFO => VARCHAR
case STRING_VALUE_TYPE_INFO => VARCHAR
case DATE_TYPE_INFO => DATE
case BIG_DEC_TYPE_INFO => DECIMAL
// date/time types
case SqlTimeTypeInfo.DATE => DATE
case SqlTimeTypeInfo.TIME => TIME
case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
throw new TableException("Character type is not supported.")
......@@ -74,9 +70,15 @@ object TypeConverter {
case FLOAT => FLOAT_TYPE_INFO
case DOUBLE => DOUBLE_TYPE_INFO
case VARCHAR | CHAR => STRING_TYPE_INFO
case DATE => DATE_TYPE_INFO
case DECIMAL => BIG_DEC_TYPE_INFO
// date/time types
case DATE => SqlTimeTypeInfo.DATE
case TIME => SqlTimeTypeInfo.TIME
case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
case INTERVAL_DAY_TIME | INTERVAL_YEAR_MONTH =>
throw new TableException("Intervals are not supported yet.")
case NULL =>
throw new TableException("Type NULL is not supported. " +
"Null values must have a supported type.")
......@@ -86,7 +88,7 @@ object TypeConverter {
case SYMBOL => INT_TYPE_INFO
case _ =>
throw new TableException("Type " + sqlType.toString + "is not supported")
throw new TableException("Type " + sqlType.toString + " is not supported.")
}
/**
......
......@@ -18,6 +18,8 @@
package org.apache.flink.api.scala.batch.sql
import java.sql.{Timestamp, Time, Date}
import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
......@@ -120,4 +122,26 @@ class ExpressionsITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
def testAdvancedDataTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT a, b, c, DATE '1984-07-12', TIME '14:34:24', " +
"TIMESTAMP '1984-07-12 14:34:24' FROM MyTable"
val ds = env.fromElements((
Date.valueOf("1984-07-12"),
Time.valueOf("14:34:24"),
Timestamp.valueOf("1984-07-12 14:34:24")))
tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
val result = tEnv.sql(sqlQuery)
val expected = "1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
"1984-07-12,14:34:24,1984-07-12 14:34:24.0"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
}
......@@ -18,7 +18,7 @@
package org.apache.flink.api.scala.batch.table
import java.util.Date
import java.sql.{Date, Time, Timestamp}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
......@@ -26,7 +26,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.codegen.CodeGenException
import org.apache.flink.api.table.expressions.{Literal, Null}
import org.apache.flink.api.table.expressions.Null
import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
......@@ -158,37 +158,25 @@ class ExpressionsITCase(
}
@Test
def testDecimalLiteral(): Unit = {
def testAdvancedDataTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val t = env
.fromElements(
(BigDecimal("78.454654654654654").bigDecimal, BigDecimal("4E+9999").bigDecimal)
)
.toTable(tEnv, 'a, 'b)
.select('a, 'b, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal)
val expected = "78.454654654654654,4E+9999,11.2,11.2"
.fromElements((
BigDecimal("78.454654654654654").bigDecimal,
BigDecimal("4E+9999").bigDecimal,
Date.valueOf("1984-07-12"),
Time.valueOf("14:34:24"),
Timestamp.valueOf("1984-07-12 14:34:24")))
.toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
.select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
Date.valueOf("1984-07-12"), Time.valueOf("14:34:24"),
Timestamp.valueOf("1984-07-12 14:34:24"))
val expected = "78.454654654654654,4E+9999,1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
"11.2,11.2,1984-07-12,14:34:24,1984-07-12 14:34:24.0"
val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
// Date literals not yet supported
@Ignore
@Test
def testDateLiteral(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val t = env.fromElements((0L, "test")).toTable(tEnv, 'a, 'b)
.select('a,
Literal(new Date(0)).cast(BasicTypeInfo.STRING_TYPE_INFO),
'a.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000"
val results = t.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.scala.expression
import java.sql.{Date, Time, Timestamp}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.junit.Test
class TimeTypesTest extends ExpressionTestBase {
@Test
def testTimeLiterals(): Unit = {
testAllApis(
"1990-10-14".toDate,
"'1990-10-14'.toDate",
"DATE '1990-10-14'",
"1990-10-14")
testTableApi(
Date.valueOf("2040-09-11"),
"'2040-09-11'.toDate",
"2040-09-11")
testAllApis(
"1500-04-30".cast(SqlTimeTypeInfo.DATE),
"'1500-04-30'.cast(DATE)",
"CAST('1500-04-30' AS DATE)",
"1500-04-30")
testAllApis(
"15:45:59".toTime,
"'15:45:59'.toTime",
"TIME '15:45:59'",
"15:45:59")
testTableApi(
Time.valueOf("00:00:00"),
"'00:00:00'.toTime",
"00:00:00")
testAllApis(
"1:30:00".cast(SqlTimeTypeInfo.TIME),
"'1:30:00'.cast(TIME)",
"CAST('1:30:00' AS TIME)",
"01:30:00")
testAllApis(
"1990-10-14 23:00:00.123".toTimestamp,
"'1990-10-14 23:00:00.123'.toTimestamp",
"TIMESTAMP '1990-10-14 23:00:00.123'",
"1990-10-14 23:00:00.123")
testTableApi(
Timestamp.valueOf("2040-09-11 00:00:00.000"),
"'2040-09-11 00:00:00.000'.toTimestamp",
"2040-09-11 00:00:00.0")
testAllApis(
"1500-04-30 12:00:00".cast(SqlTimeTypeInfo.TIMESTAMP),
"'1500-04-30 12:00:00'.cast(TIMESTAMP)",
"CAST('1500-04-30 12:00:00' AS TIMESTAMP)",
"1500-04-30 12:00:00.0")
}
@Test
def testTimeInput(): Unit = {
testAllApis(
'f0,
"f0",
"f0",
"1990-10-14")
testAllApis(
'f1,
"f1",
"f1",
"10:20:45")
testAllApis(
'f2,
"f2",
"f2",
"1990-10-14 10:20:45.123")
}
@Test
def testTimeCasting(): Unit = {
testAllApis(
'f0.cast(SqlTimeTypeInfo.TIMESTAMP),
"f0.cast(TIMESTAMP)",
"CAST(f0 AS TIMESTAMP)",
"1990-10-14 00:00:00.0")
testAllApis(
'f1.cast(SqlTimeTypeInfo.TIMESTAMP),
"f1.cast(TIMESTAMP)",
"CAST(f1 AS TIMESTAMP)",
"1970-01-01 10:20:45.0")
testAllApis(
'f2.cast(SqlTimeTypeInfo.DATE),
"f2.cast(DATE)",
"CAST(f2 AS DATE)",
"1990-10-14")
testAllApis(
'f2.cast(SqlTimeTypeInfo.TIME),
"f2.cast(TIME)",
"CAST(f2 AS TIME)",
"10:20:45")
testAllApis(
'f2.cast(SqlTimeTypeInfo.TIME),
"f2.cast(TIME)",
"CAST(f2 AS TIME)",
"10:20:45")
testTableApi(
'f7.cast(SqlTimeTypeInfo.DATE),
"f7.cast(DATE)",
"2002-11-09")
testTableApi(
'f7.cast(SqlTimeTypeInfo.DATE).cast(BasicTypeInfo.INT_TYPE_INFO),
"f7.cast(DATE).cast(INT)",
"12000")
testTableApi(
'f7.cast(SqlTimeTypeInfo.TIME),
"f7.cast(TIME)",
"00:00:12")
testTableApi(
'f7.cast(SqlTimeTypeInfo.TIME).cast(BasicTypeInfo.INT_TYPE_INFO),
"f7.cast(TIME).cast(INT)",
"12000")
testTableApi(
'f8.cast(SqlTimeTypeInfo.TIMESTAMP),
"f8.cast(TIMESTAMP)",
"2016-06-27 07:23:33.0")
testTableApi(
'f8.cast(SqlTimeTypeInfo.TIMESTAMP).cast(BasicTypeInfo.LONG_TYPE_INFO),
"f8.cast(TIMESTAMP).cast(LONG)",
"1467012213000")
}
@Test
def testTimeComparison(): Unit = {
testAllApis(
'f0 < 'f3,
"f0 < f3",
"f0 < f3",
"false")
testAllApis(
'f0 < 'f4,
"f0 < f4",
"f0 < f4",
"true")
testAllApis(
'f1 < 'f5,
"f1 < f5",
"f1 < f5",
"false")
testAllApis(
'f0.cast(SqlTimeTypeInfo.TIMESTAMP) !== 'f2,
"f0.cast(TIMESTAMP) !== f2",
"CAST(f0 AS TIMESTAMP) <> f2",
"true")
testAllApis(
'f0.cast(SqlTimeTypeInfo.TIMESTAMP) === 'f6,
"f0.cast(TIMESTAMP) === f6",
"CAST(f0 AS TIMESTAMP) = f6",
"true")
}
// ----------------------------------------------------------------------------------------------
def testData = {
val testData = new Row(9)
testData.setField(0, Date.valueOf("1990-10-14"))
testData.setField(1, Time.valueOf("10:20:45"))
testData.setField(2, Timestamp.valueOf("1990-10-14 10:20:45.123"))
testData.setField(3, Date.valueOf("1990-10-13"))
testData.setField(4, Date.valueOf("1990-10-15"))
testData.setField(5, Time.valueOf("00:00:00"))
testData.setField(6, Timestamp.valueOf("1990-10-14 00:00:00.0"))
testData.setField(7, 12000)
testData.setField(8, 1467012213000L)
testData
}
def typeInfo = {
new RowTypeInfo(Seq(
SqlTimeTypeInfo.DATE,
SqlTimeTypeInfo.TIME,
SqlTimeTypeInfo.TIMESTAMP,
SqlTimeTypeInfo.DATE,
SqlTimeTypeInfo.DATE,
SqlTimeTypeInfo.TIME,
SqlTimeTypeInfo.TIMESTAMP,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册