提交 857a5b90 编写于 作者: T Timo Walther

[FLINK-20522][table] Add built-in TYPEOF function

上级 a9da2299
......@@ -4600,6 +4600,22 @@ CAST(value AS type)
<p>E.g., <code>CAST('42' AS INT)</code> returns 42; <code>CAST(NULL AS VARCHAR)</code> returns NULL of type VARCHAR.</p>
</td>
</tr>
<tr>
<td>
{% highlight text %}
TYPEOF(input)
TYPEOF(input, force_serializable)
{% endhighlight %}
</td>
<td>
<p>Returns the string representation of the input expression's data type. By default, the
returned string is a summary string that might omit certain details for readability. If
<code>force_serializable</code> is set to TRUE, the string represents a full data type
that could be persisted in a catalog. Note that especially anonymous, inline data types
have no serializable string representation. In this case, NULL is returned.</p>
<p>E.g., <code>TYPEOF(12)</code> returns 'INT NOT NULL'.</p>
</td>
</tr>
</tbody>
</table>
</div>
......@@ -4625,6 +4641,22 @@ ANY.cast(TYPE)
<p>E.g., <code>'42'.cast(INT)</code> returns 42; <code>Null(STRING)</code> returns NULL of type STRING.</p>
</td>
</tr>
<tr>
<td>
{% highlight java %}
call("TYPEOF", input)
call("TYPEOF", input, force_serializable)
{% endhighlight %}
</td>
<td>
<p>Returns the string representation of the input expression's data type. By default, the
returned string is a summary string that might omit certain details for readability. If
<code>force_serializable</code> is set to TRUE, the string represents a full data type
that could be persisted in a catalog. Note that especially anonymous, inline data types
have no serializable string representation. In this case, NULL is returned.</p>
<p>E.g., <code>call("TYPEOF", 12)</code> returns 'INT NOT NULL'.</p>
</td>
</tr>
</tbody>
</table>
</div>
......@@ -4650,6 +4682,22 @@ ANY.cast(TYPE)
<p>E.g., <code>"42".cast(Types.INT)</code> returns 42; <code>Null(Types.STRING)</code> returns NULL of type STRING.</p>
</td>
</tr>
<tr>
<td>
{% highlight scala %}
call("TYPEOF", input)
call("TYPEOF", input, force_serializable)
{% endhighlight %}
</td>
<td>
<p>Returns the string representation of the input expression's data type. By default, the
returned string is a summary string that might omit certain details for readability. If
<code>force_serializable</code> is set to TRUE, the string represents a full data type
that could be persisted in a catalog. Note that especially anonymous, inline data types
have no serializable string representation. In this case, NULL is returned.</p>
<p>E.g., <code>call("TYPEOF", 12)</code> returns 'INT NOT NULL'.</p>
</td>
</tr>
</tbody>
</table>
......
......@@ -21,6 +21,7 @@ package org.apache.flink.table.functions;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeStrategies;
......@@ -39,6 +40,7 @@ import java.util.Set;
import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
import static org.apache.flink.table.functions.FunctionKind.OTHER;
import static org.apache.flink.table.functions.FunctionKind.SCALAR;
import static org.apache.flink.table.types.inference.InputTypeStrategies.LITERAL;
import static org.apache.flink.table.types.inference.InputTypeStrategies.NO_ARGS;
import static org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_IF_NULL;
import static org.apache.flink.table.types.inference.InputTypeStrategies.SPECIFIC_FOR_CAST;
......@@ -71,6 +73,29 @@ import static org.apache.flink.table.types.inference.TypeStrategies.varyingStrin
@PublicEvolving
public final class BuiltInFunctionDefinitions {
// --------------------------------------------------------------------------------------------
// Debugging functions
// --------------------------------------------------------------------------------------------
public static final BuiltInFunctionDefinition TYPE_OF =
BuiltInFunctionDefinition.newBuilder()
.name("TYPEOF")
.kind(SCALAR)
.inputTypeStrategy(
or(
sequence(
new String[] {"input"},
new ArgumentTypeStrategy[] {InputTypeStrategies.ANY}),
sequence(
new String[] {"input", "force_serializable"},
new ArgumentTypeStrategy[] {
InputTypeStrategies.ANY,
and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
})))
.outputTypeStrategy(explicit(DataTypes.STRING()))
.runtimeClass("org.apache.flink.table.runtime.functions.scalar.TypeOfFunction")
.build();
// --------------------------------------------------------------------------------------------
// Logic functions
// --------------------------------------------------------------------------------------------
......
......@@ -105,6 +105,7 @@ public final class LogicalTypeUtils {
case RAW:
return RawValueData.class;
case NULL:
return Object.class;
case SYMBOL:
case UNRESOLVED:
default:
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.functions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.List;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
/** Tests for miscellaneous {@link BuiltInFunctionDefinitions}. */
public class MiscFunctionsITCase extends BuiltInFunctionTestBase {
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
TestSpec.forFunction(BuiltInFunctionDefinitions.TYPE_OF)
.onFieldsWithData(12, "Hello world", false)
.testResult(
call("TYPEOF", $("f0")),
"TYPEOF(f0)",
"INT NOT NULL",
DataTypes.STRING())
.testTableApiError(
call("TYPEOF", $("f0"), $("f2")), "Invalid input arguments.")
.testSqlError(
"TYPEOF(f0, f2)",
"SQL validation failed. Invalid function call:\nTYPEOF(INT NOT NULL, BOOLEAN NOT NULL)")
.testTableApiResult(
call("TYPEOF", $("f1"), true),
"CHAR(11) NOT NULL",
DataTypes.STRING())
.testSqlResult("TYPEOF(NULL)", "NULL", DataTypes.STRING()));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.functions.scalar;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.logical.LogicalType;
import javax.annotation.Nullable;
import java.util.List;
/** Implementation of {@link BuiltInFunctionDefinitions#TYPE_OF}. */
@Internal
public class TypeOfFunction extends BuiltInScalarFunction {
private final String typeString;
private transient StringData typeStringData;
public TypeOfFunction(SpecializedContext context) {
super(BuiltInFunctionDefinitions.TYPE_OF, context);
final CallContext callContext = context.getCallContext();
this.typeString = createTypeString(callContext, isForceSerializable(callContext));
}
// --------------------------------------------------------------------------------------------
// Planning
// --------------------------------------------------------------------------------------------
private static boolean isForceSerializable(CallContext context) {
final List<DataType> argumentDataTypes = context.getArgumentDataTypes();
if (argumentDataTypes.size() != 2) {
return false;
}
return context.getArgumentValue(1, Boolean.class).orElse(false);
}
private static @Nullable String createTypeString(
CallContext context, boolean forceSerializable) {
final LogicalType input = context.getArgumentDataTypes().get(0).getLogicalType();
if (forceSerializable) {
try {
return input.asSerializableString();
} catch (Exception t) {
return null;
}
}
return input.asSummaryString();
}
// --------------------------------------------------------------------------------------------
// Runtime
// --------------------------------------------------------------------------------------------
@Override
public void open(FunctionContext context) throws Exception {
this.typeStringData = StringData.fromString(typeString);
}
@SuppressWarnings("unused")
public @Nullable StringData eval(Object... unused) {
return typeStringData;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册