Commit abfdc1a2 authored by Timo Walther

[FLINK-8863] [sql-client] Make user-defined functions more robust

- Simplify code and fix various bugs
- Add more tests
- Refactor various names for descriptors and variables
- Make 'from' property mandatory
- Make LiteralValue public API
Parent: 8014dade
@@ -21,11 +21,14 @@
# Defaults might be overwritten by a session specific environment.
# See the Table API & SQL documentation for details about supported properties.
#==============================================================================
# Table Sources
#==============================================================================
# Define table sources and sinks here. See the Table API & SQL documentation for details.
# Define table sources and sinks here.
tables: [] # empty list
# A typical table source definition looks like:
@@ -35,6 +38,19 @@ tables: [] # empty list
#   format: ...
#   schema: ...
#==============================================================================
# User-defined functions
#==============================================================================
# Define scalar, aggregate, or table functions here.
functions: [] # empty list
# A typical function definition looks like:
# - name: ...
#   from: class
#   class: ...
#   constructor: ...
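# For example, with an illustrative class name:
# - name: myUDF
#   from: class
#   class: org.example.MyScalarFunction
#   constructor:
#     - 7.6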
#==============================================================================
# Execution properties
#==============================================================================
@@ -201,6 +201,9 @@ public class CliClient {
case SHOW_TABLES:
callShowTables(cmdCall);
break;
case SHOW_FUNCTIONS:
callShowFunctions(cmdCall);
break;
case DESCRIBE:
callDescribe(cmdCall);
break;
@@ -284,6 +287,22 @@ public class CliClient {
terminal.flush();
}
private void callShowFunctions(SqlCommandCall cmdCall) {
final List<String> functions;
try {
functions = executor.listUserDefinedFunctions(context);
} catch (SqlExecutionException e) {
printException(e);
return;
}
if (functions.isEmpty()) {
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
} else {
functions.forEach((v) -> terminal.writer().println(v));
}
terminal.flush();
}
private void callDescribe(SqlCommandCall cmdCall) {
final TableSchema schema;
try {
@@ -45,6 +45,7 @@ public final class CliStrings {
.append(formatCommand(SqlCommand.CLEAR, "Clears the current terminal."))
.append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
.append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
.append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all registered user-defined functions."))
.append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
.append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
@@ -86,6 +86,7 @@ public final class SqlCommandParser {
CLEAR("clear"),
HELP("help"),
SHOW_TABLES("show tables"),
SHOW_FUNCTIONS("show functions"),
DESCRIBE("describe"),
EXPLAIN("explain"),
SELECT("select"),
@@ -42,17 +42,17 @@ public class Environment {
private Map<String, TableDescriptor> tables;
private Map<String, UserDefinedFunction> functions;
private Execution execution;
private Deployment deployment;
private Map<String, UserDefinedFunction> functions;
public Environment() {
this.tables = Collections.emptyMap();
this.functions = Collections.emptyMap();
this.execution = new Execution();
this.deployment = new Deployment();
this.functions = Collections.emptyMap();
}
public Map<String, TableDescriptor> getTables() {
@@ -69,7 +69,7 @@ public class Environment {
config.remove(TableDescriptorValidator.TABLE_TYPE());
final Source s = Source.create(config);
if (this.tables.containsKey(s.getName())) {
throw new SqlClientException("Duplicate source name '" + s + "'.");
throw new SqlClientException("Duplicate source name '" + s.getName() + "'.");
}
this.tables.put(s.getName(), s);
} else {
@@ -79,11 +79,18 @@ public class Environment {
});
}
public Map<String, UserDefinedFunction> getFunctions() {
return functions;
}
public void setFunctions(List<Map<String, Object>> functions) {
this.functions = new HashMap<>(functions.size());
functions.forEach(config -> {
final UserDefinedFunction f = UserDefinedFunction.create(config);
this.functions.put(f.name(), f);
if (this.tables.containsKey(f.getName())) {
throw new SqlClientException("Duplicate function name '" + f.getName() + "'.");
}
this.functions.put(f.getName(), f);
});
}
@@ -103,10 +110,6 @@ public class Environment {
return deployment;
}
public Map<String, UserDefinedFunction> getFunctions() {
return functions;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
@@ -117,6 +120,13 @@ public class Environment {
table.addProperties(props);
props.asMap().forEach((k, v) -> sb.append(" ").append(k).append(": ").append(v).append('\n'));
});
sb.append("=================== Functions ====================\n");
functions.forEach((name, function) -> {
sb.append("- name: ").append(name).append("\n");
final DescriptorProperties props = new DescriptorProperties(true);
function.addProperties(props);
props.asMap().forEach((k, v) -> sb.append(" ").append(k).append(": ").append(v).append('\n'));
});
sb.append("=================== Execution ====================\n");
execution.toProperties().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
sb.append("=================== Deployment ===================\n");
@@ -153,7 +163,7 @@ public class Environment {
// merge functions
final Map<String, UserDefinedFunction> functions = new HashMap<>(env1.getFunctions());
mergedEnv.getFunctions().putAll(env2.getFunctions());
functions.putAll(env2.getFunctions());
mergedEnv.functions = functions;
// merge execution properties
@@ -165,12 +175,18 @@ public class Environment {
return mergedEnv;
}
/**
* Enriches an environment with new/modified properties and returns the new instance.
*/
public static Environment enrich(Environment env, Map<String, String> properties) {
final Environment enrichedEnv = new Environment();
// merge tables
enrichedEnv.tables = new HashMap<>(env.getTables());
// merge functions
enrichedEnv.functions = new HashMap<>(env.getFunctions());
// enrich execution properties
enrichedEnv.execution = Execution.enrich(env.execution, properties);
@@ -26,7 +26,7 @@ import java.util.HashMap;
import java.util.Map;
/**
* Configuration of a table source. Parses an entry in the `sources` list of an environment
* Configuration of a table source. Parses an entry in the `tables` list of an environment
* file and translates to table descriptor properties.
*/
public class Source extends TableSourceDescriptor {
@@ -49,6 +49,9 @@ public class Source extends TableSourceDescriptor {
return properties;
}
/**
* Creates a table source descriptor with the given config.
*/
public static Source create(Map<String, Object> config) {
if (!config.containsKey(NAME)) {
throw new SqlClientException("The 'name' attribute of a table source is missing.");
@@ -21,68 +21,47 @@ package org.apache.flink.table.client.config;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FunctionDescriptor;
import org.apache.flink.table.descriptors.FunctionValidator;
import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.table.client.config.UserDefinedFunction.From.CLASS;
/**
* Descriptor for user-defined functions.
*/
public class UserDefinedFunction extends FunctionDescriptor {
private static final String FROM = "from";
private From from;
private String name;
private Map<String, String> properties;
private UserDefinedFunction(String name, From from, Map<String, String> properties) {
super(name);
this.from = from;
private static final String NAME = "name";
private UserDefinedFunction(String name, Map<String, String> properties) {
this.name = name;
this.properties = properties;
}
/**
* Gets where the user-defined function should be created from.
*/
public From getFrom() {
return from;
public String getName() {
return name;
}
public Map<String, String> getProperties() {
return properties;
}
/**
* Creates a UDF descriptor with the given config.
* Creates a user-defined function descriptor with the given config.
*/
public static UserDefinedFunction create(Map<String, Object> config) {
Map<String, String> udfConfig = ConfigUtil.normalizeYaml(config);
if (!udfConfig.containsKey(FunctionValidator.FUNCTION_NAME())) {
if (!config.containsKey(NAME)) {
throw new SqlClientException("The 'name' attribute of a function is missing.");
}
final String name = udfConfig.get(FunctionValidator.FUNCTION_NAME());
if (name.trim().length() <= 0) {
final Object name = config.get(NAME);
if (name == null || !(name instanceof String) || ((String) name).trim().length() <= 0) {
throw new SqlClientException("Invalid function name '" + name + "'.");
}
// the default value is "CLASS"
From fromValue = CLASS;
if (udfConfig.containsKey(FROM)) {
final String from = udfConfig.get(FROM);
try {
fromValue = From.valueOf(from.toUpperCase());
} catch (IllegalArgumentException ex) {
throw new SqlClientException("Unknown 'from' value '" + from + "'.");
}
}
switch (fromValue) {
case CLASS:
return new UserDefinedFunction(name, fromValue, udfConfig);
default:
throw new SqlClientException("The from attribute can only be \"class\" now.");
}
final Map<String, Object> properties = new HashMap<>(config);
properties.remove(NAME);
return new UserDefinedFunction((String) name, ConfigUtil.normalizeYaml(properties));
}
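// As an illustration (names are made up): a YAML entry such as
//   - name: myUDF
//     from: class
//     class: org.example.MyUDF
// arrives here as config = {name=myUDF, from=class, class=org.example.MyUDF};
// everything except 'name' is kept as normalized descriptor properties.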
// --------------------------------------------------------------------------------------------
@@ -91,8 +70,4 @@ public class UserDefinedFunction extends FunctionDescriptor {
public void addProperties(DescriptorProperties properties) {
this.properties.forEach(properties::putString);
}
enum From {
CLASS
}
}
@@ -45,6 +45,11 @@ public interface Executor {
*/
List<String> listTables(SessionContext session) throws SqlExecutionException;
/**
* Lists all user-defined functions known to the executor.
*/
List<String> listUserDefinedFunctions(SessionContext session) throws SqlExecutionException;
/**
* Returns the schema of a table. Throws an exception if the table could not be found. The
* schema might contain time attribute types for helping the user during debugging a query.
@@ -48,10 +48,9 @@ import org.apache.flink.table.client.config.Deployment;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.TableSourceDescriptor;
import org.apache.flink.table.descriptors.service.FunctionService;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
@@ -105,20 +104,18 @@ public class ExecutionContext<T> {
tableSources = new HashMap<>();
mergedEnv.getTables().forEach((name, descriptor) -> {
if (descriptor instanceof TableSourceDescriptor) {
TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(
(TableSourceDescriptor) descriptor, classLoader);
final TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(
(TableSourceDescriptor) descriptor,
classLoader);
tableSources.put(name, tableSource);
}
});
// generate user-defined functions
// create user-defined functions
functions = new HashMap<>();
mergedEnv.getFunctions().forEach((name, descriptor) -> {
DescriptorProperties properties = new DescriptorProperties(true);
descriptor.addProperties(properties);
functions.put(
name,
FunctionService.generateUserDefinedFunction(properties, classLoader));
final UserDefinedFunction function = FunctionService.createFunction(descriptor, classLoader);
functions.put(name, function);
});
// convert deployment options into command line options that describe a cluster
@@ -227,7 +224,7 @@
// register table sources
tableSources.forEach(tableEnv::registerTableSource);
// register UDFs
// register user-defined functions
if (tableEnv instanceof StreamTableEnvironment) {
StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv;
functions.forEach((k, v) -> {
@@ -237,6 +234,8 @@
streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
} else if (v instanceof TableFunction) {
streamTableEnvironment.registerFunction(k, (TableFunction<?>) v);
} else {
throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
}
});
} else {
@@ -248,6 +247,8 @@
batchTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
} else if (v instanceof TableFunction) {
batchTableEnvironment.registerFunction(k, (TableFunction<?>) v);
} else {
throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
}
});
}
@@ -190,6 +190,14 @@ public class LocalExecutor implements Executor {
return Arrays.asList(tableEnv.listTables());
}
@Override
public List<String> listUserDefinedFunctions(SessionContext session) throws SqlExecutionException {
final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
.createEnvironmentInstance()
.getTableEnvironment();
return Arrays.asList(tableEnv.listUserDefinedFunctions());
}
@Override
public TableSchema getTableSchema(SessionContext session, String name) throws SqlExecutionException {
final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
@@ -21,6 +21,7 @@ package org.apache.flink.table.client.gateway.local;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
@@ -28,8 +29,10 @@ import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
import org.apache.commons.cli.Options;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
/**
@@ -46,6 +49,17 @@ public class ExecutionContextTest {
assertEquals(99, config.getAutoWatermarkInterval());
}
@Test
public void testFunctions() throws Exception {
final ExecutionContext<?> context = createExecutionContext();
final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();
final String[] expected = new String[]{"scalarUDF", "tableUDF", "aggregateUDF"};
final String[] actual = tableEnv.listUserDefinedFunctions();
Arrays.sort(expected);
Arrays.sort(actual);
assertArrayEquals(expected, actual);
}
private <T> ExecutionContext<T> createExecutionContext() throws Exception {
final Environment env = EnvironmentFileUtil.parseModified(
DEFAULTS_ENVIRONMENT_FILE,
@@ -66,7 +66,6 @@ import static org.junit.Assert.assertTrue;
public class LocalExecutorITCase extends TestLogger {
private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
private static final String UDF_ENVIRONMENT_FILE = "test-sql-client-udf.yaml";
private static final int NUM_TMS = 2;
private static final int NUM_SLOTS_PER_TM = 2;
@@ -106,6 +105,17 @@
assertEquals(expectedTables, actualTables);
}
@Test
public void testListUserDefinedFunctions() throws Exception {
final Executor executor = createDefaultExecutor(clusterClient);
final SessionContext session = new SessionContext("test-session", new Environment());
final List<String> actualTables = executor.listUserDefinedFunctions(session);
final List<String> expectedTables = Arrays.asList("aggregateUDF", "tableUDF", "scalarUDF");
assertEquals(expectedTables, actualTables);
}
@Test
public void testGetSessionProperties() throws Exception {
final Executor executor = createDefaultExecutor(clusterClient);
@@ -149,68 +159,6 @@
assertEquals(expectedTableSchema, actualTableSchema);
}
@Test(timeout = 30_000L)
public void testScalarUDF() throws Exception {
final Executor executor =
createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient);
final SessionContext session = new SessionContext("test-scalarUDF", new Environment());
final ResultDescriptor rd =
executor.executeQuery(session, "SELECT scalarUDF(10)");
final List<String> actualResults =
retrieveChangelogResult(executor, session, rd.getResultId());
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("(true,15)");
TestBaseUtils.compareResultCollections(
expectedResults, actualResults, Comparator.naturalOrder());
}
@Test(timeout = 30_000L)
public void testAggregateUDF() throws Exception {
final Executor executor =
createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient);
final SessionContext session = new SessionContext("test-aggregateUDF", new Environment());
final ResultDescriptor rd =
executor.executeQuery(session, "SELECT aggregateUDF(cast(1 as BIGINT))");
final List<String> actualResults =
retrieveChangelogResult(executor, session, rd.getResultId());
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("(true,100)");
TestBaseUtils.compareResultCollections(
expectedResults, actualResults, Comparator.naturalOrder());
}
@Test(timeout = 30_000L)
public void testTableUDF() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_0", url.getPath());
final Executor executor =
createModifiedExecutor(UDF_ENVIRONMENT_FILE, clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-aggregateUDF", new Environment());
final ResultDescriptor rd =
executor.executeQuery(
session,
"SELECT w, l from TableNumber1, LATERAL TABLE(tableUDF(StringField1)) as T(w, l)");
final List<String> actualResults =
retrieveChangelogResult(executor, session, rd.getResultId());
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("(true,Hello,10)");
expectedResults.add("(true,World,10)");
expectedResults.add("(true,Hello,10)");
expectedResults.add("(true,World,10)");
expectedResults.add("(true,Hello,10)");
expectedResults.add("(true,World,10)");
expectedResults.add("(true,Hello,10)");
expectedResults.add("(true,World,10)");
expectedResults.add("(true,Hello,10)");
expectedResults.add("(true,World,10)");
expectedResults.add("(true,Hello,10)");
expectedResults.add("(true,World!!!!,14)");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
}
@Test(timeout = 30_000L)
public void testStreamQueryExecutionChangelog() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
@@ -226,7 +174,9 @@
try {
// start job and retrieval
final ResultDescriptor desc = executor.executeQuery(session, "SELECT * FROM TableNumber1");
final ResultDescriptor desc = executor.executeQuery(
session,
"SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1");
assertFalse(desc.isMaterialized());
@@ -234,12 +184,12 @@
retrieveChangelogResult(executor, session, desc.getResultId());
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("(true,42,Hello World)");
expectedResults.add("(true,22,Hello World)");
expectedResults.add("(true,32,Hello World)");
expectedResults.add("(true,32,Hello World)");
expectedResults.add("(true,42,Hello World)");
expectedResults.add("(true,52,Hello World!!!!)");
expectedResults.add("(true,47,Hello World)");
expectedResults.add("(true,27,Hello World)");
expectedResults.add("(true,37,Hello World)");
expectedResults.add("(true,37,Hello World)");
expectedResults.add("(true,47,Hello World)");
expectedResults.add("(true,57,Hello World!!!!)");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
} finally {
@@ -262,19 +212,21 @@
try {
// start job and retrieval
final ResultDescriptor desc = executor.executeQuery(session, "SELECT IntegerField1 FROM TableNumber1");
final ResultDescriptor desc = executor.executeQuery(
session,
"SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1");
assertTrue(desc.isMaterialized());
final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("42");
expectedResults.add("22");
expectedResults.add("32");
expectedResults.add("32");
expectedResults.add("42");
expectedResults.add("52");
expectedResults.add("47,Hello World");
expectedResults.add("27,Hello World");
expectedResults.add("37,Hello World");
expectedResults.add("37,Hello World");
expectedResults.add("47,Hello World");
expectedResults.add("57,Hello World!!!!");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
} finally {
@@ -317,32 +269,19 @@
}
private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> clusterClient) throws Exception {
return createDefaultExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient);
}
private <T> LocalExecutor createDefaultExecutor(
String configFileName, ClusterClient<T>
clusterClient) throws Exception {
return new LocalExecutor(
EnvironmentFileUtil.parseModified(configFileName, Collections.singletonMap("$VAR_2", "batch")),
EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, Collections.singletonMap("$VAR_2", "batch")),
Collections.emptyList(),
clusterClient.getFlinkConfiguration(),
new DummyCustomCommandLine<T>(clusterClient));
}
private <T> LocalExecutor createModifiedExecutor(ClusterClient<T> clusterClient, Map<String, String> replaceVars) throws Exception {
return createModifiedExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient, replaceVars);
}
private <T> LocalExecutor createModifiedExecutor(
String configFileName,
ClusterClient<T> clusterClient,
Map<String, String> replaceVars) throws Exception {
return new LocalExecutor(
EnvironmentFileUtil.parseModified(configFileName, replaceVars),
Collections.emptyList(),
clusterClient.getFlinkConfiguration(),
new DummyCustomCommandLine<T>(clusterClient));
EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars),
Collections.emptyList(),
clusterClient.getFlinkConfiguration(),
new DummyCustomCommandLine<T>(clusterClient));
}
private List<String> retrieveTableResult(
@@ -373,6 +312,7 @@
Executor executor,
SessionContext session,
String resultID) throws InterruptedException {
final List<String> actualResults = new ArrayList<>();
while (true) {
Thread.sleep(50); // slow the processing down
@@ -27,12 +27,12 @@ import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
/**
* A bunch of UDFs for SQL-Client test.
* A bunch of UDFs for testing the SQL Client.
*/
public class UserDefinedFunctions {
/**
* The scalar function for SQL-Client test.
* The scalar function for SQL Client test.
*/
public static class ScalarUDF extends ScalarFunction {
@@ -48,12 +48,12 @@
}
/**
* The aggregate function for SQL-Client test.
* The aggregate function for SQL Client test.
*/
public static class AggregateUDF extends AggregateFunction<Long, Long> {
public AggregateUDF(String name, Boolean flag, Integer value) {
// do nothing
}
@Override
@@ -67,7 +67,7 @@
}
public void accumulate(Long acc, Long value) {
// do nothing
}
@Override
@@ -77,10 +77,10 @@
}
/**
* The table function for SQL-Client test.
* The table function for SQL Client test.
*/
public static class TableUDF extends TableFunction<Row> {
private long extra = 2L;
private long extra;
public TableUDF(Long extra) {
this.extra = extra;
@@ -100,6 +100,4 @@
return Types.ROW(Types.STRING(), Types.LONG());
}
}
}
@@ -63,6 +63,31 @@ tables:
      line-delimiter: "\n"
      comment-prefix: "#"

functions:
  - name: scalarUDF
    from: class
    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$ScalarUDF
    constructor:
      - 5
  - name: aggregateUDF
    from: class
    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$AggregateUDF
    constructor:
      - StarryName
      - false
      - class: java.lang.Integer
        constructor:
          - class: java.lang.String
            constructor:
              - type: VARCHAR
                value: 3
  - name: tableUDF
    from: class
    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$TableUDF
    constructor:
      - type: LONG
        value: 5

execution:
  type: "$VAR_2"
  time-characteristic: event-time
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
#==============================================================================
# TEST ENVIRONMENT FILE
# Intended for org.apache.flink.table.client.gateway.local.LocalExecutorITCase.
#==============================================================================
# this file has variables that can be filled with content by replacing $VAR_XXX
tables:
  - name: TableNumber1
    type: source
    schema:
      - name: IntegerField1
        type: INT
      - name: StringField1
        type: VARCHAR
    connector:
      type: filesystem
      path: "$VAR_0"
    format:
      type: csv
      fields:
        - name: IntegerField1
          type: INT
        - name: StringField1
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: "#"

functions:
  - name: scalarUDF
    from: class
    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$ScalarUDF
    constructor:
      - 5
  - name: aggregateUDF
    from: class
    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$AggregateUDF
    constructor:
      - StarryName
      - false
      - class: java.lang.Integer
        constructor:
          - class: java.lang.String
            constructor:
              - type: VARCHAR
                value: 3
  - name: tableUDF
    from: class
    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$TableUDF
    constructor:
      - type: LONG
        value: 5

execution:
  type: streaming
  parallelism: 1
  result-mode: changelog

deployment:
  response-timeout: 5000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import scala.collection.mutable.ArrayBuffer
/**
* Descriptor for a class instance. A class instance is a Java/Scala object created from a class
* with a public constructor (with or without parameters).
*/
class ClassInstance extends HierarchyDescriptor {
private var className: Option[String] = None
// the parameter is either a literal value or the instance of a class
private val constructor: ArrayBuffer[Either[LiteralValue, ClassInstance]] = ArrayBuffer()
/**
* Sets the fully qualified class name for creating an instance.
*
* E.g. org.example.MyClass or org.example.MyClass$StaticInnerClass
*
* @param className fully qualified class name
*/
def of(className: String): ClassInstance = {
this.className = Option(className)
this
}
/**
* Adds a constructor parameter value of literal type. The type is automatically derived from
* the value. Currently, this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR. Expression
* values are not allowed.
*
* Examples:
* - "true", "false" -> BOOLEAN
* - "42", "-5" -> INT
* - "2.0", "1234.222" -> DOUBLE
* - VARCHAR otherwise
*
* For other types and explicit type declaration use [[parameter(String, String)]] or
* [[parameter(TypeInformation, String)]].
*
*/
def parameterString(valueString: String): ClassInstance = {
constructor += Left(new LiteralValue().value(valueString))
this
}
/**
* Adds a constructor parameter value of literal type. The type is explicitly defined using a
* type string such as VARCHAR, FLOAT, BOOLEAN, INT, BIGINT, etc. The value is parsed
* accordingly. Expression values are not allowed.
*
* @param typeString the type string that defines how to parse the given value string
* @param valueString the literal value to be parsed
*/
def parameter(typeString: String, valueString: String): ClassInstance = {
constructor += Left(new LiteralValue().of(typeString).value(valueString))
this
}
/**
* Adds a constructor parameter value of literal type. The type is explicitly defined using
* type information. The value is parsed accordingly. Expression values are not allowed.
*
* @param typeInfo the type that defines how to parse the given value string
* @param valueString the literal value to be parsed
*/
def parameter(typeInfo: TypeInformation[_], valueString: String): ClassInstance = {
constructor += Left(new LiteralValue().of(typeInfo).value(valueString))
this
}
/**
* Adds a constructor parameter value of BOOLEAN type.
*
* @param value BOOLEAN value
*/
def parameter(value: Boolean): ClassInstance = {
constructor += Left(new LiteralValue().of(Types.BOOLEAN).value(value))
this
}
/**
* Adds a constructor parameter value of DOUBLE type.
*
* @param value DOUBLE value
*/
def parameter(value: Double): ClassInstance = {
constructor += Left(new LiteralValue().of(Types.DOUBLE).value(value))
this
}
/**
* Adds a constructor parameter value of FLOAT type.
*
* @param value FLOAT value
*/
def parameter(value: Float): ClassInstance = {
constructor += Left(new LiteralValue().of(Types.FLOAT).value(value))
this
}
/**
* Adds a constructor parameter value of INT type.
*
* @param value INT value
*/
def parameter(value: Int): ClassInstance = {
constructor += Left(new LiteralValue().of(Types.INT).value(value))
this
}
/**
* Adds a constructor parameter value of VARCHAR type.
*
* @param value VARCHAR value
*/
def parameter(value: String): ClassInstance = {
constructor += Left(new LiteralValue().of(Types.STRING).value(value))
this
}
/**
* Adds a constructor parameter value of BIGINT type.
*
* @param value BIGINT value
*/
def parameter(value: Long): ClassInstance = {
constructor += Left(new LiteralValue().of(Types.LONG).value(value))
this
}
/**
* Adds a constructor parameter value of TINYINT type.
*
* @param value TINYINT value
*/
def parameter(value: Byte): ClassInstance = {
constructor += Left(new LiteralValue().of(Types.BYTE).value(value))
this
}
/**
* Adds a constructor parameter value of SMALLINT type.
*
* @param value SMALLINT value
*/
def parameter(value: Short): ClassInstance = {
constructor += Left(new LiteralValue().of(Types.SHORT).value(value))
this
}
/**
* Adds a constructor parameter value of DECIMAL type.
*
* @param value DECIMAL value
*/
def parameter(value: java.math.BigDecimal): ClassInstance = {
constructor += Left(new LiteralValue().of(Types.DECIMAL).value(value))
this
}
/**
* Adds a constructor parameter value of a class instance (i.e. a Java object with a public
* constructor).
*
* @param classInstance description of a class instance (i.e. a Java object with a public
* constructor).
*/
def parameter(classInstance: ClassInstance): ClassInstance = {
constructor += Right(classInstance)
this
}
/**
* Internal method for properties conversion.
*/
override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties)
}
/**
* Internal method for properties conversion.
*/
override private[flink] def addPropertiesWithPrefix(
keyPrefix: String,
properties: DescriptorProperties): Unit = {
className.foreach(properties.putString(s"$keyPrefix${ClassInstanceValidator.CLASS}", _))
var i = 0
while (i < constructor.size) {
constructor(i) match {
case Left(literalValue) =>
literalValue.addPropertiesWithPrefix(
s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}.$i.",
properties)
case Right(classInstance) =>
classInstance.addPropertiesWithPrefix(
s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}.$i.",
properties)
}
i += 1
}
}
}
/**
* Descriptor for a class instance. A class instance is a Java/Scala object created from a class
* with a public constructor (with or without parameters).
*/
object ClassInstance {
/**
* Descriptor for a class instance. A class instance is a Java/Scala object created from a class
* with a public constructor (with or without parameters).
*/
def apply() = new ClassInstance
}
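// As an illustration (class name and values are made up), a sketch describing
// `new MyUDF(42, new java.math.BigDecimal("2.5"))` could look like:
//
//   ClassInstance()
//     .of("org.example.MyUDF")
//     .parameter(42)
//     .parameter(
//       ClassInstance()
//         .of("java.math.BigDecimal")
//         .parameter("VARCHAR", "2.5"))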
@@ -18,38 +18,42 @@
package org.apache.flink.table.descriptors
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.typeutils.TypeStringUtils
import org.apache.flink.table.descriptors.HierarchyDescriptorValidator.EMPTY_PREFIX
import scala.collection.JavaConversions._
/**
* Descriptor for a primitive type. Use internally only.
* Validator for [[ClassInstance]].
*/
class PrimitiveType[T] extends HierarchyDescriptor {
var typeInformation: TypeInformation[T] = _
var value: T = _
def of(basicType: TypeInformation[T]): PrimitiveType[T] = {
typeInformation = basicType
this
}
def value(value: T): PrimitiveType[T] = {
this.value = value
this
}
override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
addPropertiesWithPrefix("", properties)
}
override private[flink] def addPropertiesWithPrefix(
keyPrefix: String, properties: DescriptorProperties): Unit = {
properties.putString(keyPrefix + "type", TypeStringUtils.writeTypeInfo(typeInformation))
properties.putString(keyPrefix + "value", value.toString)
class ClassInstanceValidator(keyPrefix: String = EMPTY_PREFIX)
extends HierarchyDescriptorValidator(keyPrefix) {
override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
// check class name
properties.validateString(s"$keyPrefix${ClassInstanceValidator.CLASS}", isOptional = false, 1)
// check constructor
val constructorPrefix = s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}"
val constructorProperties = properties.getVariableIndexedProperties(constructorPrefix, List())
var i = 0
while (i < constructorProperties.size()) {
// nested class instance
if (constructorProperties(i).containsKey(ClassInstanceValidator.CLASS)) {
val classInstanceValidator = new ClassInstanceValidator(s"$constructorPrefix.$i.")
classInstanceValidator.validate(properties)
}
// literal value
else {
val primitiveValueValidator = new LiteralValueValidator(s"$constructorPrefix.$i.")
primitiveValueValidator.validate(properties)
}
i += 1
}
}
}
object PrimitiveType {
def apply[T]() = new PrimitiveType[T]()
object ClassInstanceValidator {
val CLASS = "class"
val CONSTRUCTOR = "constructor"
}
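// A property set accepted by this validator could look like (illustrative):
//
//   class = org.example.MyUDF
//   constructor.0.type = INT
//   constructor.0.value = 42
//   constructor.1.class = java.lang.String
//   constructor.1.constructor.0 = hello
//
// Entry 0 is an explicitly typed literal value; entry 1 is a nested class
// instance whose parameter is an implicitly typed literal value.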
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.typeutils.TypeStringUtils
import scala.collection.mutable.ArrayBuffer
/**
* Descriptor for a class type.
*
* @param className the full name of the class (e.g., java.lang.Integer)
*/
class ClassType(var className: Option[String] = None)
extends HierarchyDescriptor {
// the parameter is either a primitive type or a class type
private val constructor: ArrayBuffer[Either[PrimitiveType[_], ClassType]] =
ArrayBuffer()
/**
* Sets the class name for the descriptor.
*/
def of(name: String): ClassType = {
this.className = Option(name)
this
}
/**
* Adds the given string formatted value as a parameter, the type of which will be automatically
* derived (e.g., "true" -> Boolean, "1" -> Integer, "2.0" -> Double and "abc" -> String).
*
*/
def strParam(valueStr: String): ClassType = {
val typeString = PrimitiveTypeValidator.deriveTypeStrFromValueStr(valueStr)
param(typeString, valueStr)
}
/**
* Adds the given string formatted value as a parameter, the type of which will be decided by the
* given type string (e.g., "DOUBLE", "VARCHAR").
*/
def param(typeStr: String, valueStr: String): ClassType = {
param(TypeStringUtils.readTypeInfo(typeStr), valueStr)
}
/**
* Adds the give string formatted value as a parameter, the type of which will be defined by the
* given type information.
*/
def param[T](typeInfo: TypeInformation[T], valueStr: String): ClassType = {
constructor += Left(
new PrimitiveType[T]()
.of(typeInfo)
.value(PrimitiveTypeValidator.deriveTypeAndValueStr(typeInfo, valueStr)))
this
}
/**
* Adds the give value as a parameter, the type of which will be automatically derived.
*/
def param[T](value: T): ClassType = {
constructor += Left(
new PrimitiveType[T]()
.of(TypeInformation.of(value.getClass.asInstanceOf[Class[T]]))
.value(value))
this
}
/**
* Adds a parameter defined by the given class type descriptor.
*/
def param(field: ClassType): ClassType = {
constructor += Right(field)
this
}
override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
addPropertiesWithPrefix("", properties)
}
override private[flink] def addPropertiesWithPrefix(
keyPrefix: String,
properties: DescriptorProperties): Unit = {
className.foreach(properties.putString(s"$keyPrefix${ClassTypeValidator.CLASS}", _))
var i = 0
while (i < constructor.size) {
constructor(i) match {
case Left(basicType) =>
basicType.addPropertiesWithPrefix(
s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.",
properties)
case Right(classType) =>
classType.addPropertiesWithPrefix(
s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.",
properties)
}
i += 1
}
}
}
object ClassType {
def apply() = new ClassType
def apply(className: String) = new ClassType(Option(className))
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors
import org.apache.flink.table.api.ValidationException
import scala.collection.JavaConversions._
/**
* Validator for [[ClassType]].
*/
class ClassTypeValidator extends HierarchyDescriptorValidator {
override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
properties.validateString(s"$keyPrefix${ClassTypeValidator.CLASS}", isOptional = false)
val constructorPrefix = s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}"
normalizeConstructorParams(constructorPrefix, properties)
val constructorProps =
properties.getVariableIndexedProperties(constructorPrefix, List())
var i = 0
val primitiveTypeValidator = new PrimitiveTypeValidator
while (i < constructorProps.size()) {
if (constructorProps(i).containsKey(PrimitiveTypeValidator.PRIMITIVE_TYPE)) {
primitiveTypeValidator.validateWithPrefix(s"$constructorPrefix.$i.", properties)
} else if (constructorProps(i).containsKey(ClassTypeValidator.CLASS)) {
validateWithPrefix(s"$constructorPrefix.$i.", properties)
} else {
throw ValidationException("A constructor field must contain a 'class' or a 'type' key.")
}
i += 1
}
}
/**
* For each constructor parameter (e.g., constructor.0 = abc), we derive its type and replace it
* with the normalized form (e.g., constructor.0.type = VARCHAR, constructor.0.value = abc);
*
* @param constructorPrefix the prefix to get the constructor parameters
* @param properties the descriptor properties
*/
def normalizeConstructorParams(
constructorPrefix: String,
properties: DescriptorProperties): Unit = {
val constructorValues = properties.getListProperties(constructorPrefix)
constructorValues.foreach(kv => {
properties.unsafeRemove(kv._1)
val tp = PrimitiveTypeValidator.deriveTypeStrFromValueStr(kv._2)
properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_TYPE}", tp)
properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_VALUE}", kv._2)
})
}
}
object ClassTypeValidator {
val CLASS = "class"
val CONSTRUCTOR = "constructor"
}
@@ -19,8 +19,12 @@
package org.apache.flink.table.descriptors
/**
* A class that adds a set of string-based, normalized properties for describing a
* table source or table sink.
* A class that adds a set of string-based, normalized properties for describing DDL information.
*
* Typical characteristics of a descriptor are:
* - descriptors have a default constructor and a default 'apply()' method for Scala
* - descriptors themselves contain very little logic
* - corresponding validators validate the correctness (goal: have a single point of validation)
*/
abstract class Descriptor {
@@ -249,7 +249,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
/**
* Returns a big decimal value under the given existing key.
*/
def getBigDecimal(key: String): BigDecimal = {
def getBigDecimal(key: String): JBigDecimal = {
getOptionalBigDecimal(key).orElseThrow(exceptionSupplier(key))
}
@@ -292,7 +292,7 @@
}
/**
* Returns a double value under the given key if it exists.
* Returns a double value under the given existing key.
*/
def getDouble(key: String): Double = {
getOptionalDouble(key).orElseThrow(exceptionSupplier(key))
@@ -307,7 +307,7 @@
}
/**
* Returns a float value under the given key if it exists.
* Returns a float value under the given existing key.
*/
def getFloat(key: String): Float = {
getOptionalFloat(key).orElseThrow(exceptionSupplier(key))
@@ -489,13 +489,13 @@
// filter for index
val escapedKey = Pattern.quote(key)
val pattern = Pattern.compile(s"$escapedKey\\.(\\d+)\\.(.*)")
val pattern = Pattern.compile(s"$escapedKey\\.(\\d+)(\\.)?(.*)")
// extract index and property keys
val indexes = properties.keys.flatMap { k =>
val matcher = pattern.matcher(k)
if (matcher.find()) {
Some((JInt.parseInt(matcher.group(1)), matcher.group(2)))
Some((JInt.parseInt(matcher.group(1)), matcher.group(3)))
} else {
None
}
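// With the relaxed pattern, bare indexed keys now match too (illustrative):
// "constructor.0.class" yields (0, "class"); a bare "constructor.0" yields (0, "").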
@@ -530,16 +530,6 @@
list
}
/**
* Returns all properties under a group of array formed keys.
*
* E.g. constructor -> returns all constructor.# properties.
*/
def getListProperties(key: String): JMap[String, String] = {
val escapedKey = Pattern.quote(key)
properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+")).asJava
}
/**
* Returns all properties under a given key that contains an index in between.
*
@@ -744,7 +734,8 @@
*/
def validateBigDecimal(
key: String,
isOptional: Boolean): Unit = {
isOptional: Boolean)
: Unit = {
if (!properties.contains(key)) {
if (!isOptional) {
@@ -767,14 +758,14 @@
def validateBigDecimal(
key: String,
isOptional: Boolean,
min: BigDecimal, // inclusive
max: BigDecimal) // inclusive
: Unit = {
min: JBigDecimal, // inclusive
max: JBigDecimal) // inclusive
: Unit = {
validateComparable(
key,
isOptional,
min.bigDecimal,
max.bigDecimal,
min,
max,
(value: String) => new JBigDecimal(value))
}
@@ -783,7 +774,7 @@
*/
def validateByte(
key: String,
isOptional: Boolean): Unit = validateDouble(key, isOptional, Byte.MinValue, Byte.MaxValue)
isOptional: Boolean): Unit = validateByte(key, isOptional, Byte.MinValue, Byte.MaxValue)
/**
* Validates a byte property. The boundaries are inclusive.
@@ -792,7 +783,7 @@
key: String,
isOptional: Boolean,
min: Byte) // inclusive
: Unit = {
: Unit = {
validateByte(key, isOptional, min, Byte.MaxValue)
}
@@ -804,7 +795,7 @@
isOptional: Boolean,
min: Byte, // inclusive
max: Byte) // inclusive
: Unit = {
: Unit = {
validateComparable(key, isOptional, new JByte(min), new JByte(max), JByte.valueOf)
}
@@ -822,7 +813,7 @@
key: String,
isOptional: Boolean,
min: Float) // inclusive
: Unit = {
: Unit = {
validateFloat(key, isOptional, min, Float.MaxValue)
}
@@ -834,7 +825,7 @@
isOptional: Boolean,
min: Float, // inclusive
max: Float) // inclusive
: Unit = {
: Unit = {
validateComparable(key, isOptional, new JFloat(min), new JFloat(max), JFloat.valueOf)
}
@@ -848,57 +839,26 @@
/**
* Validates a short property. The boundaries are inclusive.
*/
def validateFloat(
def validateShort(
key: String,
isOptional: Boolean,
min: Short) // inclusive
: Unit = {
: Unit = {
validateShort(key, isOptional, min, Short.MaxValue)
}
/**
* Validates a float property. The boundaries are inclusive.
* Validates a short property. The boundaries are inclusive.
*/
def validateShort(
key: String,
isOptional: Boolean,
min: Short, // inclusive
max: Short) // inclusive
: Unit = {
: Unit = {
validateComparable(key, isOptional, new JShort(min), new JShort(max), JShort.valueOf)
}
/**
* Validates a property by first parsing the string value to a comparable object.
* The boundaries are inclusive.
*/
private def validateComparable[T <: Comparable[T]](
key: String,
isOptional: Boolean,
min: T,
max: T,
parseFunction: String => T)
: Unit = {
if (!properties.contains(key)) {
if (!isOptional) {
throw new ValidationException(s"Could not find required property '$key'.")
}
} else {
try {
val value = parseFunction(properties(key))
if (value.compareTo(min) < 0 || value.compareTo(max) > 0) {
throw new ValidationException(s"Property '$key' must be a ${min.getClass.getSimpleName}" +
s" value between $min and $max but was: ${properties(key)}")
}
} catch {
case _: NumberFormatException =>
throw new ValidationException(
s"Property '$key' must be a byte value but was: ${properties(key)}")
}
}
}
/**
* Validation for variable indexed properties.
*
@@ -1211,6 +1171,38 @@
}
}
}
/**
* Validates a property by first parsing the string value to a comparable object.
* The boundaries are inclusive.
*/
private def validateComparable[T <: Comparable[T]](
key: String,
isOptional: Boolean,
min: T,
max: T,
parseFunction: String => T)
: Unit = {
if (!properties.contains(key)) {
if (!isOptional) {
throw new ValidationException(s"Could not find required property '$key'.")
}
} else {
val typeName = min.getClass.getSimpleName
try {
val value = parseFunction(properties(key))
if (value.compareTo(min) < 0 || value.compareTo(max) > 0) {
throw new ValidationException(s"Property '$key' must be a $typeName" +
s" value between $min and $max but was: ${properties(key)}")
}
} catch {
case _: NumberFormatException =>
throw new ValidationException(
s"Property '$key' must be a $typeName value but was: ${properties(key)}")
}
}
}
}
object DescriptorProperties {
@@ -19,34 +19,38 @@
package org.apache.flink.table.descriptors
/**
* Descriptor for describing a function that can be instantiated from somewhere (e.g., a class).
*
* @param name name of the function
* Descriptor for describing a function.
*/
class FunctionDescriptor(var name: String) extends Descriptor {
class FunctionDescriptor extends Descriptor {
var classDescriptor: Option[ClassType] = None
private var from: Option[String] = None
private var classInstance: Option[ClassInstance] = None
/**
* Uses the class provided by the descriptor to instantiate the function.
* Creates a function from a class description.
*/
def using(classDescriptor: ClassType): FunctionDescriptor = {
this.classDescriptor = Option(classDescriptor)
def fromClass(classType: ClassInstance): FunctionDescriptor = {
from = Some(FunctionDescriptorValidator.FROM_VALUE_CLASS)
this.classInstance = Option(classType)
this
}
def getDescriptorProperties: DescriptorProperties = {
val descriptorProperties = new DescriptorProperties()
addProperties(descriptorProperties)
descriptorProperties
}
/**
* Internal method for format properties conversion.
*/
override def addProperties(properties: DescriptorProperties): Unit = {
properties.putString(FunctionValidator.FUNCTION_NAME, name)
classDescriptor.foreach(_.addProperties(properties))
from.foreach(properties.putString(FunctionDescriptorValidator.FROM, _))
classInstance.foreach(_.addProperties(properties))
}
}
/**
* Descriptor for describing a function.
*/
object FunctionDescriptor {
def apply(name: String): FunctionDescriptor = new FunctionDescriptor(name)
/**
* Descriptor for describing a function.
*/
def apply(): FunctionDescriptor = new FunctionDescriptor()
}
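// A usage sketch (the class name is made up):
//
//   FunctionDescriptor()
//     .fromClass(
//       ClassInstance()
//         .of("org.example.MyScalarFunction")
//         .parameter(5))
//
// which translates into the properties 'from' = 'class',
// 'class' = 'org.example.MyScalarFunction', and
// 'constructor.0.type' = 'INT' / 'constructor.0.value' = '5'.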
@@ -18,25 +18,40 @@
package org.apache.flink.table.descriptors
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.descriptors.DescriptorProperties.toJava
import org.apache.flink.table.descriptors.FunctionDescriptorValidator.FROM
import scala.collection.JavaConverters._
/**
* Validator for [[FunctionDescriptor]].
*/
class FunctionValidator extends DescriptorValidator {
class FunctionDescriptorValidator extends DescriptorValidator {
override def validate(properties: DescriptorProperties): Unit = {
properties.validateString(FunctionValidator.FUNCTION_NAME, isOptional = false, 1)
new ClassTypeValidator().validate(properties)
val classValidation = (_: String) => {
new ClassInstanceValidator().validate(properties)
}
// check for 'from'
if (properties.containsKey(FROM)) {
properties.validateEnum(
FROM,
isOptional = false,
Map(
FunctionDescriptorValidator.FROM_VALUE_CLASS -> toJava(classValidation)
).asJava
)
} else {
throw new ValidationException("Could not find 'from' property for function.")
}
}
}
object FunctionValidator {
val FUNCTION_NAME = "name"
object FunctionDescriptorValidator {
val FROM = "from"
val FROM_VALUE_CLASS = "class"
}
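// For example (illustrative), the validator accepts
//
//   from = class
//   class = org.example.MyScalarFunction
//
// and fails with "Could not find 'from' property for function." when 'from' is absent.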
@@ -25,10 +25,11 @@ package org.apache.flink.table.descriptors
abstract class HierarchyDescriptor extends Descriptor {
/**
* Internal method for properties conversion. All the property keys will be prefixed according
* to the level.
* Internal method for properties conversion. All the property keys will be prefixed with the
* given key prefix.
*/
private[flink] def addPropertiesWithPrefix(
keyPrefix: String, properties: DescriptorProperties): Unit
keyPrefix: String,
properties: DescriptorProperties): Unit
}
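// For example (illustrative): with keyPrefix = "constructor.0.", a descriptor
// writing the property 'class' produces the key 'constructor.0.class'.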
@@ -20,16 +20,22 @@ package org.apache.flink.table.descriptors
/**
* Validator for a [[HierarchyDescriptor]].
*
* @param keyPrefix prefix to be added to every property before validation
*/
trait HierarchyDescriptorValidator extends DescriptorValidator{
abstract class HierarchyDescriptorValidator(keyPrefix: String) extends DescriptorValidator{
def validate(properties: DescriptorProperties): Unit = {
validateWithPrefix("", properties)
final def validate(properties: DescriptorProperties): Unit = {
validateWithPrefix(keyPrefix, properties)
}
/**
* Performs validation with a prefix for the keys.
*/
def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit
protected def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit
}
object HierarchyDescriptorValidator {
val EMPTY_PREFIX = ""
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.typeutils.TypeStringUtils
import org.apache.flink.util.Preconditions
/**
* Descriptor for a literal value. A literal value consists of a type and the actual value.
* Expression values are not allowed.
*
* If no type is set, the type is automatically derived from the value. Currently,
* this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
*
* Examples:
* - "true", "false" -> BOOLEAN
* - "42", "-5" -> INT
* - "2.0", "1234.222" -> DOUBLE
* - VARCHAR otherwise
*/
class LiteralValue extends HierarchyDescriptor {
var typeInfo: Option[String] = None
var value: Option[Any] = None
/**
* Type information of the literal value. E.g. Types.BOOLEAN.
*
* @param typeInfo type information describing the value
*/
def of(typeInfo: TypeInformation[_]): LiteralValue = {
Preconditions.checkNotNull(typeInfo, "Type information must not be null.")
this.typeInfo = Option(TypeStringUtils.writeTypeInfo(typeInfo))
this
}
/**
* Type string of the literal value. E.g. "BOOLEAN".
*
* @param typeString type string describing the value
*/
def of(typeString: String): LiteralValue = {
this.typeInfo = Option(typeString)
this
}
/**
* Literal BOOLEAN value.
*
* @param value literal BOOLEAN value
*/
def value(value: Boolean): LiteralValue = {
this.value = Option(value)
this
}
/**
* Literal INT value.
*
* @param value literal INT value
*/
def value(value: Int): LiteralValue = {
this.value = Option(value)
this
}
/**
* Literal DOUBLE value.
*
* @param value literal DOUBLE value
*/
def value(value: Double): LiteralValue = {
this.value = Option(value)
this
}
/**
* Literal FLOAT value.
*
* @param value literal FLOAT value
*/
def value(value: Float): LiteralValue = {
this.value = Option(value)
this
}
/**
* Literal value either for an explicit VARCHAR type or an automatically derived type.
*
* If no type is set, the type is automatically derived from the value. Currently,
* this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
*
* @param value literal value
*/
def value(value: String): LiteralValue = {
this.value = Option(value)
this
}
/**
* Literal BIGINT value.
*
* @param value literal BIGINT value
*/
def value(value: Long): LiteralValue = {
this.value = Option(value)
this
}
/**
* Literal TINYINT value.
*
* @param value literal TINYINT value
*/
def value(value: Byte): LiteralValue = {
this.value = Option(value)
this
}
/**
* Literal SMALLINT value.
*
* @param value literal SMALLINT value
*/
def value(value: Short): LiteralValue = {
this.value = Option(value)
this
}
/**
* Literal DECIMAL value.
*
* @param value literal DECIMAL value
*/
def value(value: java.math.BigDecimal): LiteralValue = {
this.value = Option(value)
this
}
/**
* Internal method for properties conversion.
*/
override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties)
}
/**
* Internal method for properties conversion.
*/
override private[flink] def addPropertiesWithPrefix(
keyPrefix: String,
properties: DescriptorProperties)
: Unit = {
typeInfo match {
// explicit type
case Some(ti) =>
properties.putString(keyPrefix + "type", ti)
value.foreach(v => properties.putString(keyPrefix + "value", String.valueOf(v)))
// implicit type
case None =>
// do not allow values in top-level
if (keyPrefix == HierarchyDescriptorValidator.EMPTY_PREFIX) {
throw new ValidationException(
"Literal values with implicit type must not exist in the top level of a hierarchy.")
}
value.foreach { v =>
properties.putString(keyPrefix.substring(0, keyPrefix.length - 1), String.valueOf(v))
}
}
}
}
/**
* Descriptor for a literal value. A literal value consists of a type and the actual value.
* Expression values are not allowed.
*
* If no type is set, the type is automatically derived from the value. Currently,
* this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
*
* Examples:
* - "true", "false" -> BOOLEAN
* - "42", "-5" -> INT
* - "2.0", "1234.222" -> DOUBLE
* - VARCHAR otherwise
*/
object LiteralValue {
/**
* Descriptor for a literal value. A literal value consists of a type and the actual value.
* Expression values are not allowed.
*
* If no type is set, the type is automatically derived from the value. Currently,
* this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
*
* Examples:
* - "true", "false" -> BOOLEAN
* - "42", "-5" -> INT
* - "2.0", "1234.222" -> DOUBLE
* - VARCHAR otherwise
*/
def apply() = new LiteralValue()
}
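A usage sketch (editor's addition, not part of this commit) of the two modes of LiteralValue; the emitted property keys mirror those in ClassInstanceTest further below:
// explicit type: writes a "type"/"value" key pair under the given prefix,
// e.g. "constructor.0.type" -> "INT" and "constructor.0.value" -> "1"
val explicitInt = LiteralValue().of(Types.INT).value(1)

// implicit type: writes the raw value directly under the prefix without the
// trailing dot, e.g. "constructor.1.constructor.0" -> "true"; this is why
// implicit literals are rejected at the top level of a hierarchy
val implicitBoolean = LiteralValue().value("true")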
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors
import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt}
import org.apache.flink.table.api.{TableException, Types, ValidationException}
/**
* Validator for [[LiteralValue]].
*/
class LiteralValueValidator(keyPrefix: String) extends HierarchyDescriptorValidator(keyPrefix) {
/*
* TODO The following types need to be supported next.
* Types.SQL_DATE
* Types.SQL_TIME
* Types.SQL_TIMESTAMP
* Types.PRIMITIVE_ARRAY
* Types.OBJECT_ARRAY
* Types.MAP
* Types.MULTISET
* null
*/
override protected def validateWithPrefix(
keyPrefix: String,
properties: DescriptorProperties)
: Unit = {
val typeKey = s"$keyPrefix${LiteralValueValidator.TYPE}"
properties.validateType(typeKey, isOptional = true)
// explicit type
if (properties.containsKey(typeKey)) {
val valueKey = s"$keyPrefix${LiteralValueValidator.VALUE}"
val typeInfo = properties.getType(typeKey)
typeInfo match {
case Types.DECIMAL => properties.validateBigDecimal(valueKey, isOptional = false)
case Types.BOOLEAN => properties.validateBoolean(valueKey, isOptional = false)
case Types.BYTE => properties.validateByte(valueKey, isOptional = false)
case Types.DOUBLE => properties.validateDouble(valueKey, isOptional = false)
case Types.FLOAT => properties.validateFloat(valueKey, isOptional = false)
case Types.INT => properties.validateInt(valueKey, isOptional = false)
case Types.LONG => properties.validateLong(valueKey, isOptional = false)
case Types.SHORT => properties.validateShort(valueKey, isOptional = false)
case Types.STRING => properties.validateString(valueKey, isOptional = false)
case _ => throw TableException(s"Unsupported type '$typeInfo'.")
}
}
// implicit type
else {
// do not allow values in top-level
if (keyPrefix == HierarchyDescriptorValidator.EMPTY_PREFIX) {
throw new ValidationException(
"Literal values with implicit type must not exist in the top level of a hierarchy.")
}
properties.validateString(keyPrefix.substring(0, keyPrefix.length - 1), isOptional = false)
}
}
}
object LiteralValueValidator {
val TYPE = "type"
val VALUE = "value"
private val LITERAL_FALSE = "false"
private val LITERAL_TRUE = "true"
/**
* Gets the value according to the type and value strings.
*
* @param keyPrefix the prefix of the literal type key
* @param properties the descriptor properties
* @return the derived value
*/
def getValue(keyPrefix: String, properties: DescriptorProperties): Any = {
val typeKey = s"$keyPrefix$TYPE"
// explicit type
if (properties.containsKey(typeKey)) {
val valueKey = s"$keyPrefix$VALUE"
val typeInfo = properties.getType(typeKey)
typeInfo match {
case Types.DECIMAL => properties.getBigDecimal(valueKey)
case Types.BOOLEAN => properties.getBoolean(valueKey)
case Types.BYTE => properties.getByte(valueKey)
case Types.DOUBLE => properties.getDouble(valueKey)
case Types.FLOAT => properties.getFloat(valueKey)
case Types.INT => properties.getInt(valueKey)
case Types.LONG => properties.getLong(valueKey)
case Types.SHORT => properties.getShort(valueKey)
case Types.STRING => properties.getString(valueKey)
case _ => throw TableException(s"Unsupported type '${typeInfo.getTypeClass}'.")
}
}
// implicit type
else {
deriveTypeStringFromValueString(
properties.getString(keyPrefix.substring(0, keyPrefix.length - 1)))
}
}
/**
* Tries to derive a literal value from the given string value.
* The derivation priority for the types is BOOLEAN, INT, DOUBLE, and VARCHAR.
*
* @param valueString the string-formatted value
* @return the parsed value
*/
def deriveTypeStringFromValueString(valueString: String): AnyRef = {
if (valueString.equals(LITERAL_TRUE) || valueString.equals(LITERAL_FALSE)) {
JBoolean.valueOf(valueString)
} else {
try {
JInt.valueOf(valueString)
} catch {
case _: NumberFormatException =>
try {
JDouble.valueOf(valueString)
} catch {
case _: NumberFormatException =>
valueString
}
}
}
}
}
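A sketch of the implicit derivation (editor's addition, not part of this commit); note that despite its name, deriveTypeStringFromValueString returns the parsed value, not a type string:
LiteralValueValidator.deriveTypeStringFromValueString("true")  // java.lang.Boolean: true
LiteralValueValidator.deriveTypeStringFromValueString("42")    // java.lang.Integer: 42
LiteralValueValidator.deriveTypeStringFromValueString("2.0")   // java.lang.Double: 2.0
LiteralValueValidator.deriveTypeStringFromValueString("text")  // String: "text"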
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors
import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort}
import java.math.{BigDecimal => JBigDecimal}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableException, Types}
import org.apache.flink.table.typeutils.TypeStringUtils
/**
* Validator for [[PrimitiveType]].
*/
class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
/*
* TODO The following types need to be supported next.
* Types.SQL_DATE
* Types.SQL_TIME
* Types.SQL_TIMESTAMP
* Types.INTERVAL_MONTHS
* Types.INTERVAL_MILLIS
* Types.PRIMITIVE_ARRAY
* Types.OBJECT_ARRAY
* Types.MAP
* Types.MULTISET
*/
override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
val typeKey = s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}"
val valueKey = s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}"
properties.validateType(typeKey, isOptional = false)
val typeInfo: TypeInformation[_] =
properties.getType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}")
typeInfo match {
case Types.DECIMAL => properties.validateBigDecimal(valueKey, isOptional = false)
case Types.BOOLEAN => properties.validateBoolean(valueKey, isOptional = false)
case Types.BYTE => properties.validateByte(valueKey, isOptional = false)
case Types.DOUBLE => properties.validateDouble(valueKey, isOptional = false)
case Types.FLOAT => properties.validateFloat(valueKey, isOptional = false)
case Types.INT => properties.validateInt(valueKey, isOptional = false)
case Types.LONG => properties.validateLong(valueKey, isOptional = false)
case Types.SHORT => properties.validateShort(valueKey, isOptional = false)
case Types.STRING => properties.validateString(valueKey, isOptional = false)
case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.")
}
}
}
object PrimitiveTypeValidator {
val PRIMITIVE_TYPE = "type"
val PRIMITIVE_VALUE = "value"
private val LITERAL_FALSE = "false"
private val LITERAL_TRUE = "true"
/**
* Derives the value according to the type and value strings.
*
* @param keyPrefix the prefix of the primitive type key
* @param properties the descriptor properties
* @return the derived value
*/
def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = {
val typeInfo =
properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
val value = typeInfo match {
case Types.DECIMAL => properties.getBigDecimal(valueKey)
case Types.BOOLEAN => properties.getBoolean(valueKey)
case Types.BYTE => properties.getByte(valueKey)
case Types.DOUBLE => properties.getDouble(valueKey)
case Types.FLOAT => properties.getFloat(valueKey)
case Types.INT => properties.getInt(valueKey)
case Types.LONG => properties.getLong(valueKey)
case Types.SHORT => properties.getShort(valueKey)
case Types.STRING => properties.getString(valueKey)
case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.")
}
value
}
/**
* Derives the actual value from the type information and the string-formatted value.
*/
def deriveTypeAndValueStr[T](typeInfo: TypeInformation[T], valueStr: String): T = {
typeInfo match {
case Types.DECIMAL => new JBigDecimal(valueStr).asInstanceOf[T]
case Types.BOOLEAN => JBoolean.parseBoolean(valueStr).asInstanceOf[T]
case Types.BYTE => JByte.parseByte(valueStr).asInstanceOf[T]
case Types.DOUBLE => JDouble.parseDouble(valueStr).asInstanceOf[T]
case Types.FLOAT => JFloat.parseFloat(valueStr).asInstanceOf[T]
case Types.INT => JInt.parseInt(valueStr).asInstanceOf[T]
case Types.LONG => JLong.parseLong(valueStr).asInstanceOf[T]
case Types.SHORT => JShort.parseShort(valueStr).asInstanceOf[T]
case Types.STRING => valueStr.asInstanceOf[T]
case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.")
}
}
/**
* Tries to derive the type string from the given string value.
* The derivation priority for the types is BOOLEAN, INT, DOUBLE, and VARCHAR.
*
* @param valueStr the string-formatted value
* @return the type string of the given value
*/
def deriveTypeStrFromValueStr(valueStr: String): String = {
if (valueStr.equals(LITERAL_TRUE) || valueStr.equals(LITERAL_FALSE)) {
TypeStringUtils.BOOLEAN.key
} else {
try {
valueStr.toInt
TypeStringUtils.INT.key
} catch {
case _: NumberFormatException =>
try {
valueStr.toDouble
TypeStringUtils.DOUBLE.key
} catch {
case _: NumberFormatException =>
TypeStringUtils.STRING.key
}
}
}
}
}
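For contrast (editor's addition, not part of this commit), the removed validator really does return type strings; the exact keyword strings come from TypeStringUtils:
PrimitiveTypeValidator.deriveTypeStrFromValueStr("false") // BOOLEAN type string
PrimitiveTypeValidator.deriveTypeStrFromValueStr("7")     // INT type string
PrimitiveTypeValidator.deriveTypeStrFromValueStr("7.5")   // DOUBLE type string
PrimitiveTypeValidator.deriveTypeStrFromValueStr("text")  // VARCHAR type string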
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors.service
import org.apache.flink.table.api.TableException
import org.apache.flink.table.descriptors.{ClassTypeValidator, DescriptorProperties, FunctionDescriptor, FunctionValidator, PrimitiveTypeValidator}
import org.apache.flink.table.functions.UserDefinedFunction
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
/**
* Utils that serve [[FunctionDescriptor]].
*/
object FunctionService {
/**
* Generates a user-defined function with the given properties.
*
* @param properties the descriptor properties that belong to a [[FunctionDescriptor]]
* @param classLoader the class loader to load the function and its parameter's classes
* @return the generated user-defined function
*/
def generateUserDefinedFunction(
properties: DescriptorProperties,
classLoader: ClassLoader): UserDefinedFunction = {
new FunctionValidator().validate(properties)
generateInstance[UserDefinedFunction]("", properties, classLoader)
}
/**
* Recursively generates an instance of a class according to the given properties.
*
* @param keyPrefix the prefix used to fetch properties
* @param descriptorProps the descriptor properties that contain the class type information
* @param classLoader the class loader to load the class
* @tparam T type of the generated instance
* @return an instance of the class
*/
def generateInstance[T](
keyPrefix: String,
descriptorProps: DescriptorProperties,
classLoader: ClassLoader): T = {
val constructorPrefix = s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}"
val constructorProps =
descriptorProps.getVariableIndexedProperties(constructorPrefix, List())
var i = 0
val typeValueList: ArrayBuffer[(Class[_], Any)] = new ArrayBuffer
while (i < constructorProps.size()) {
if (constructorProps(i).containsKey(PrimitiveTypeValidator.PRIMITIVE_TYPE)) {
val primitiveVal = PrimitiveTypeValidator
.derivePrimitiveValue(s"$constructorPrefix.$i.", descriptorProps)
typeValueList += ((primitiveVal.getClass, primitiveVal))
} else if (constructorProps(i).containsKey(ClassTypeValidator.CLASS)) {
val typeValuePair = (
Class.forName(
descriptorProps.getString(constructorProps(i).get(ClassTypeValidator.CLASS))),
generateInstance(s"$constructorPrefix.$i.", descriptorProps, classLoader))
typeValueList += typeValuePair
}
i += 1
}
val clazz = classLoader
.loadClass(descriptorProps.getString(s"$keyPrefix${ClassTypeValidator.CLASS}"))
val constructor = clazz.getConstructor(typeValueList.map(_._1): _*)
if (null == constructor) {
throw TableException(s"Cannot find a constructor with parameter types " +
s"${typeValueList.map(_._1)} for ${clazz.getName}")
}
constructor.newInstance(typeValueList.map(_._2.asInstanceOf[AnyRef]): _*).asInstanceOf[T]
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.functions
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.descriptors._
import org.apache.flink.table.util.Logging
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
/**
* Service for creating configured instances of [[UserDefinedFunction]] using a
* [[FunctionDescriptor]].
*/
object FunctionService extends Logging {
/**
* Creates a user-defined function with the given properties and the current thread's
* context class loader.
*
* @param descriptor the descriptor that describes a function
* @return the generated user-defined function
*/
def createFunction(descriptor: FunctionDescriptor): UserDefinedFunction = {
createFunction(descriptor, Thread.currentThread().getContextClassLoader)
}
/**
* Creates a user-defined function with the given properties.
*
* @param descriptor the descriptor that describes a function
* @param classLoader the class loader to load the function and its parameter's classes
* @return the generated user-defined function
*/
def createFunction(
descriptor: FunctionDescriptor,
classLoader: ClassLoader)
: UserDefinedFunction = {
val descriptorProperties = new DescriptorProperties(true)
descriptor.addProperties(descriptorProperties)
// validate
new FunctionDescriptorValidator().validate(descriptorProperties)
// instantiate
val (instanceClass, instance) = generateInstance[AnyRef](
HierarchyDescriptorValidator.EMPTY_PREFIX,
descriptorProperties,
classLoader)
if (!classOf[UserDefinedFunction].isAssignableFrom(instanceClass)) {
throw new ValidationException(
s"Instantiated class '${instanceClass.getName}' is not a user-defined function.")
}
instance.asInstanceOf[UserDefinedFunction]
}
/**
* Recursively generates an instance of a class according to the given properties.
*
* @param keyPrefix the prefix used to fetch properties
* @param descriptorProperties the descriptor properties that contain the class type information
* @param classLoader the class loader to load the class
* @tparam T type of the generated instance
* @return an instance of the class
*/
private def generateInstance[T](
keyPrefix: String,
descriptorProperties: DescriptorProperties,
classLoader: ClassLoader)
: (Class[T], T) = {
val instanceClassName = descriptorProperties.getString(
s"$keyPrefix${ClassInstanceValidator.CLASS}")
val instanceClass = try {
Class
.forName(
descriptorProperties.getString(s"$keyPrefix${ClassInstanceValidator.CLASS}"),
true,
classLoader)
.asInstanceOf[Class[T]]
} catch {
case e: Exception =>
// only log the cause to have clean error messages
val msg = s"Could not find class '$instanceClassName' for creating an instance."
LOG.error(msg, e)
throw new ValidationException(msg)
}
val constructorPrefix = s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}"
val constructorProps = descriptorProperties
.getVariableIndexedProperties(constructorPrefix, List())
var i = 0
val parameterList: ArrayBuffer[(Class[_], Any)] = new ArrayBuffer
while (i < constructorProps.size()) {
// nested class instance
if (constructorProps(i).containsKey(ClassInstanceValidator.CLASS)) {
parameterList += generateInstance(
s"$constructorPrefix.$i.",
descriptorProperties,
classLoader)
}
// literal value
else {
val literalValue = LiteralValueValidator
.getValue(s"$constructorPrefix.$i.", descriptorProperties)
parameterList += ((literalValue.getClass, literalValue))
}
i += 1
}
val constructor = try {
instanceClass.getConstructor(parameterList.map(_._1): _*)
} catch {
case e: Exception =>
// only log the cause to have clean error messages
val msg = s"Cannot find a public constructor with parameter types " +
s"'${parameterList.map(_._1.getName).mkString(", ")}' for '$instanceClassName'."
LOG.error(msg, e)
throw new ValidationException(msg)
}
val instance = try {
constructor.newInstance(parameterList.map(_._2.asInstanceOf[AnyRef]): _*)
} catch {
case e: Exception =>
// only log the cause to have clean error messages
val msg = s"Error while creating instance of class '$instanceClassName' " +
s"with parameter types '${parameterList.map(_._1.getName).mkString(", ")}'."
LOG.error(msg, e)
throw new ValidationException(msg)
}
(instanceClass, instance)
}
}
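A usage sketch (editor's addition, not part of this commit) mirroring testOneArgFunctionCreation in the tests further below; "org.example.MyFunction" is a hypothetical ScalarFunction with a single java.lang.Boolean constructor parameter:
// builds the descriptor and instantiates the function via reflection
val descriptor = FunctionDescriptor()
  .fromClass(
    ClassInstance()
      .of("org.example.MyFunction")
      .parameterString("false"))

val function: UserDefinedFunction = FunctionService.createFunction(descriptor)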
......@@ -18,40 +18,52 @@
package org.apache.flink.table.descriptors
import java.util.{List => JList, Map => JMap, Arrays => JArrays}
import java.util.{Arrays => JArrays, List => JList, Map => JMap}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.{Types, ValidationException}
import org.junit.Test
import scala.collection.JavaConverters._
class ClassTypeTest extends DescriptorTestBase {
class ClassInstanceTest extends DescriptorTestBase {
@Test(expected = classOf[ValidationException])
def testMissingClass(): Unit = {
removePropertyAndVerify(descriptors().get(0), ClassTypeValidator.CLASS)
removePropertyAndVerify(descriptors().get(0), ClassInstanceValidator.CLASS)
}
override def descriptors(): JList[Descriptor] = {
val desc1 = ClassType("class1")
.param(BasicTypeInfo.LONG_TYPE_INFO, "1")
.param(
ClassType("class2")
.param(
ClassType("class3")
.param("StarryNight")
.param(
ClassType("class4"))))
.param(2L)
val desc1 = ClassInstance()
.of("class1")
.parameter(Types.LONG, "1")
.parameter(
ClassInstance()
.of("class2")
.parameter(
ClassInstance()
.of("class3")
.parameterString("StarryNight")
.parameter(
ClassInstance()
.of("class4"))))
.parameter(2L)
val desc2 = ClassType().of("class2")
val desc2 = ClassInstance()
.of("class2")
JArrays.asList(desc1, desc2)
val desc3 = ClassInstance()
.of("org.example.Function")
.parameter(42)
.parameter(2.asInstanceOf[Byte])
.parameter(new java.math.BigDecimal("23.22"))
.parameter(222.2222)
.parameter(222.2222f)
JArrays.asList(desc1, desc2, desc3)
}
override def validator(): DescriptorValidator = {
new ClassTypeValidator()
new ClassInstanceValidator()
}
override def properties(): JList[JMap[String, String]] = {
......@@ -61,8 +73,7 @@ class ClassTypeTest extends DescriptorTestBase {
"constructor.0.value" -> "1",
"constructor.1.class" -> "class2",
"constructor.1.constructor.0.class" -> "class3",
"constructor.1.constructor.0.constructor.0.type" -> "VARCHAR",
"constructor.1.constructor.0.constructor.0.value" -> "StarryNight",
"constructor.1.constructor.0.constructor.0" -> "StarryNight",
"constructor.1.constructor.0.constructor.1.class" -> "class4",
"constructor.2.type" -> "BIGINT",
"constructor.2.value" -> "2"
......@@ -72,6 +83,20 @@ class ClassTypeTest extends DescriptorTestBase {
"class" -> "class2"
)
JArrays.asList(props1.asJava, props2.asJava)
val props3 = Map(
"class" -> "org.example.Function",
"constructor.0.type" -> "INT",
"constructor.0.value" -> "42",
"constructor.1.type" -> "TINYINT",
"constructor.1.value" -> "2",
"constructor.2.type" -> "DECIMAL",
"constructor.2.value" -> "23.22",
"constructor.3.type" -> "DOUBLE",
"constructor.3.value" -> "222.2222",
"constructor.4.type" -> "FLOAT",
"constructor.4.value" -> "222.2222"
)
JArrays.asList(props1.asJava, props2.asJava, props3.asJava)
}
}
......@@ -27,6 +27,9 @@ import org.junit.Test
import scala.collection.JavaConverters._
/**
* Tests for [[Csv]].
*/
class CsvTest extends DescriptorTestBase {
@Test(expected = classOf[ValidationException])
......
......@@ -18,47 +18,41 @@
package org.apache.flink.table.descriptors
import java.util.{List => JList, Map => JMap, Arrays => JArrays}
import org.apache.flink.table.api.ValidationException
import org.junit.Test
import java.util.{Arrays => JArrays, List => JList, Map => JMap}
import scala.collection.JavaConverters._
class FunctionTest extends DescriptorTestBase {
@Test(expected = classOf[ValidationException])
def testMissingName(): Unit = {
removePropertyAndVerify(descriptors().get(0), "name")
}
/**
* Tests for [[FunctionDescriptor]].
*/
class FunctionDescriptorTest extends DescriptorTestBase {
override def descriptors(): JList[Descriptor] = {
val desc1 = FunctionDescriptor("func1")
.using(
ClassType("another.class")
val desc1 = FunctionDescriptor()
.fromClass(
ClassInstance()
.of("my.class")
.param("INT", "1")
.param(
ClassType()
.parameter("INT", "1")
.parameter(
ClassInstance()
.of("my.class2")
.strParam("true")))
.parameterString("true")))
JArrays.asList(desc1)
}
override def validator(): DescriptorValidator = {
new FunctionValidator()
new FunctionDescriptorValidator()
}
override def properties(): JList[JMap[String, String]] = {
val props1 = Map(
"name" -> "func1",
"from" -> "class",
"class" -> "my.class",
"constructor.0.type" -> "INT",
"constructor.0.value" -> "1",
"constructor.1.class" -> "my.class2",
"constructor.1.constructor.0.type" -> "BOOLEAN",
"constructor.1.constructor.0.value" -> "true"
"constructor.1.constructor.0" -> "true"
)
JArrays.asList(props1.asJava)
}
......
......@@ -26,28 +26,35 @@ import org.junit.Test
import scala.collection.JavaConverters._
class PrimitiveTypeTest extends DescriptorTestBase {
/**
* Tests for [[LiteralValue]].
*/
class LiteralValueTest extends DescriptorTestBase {
@Test(expected = classOf[ValidationException])
def testMissingType(): Unit = {
removePropertyAndVerify(descriptors().get(0), PrimitiveTypeValidator.PRIMITIVE_TYPE)
def testMissingValue(): Unit = {
removePropertyAndVerify(descriptors().get(0), LiteralValueValidator.VALUE)
}
@Test(expected = classOf[ValidationException])
def testMissingValue(): Unit = {
removePropertyAndVerify(descriptors().get(0), PrimitiveTypeValidator.PRIMITIVE_VALUE)
def testWrongValue(): Unit = {
// byte expected
addPropertyAndVerify(descriptors().get(2), LiteralValueValidator.VALUE, "12.222")
}
override def descriptors(): JList[Descriptor] = {
val bigDecimalDesc = PrimitiveType().of(Types.DECIMAL).value(new JBigDecimal(1))
val booleanDesc = PrimitiveType().of(Types.BOOLEAN).value(false)
val byteDesc = PrimitiveType().of(Types.BYTE).value(4.asInstanceOf[Byte])
val doubleDesc = PrimitiveType().of(Types.DOUBLE).value(7.0)
val floatDesc = PrimitiveType().of(Types.FLOAT).value(8f)
val intDesc = PrimitiveType().of(Types.INT).value(9)
val longDesc = PrimitiveType().of(Types.LONG).value(10L)
val shortDesc = PrimitiveType().of(Types.SHORT).value(11.asInstanceOf[Short])
val stringDesc = PrimitiveType().of(Types.STRING).value("12")
val bigDecimalDesc = LiteralValue().of(Types.DECIMAL).value(new JBigDecimal(1))
val booleanDesc = LiteralValue().of(Types.BOOLEAN).value(false)
val byteDesc = LiteralValue().of(Types.BYTE).value(4.asInstanceOf[Byte])
val doubleDesc = LiteralValue().of(Types.DOUBLE).value(7.0)
val floatDesc = LiteralValue().of(Types.FLOAT).value(8.0f)
val intDesc = LiteralValue().of(Types.INT).value(9)
val longDesc = LiteralValue().of(Types.LONG).value(10L)
val shortDesc = LiteralValue().of(Types.SHORT).value(11.asInstanceOf[Short])
val stringDesc = LiteralValue().of(Types.STRING).value("12")
// for tests with implicit types see ClassInstanceTest because literal values are not
// supported in the top level of a hierarchy
JArrays.asList(
bigDecimalDesc,
......@@ -62,7 +69,7 @@ class PrimitiveTypeTest extends DescriptorTestBase {
}
override def validator(): DescriptorValidator = {
new PrimitiveTypeValidator()
new LiteralValueValidator(HierarchyDescriptorValidator.EMPTY_PREFIX)
}
override def properties(): JList[JMap[String, String]] = {
......@@ -112,6 +119,7 @@ class PrimitiveTypeTest extends DescriptorTestBase {
intProps.asJava,
longDesc.asJava,
shortDesc.asJava,
stringDesc.asJava)
stringDesc.asJava
)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.functions
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.descriptors.{ClassInstance, FunctionDescriptor}
import org.apache.flink.table.functions.FunctionServiceTest.{MultiArgClass, NoArgClass, OneArgClass, PrivateClass}
import org.junit.Assert.{assertEquals, assertFalse}
import org.junit.Test
/**
* Tests for [[FunctionService]].
*/
class FunctionServiceTest {
@Test(expected = classOf[ValidationException])
def testWrongArgsFunctionCreation(): Unit = {
val descriptor = FunctionDescriptor()
.fromClass(ClassInstance()
.of(classOf[NoArgClass].getName)
.parameterString("12"))
FunctionService.createFunction(descriptor)
}
@Test(expected = classOf[ValidationException])
def testPrivateFunctionCreation(): Unit = {
val descriptor = FunctionDescriptor()
.fromClass(ClassInstance().of(classOf[PrivateClass].getName))
FunctionService.createFunction(descriptor)
}
@Test(expected = classOf[ValidationException])
def testInvalidClassFunctionCreation(): Unit = {
val descriptor = FunctionDescriptor()
.fromClass(ClassInstance().of("this.class.does.not.exist"))
FunctionService.createFunction(descriptor)
}
@Test(expected = classOf[ValidationException])
def testNotFunctionClassFunctionCreation(): Unit = {
val descriptor = FunctionDescriptor()
.fromClass(ClassInstance()
.of(classOf[java.lang.String].getName)
.parameterString("hello"))
FunctionService.createFunction(descriptor)
}
@Test
def testNoArgFunctionCreation(): Unit = {
val descriptor = FunctionDescriptor()
.fromClass(ClassInstance().of(classOf[NoArgClass].getName))
assertEquals(classOf[NoArgClass], FunctionService.createFunction(descriptor).getClass)
}
@Test
def testOneArgFunctionCreation(): Unit = {
val descriptor = FunctionDescriptor()
.fromClass(
ClassInstance()
.of(classOf[OneArgClass].getName)
.parameterString("false"))
val actualFunction = FunctionService.createFunction(descriptor)
assertEquals(classOf[OneArgClass], actualFunction.getClass)
assertFalse(actualFunction.asInstanceOf[OneArgClass].field)
}
@Test
def testMultiArgFunctionCreation(): Unit = {
val descriptor = FunctionDescriptor()
.fromClass(
ClassInstance()
.of(classOf[MultiArgClass].getName)
.parameter(new java.math.BigDecimal("12.0003"))
.parameter(ClassInstance()
.of(classOf[java.math.BigInteger].getName)
.parameter("111111111111111111111111111111111")))
val actualFunction = FunctionService.createFunction(descriptor)
assertEquals(classOf[MultiArgClass], actualFunction.getClass)
assertEquals(
new java.math.BigDecimal("12.0003"),
actualFunction.asInstanceOf[MultiArgClass].field1)
assertEquals(
new java.math.BigInteger("111111111111111111111111111111111"),
actualFunction.asInstanceOf[MultiArgClass].field2)
}
}
object FunctionServiceTest {
class NoArgClass
extends ScalarFunction
class OneArgClass(val field: java.lang.Boolean)
extends ScalarFunction
class MultiArgClass(val field1: java.math.BigDecimal, val field2: java.math.BigInteger)
extends ScalarFunction
class PrivateClass private() extends ScalarFunction
}