提交 695bc56a 编写于 作者: T Timo Walther

[FLINK-8858] [sql-client] Add support for INSERT INTO in SQL Client

This closes #6332.
上级 a7be2e18
......@@ -177,16 +177,16 @@ under the License.
<version>2.4</version>
<executions>
<execution>
<id>create-table-source-factory-jar</id>
<id>create-table-factories-jar</id>
<phase>process-test-classes</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>table-source-factory</finalName>
<finalName>table-factories</finalName>
<attach>false</attach>
<descriptors>
<descriptor>src/test/assembly/test-table-source-factory.xml</descriptor>
<descriptor>src/test/assembly/test-table-factories.xml</descriptor>
</descriptors>
</configuration>
</execution>
......
......@@ -97,14 +97,34 @@ public class SqlClient {
// add shutdown hook
Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(context, executor));
// start CLI
final CliClient cli = new CliClient(context, executor);
cli.open();
// do the actual work
openCli(context, executor);
} else {
throw new SqlClientException("Gateway mode is not supported yet.");
}
}
/**
* Opens the CLI client for executing SQL statements.
*
* @param context session context
* @param executor executor
*/
private void openCli(SessionContext context, Executor executor) {
final CliClient cli = new CliClient(context, executor);
// interactive CLI mode
if (options.getUpdateStatement() == null) {
cli.open();
}
// execute single update statement
else {
final boolean success = cli.submitUpdate(options.getUpdateStatement());
if (!success) {
throw new SqlClientException("Could not submit given SQL update statement to cluster.");
}
}
}
// --------------------------------------------------------------------------------------------
private static void shutdown(SessionContext context, Executor executor) {
......
......@@ -20,9 +20,9 @@ package org.apache.flink.table.client.cli;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommand;
import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommandCall;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
......@@ -50,6 +50,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* SQL CLI client.
......@@ -85,6 +86,9 @@ public class CliClient {
terminal = TerminalBuilder.builder()
.name(CliStrings.CLI_NAME)
.build();
// make space from previous output and test the writer
terminal.writer().println();
terminal.writer().flush();
} catch (IOException e) {
throw new SqlClientException("Error opening command line interface.", e);
}
......@@ -149,6 +153,9 @@ public class CliClient {
return executor;
}
/**
* Opens the interactive CLI shell.
*/
public void open() {
isRunning = true;
......@@ -173,55 +180,91 @@ public class CliClient {
if (line == null || line.equals("")) {
continue;
}
final Optional<SqlCommandCall> cmdCall = parseCommand(line);
cmdCall.ifPresent(this::callCommand);
}
}
final SqlCommandCall cmdCall = SqlCommandParser.parse(line);
if (cmdCall == null) {
terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL));
continue;
}
/**
* Submits a SQL update statement and prints status information and/or errors on the terminal.
*
* @param statement SQL update statement
* @return flag to indicate if the submission was successful or not
*/
public boolean submitUpdate(String statement) {
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
terminal.writer().println(new AttributedString(statement).toString());
terminal.flush();
final Optional<SqlCommandCall> parsedStatement = parseCommand(statement);
// only support INSERT INTO
return parsedStatement.map(cmdCall -> {
switch (cmdCall.command) {
case QUIT:
case EXIT:
callQuit(cmdCall);
break;
case CLEAR:
callClear(cmdCall);
break;
case RESET:
callReset(cmdCall);
break;
case SET:
callSet(cmdCall);
break;
case HELP:
callHelp(cmdCall);
break;
case SHOW_TABLES:
callShowTables(cmdCall);
break;
case SHOW_FUNCTIONS:
callShowFunctions(cmdCall);
break;
case DESCRIBE:
callDescribe(cmdCall);
break;
case EXPLAIN:
callExplain(cmdCall);
break;
case SELECT:
callSelect(cmdCall);
break;
case SOURCE:
callSource(cmdCall);
break;
case INSERT_INTO:
return callInsertInto(cmdCall);
default:
terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNSUPPORTED_SQL).toAnsi());
terminal.flush();
return false;
}
}
}).orElse(false);
}
// --------------------------------------------------------------------------------------------
private Optional<SqlCommandCall> parseCommand(String line) {
final Optional<SqlCommandCall> parsedLine = SqlCommandParser.parse(line);
if (!parsedLine.isPresent()) {
terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL).toAnsi());
terminal.flush();
}
return parsedLine;
}
private void callCommand(SqlCommandCall cmdCall) {
switch (cmdCall.command) {
case QUIT:
case EXIT:
callQuit(cmdCall);
break;
case CLEAR:
callClear(cmdCall);
break;
case RESET:
callReset(cmdCall);
break;
case SET:
callSet(cmdCall);
break;
case HELP:
callHelp(cmdCall);
break;
case SHOW_TABLES:
callShowTables(cmdCall);
break;
case SHOW_FUNCTIONS:
callShowFunctions(cmdCall);
break;
case DESCRIBE:
callDescribe(cmdCall);
break;
case EXPLAIN:
callExplain(cmdCall);
break;
case SELECT:
callSelect(cmdCall);
break;
case INSERT_INTO:
callInsertInto(cmdCall);
break;
case SOURCE:
callSource(cmdCall);
break;
default:
throw new SqlClientException("Unsupported command: " + cmdCall.command);
}
}
private void callQuit(SqlCommandCall cmdCall) {
terminal.writer().println(CliStrings.MESSAGE_QUIT);
terminal.flush();
......@@ -354,6 +397,22 @@ public class CliClient {
}
}
private boolean callInsertInto(SqlCommandCall cmdCall) {
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
terminal.flush();
try {
final ProgramTargetDescriptor programTarget = executor.executeUpdate(context, cmdCall.operands[0]);
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
terminal.writer().println(programTarget.toString());
terminal.flush();
} catch (SqlExecutionException e) {
printException(e);
return false;
}
return true;
}
private void callSource(SqlCommandCall cmdCall) {
final String pathString = cmdCall.operands[0];
......@@ -384,7 +443,8 @@ public class CliClient {
terminal.flush();
// try to run it
callSelect(new SqlCommandCall(SqlCommand.SELECT, new String[] { stmt }));
final Optional<SqlCommandCall> call = parseCommand(stmt);
call.ifPresent(this::callCommand);
}
// --------------------------------------------------------------------------------------------
......
......@@ -33,14 +33,23 @@ public class CliOptions {
private final URL defaults;
private final List<URL> jars;
private final List<URL> libraryDirs;
private final String updateStatement;
public CliOptions(boolean isPrintHelp, String sessionId, URL environment, URL defaults, List<URL> jars, List<URL> libraryDirs) {
public CliOptions(
boolean isPrintHelp,
String sessionId,
URL environment,
URL defaults,
List<URL> jars,
List<URL> libraryDirs,
String updateStatement) {
this.isPrintHelp = isPrintHelp;
this.sessionId = sessionId;
this.environment = environment;
this.defaults = defaults;
this.jars = jars;
this.libraryDirs = libraryDirs;
this.updateStatement = updateStatement;
}
public boolean isPrintHelp() {
......@@ -66,4 +75,8 @@ public class CliOptions {
public List<URL> getLibraryDirs() {
return libraryDirs;
}
public String getUpdateStatement() {
return updateStatement;
}
}
......@@ -103,6 +103,20 @@ public class CliOptionsParser {
"functions, table sources, or sinks. Can be used multiple times.")
.build();
public static final Option OPTION_UPDATE = Option
.builder("u")
.required(false)
.longOpt("update")
.numberOfArgs(1)
.argName("SQL update statement")
.desc(
"Experimental (for testing only!): Instructs the SQL Client to immediately execute " +
"the given update statement after starting up. The process is shut down after the " +
"statement has been submitted to the cluster and returns an appropriate return code. " +
"Currently, this feature is only supported for INSERT INTO statements that declare " +
"the target sink table.")
.build();
private static final Options EMBEDDED_MODE_CLIENT_OPTIONS = getEmbeddedModeClientOptions(new Options());
private static final Options GATEWAY_MODE_CLIENT_OPTIONS = getGatewayModeClientOptions(new Options());
private static final Options GATEWAY_MODE_GATEWAY_OPTIONS = getGatewayModeGatewayOptions(new Options());
......@@ -118,6 +132,7 @@ public class CliOptionsParser {
options.addOption(OPTION_DEFAULTS);
options.addOption(OPTION_JAR);
options.addOption(OPTION_LIBRARY);
options.addOption(OPTION_UPDATE);
return options;
}
......@@ -125,6 +140,7 @@ public class CliOptionsParser {
buildGeneralOptions(options);
options.addOption(OPTION_SESSION);
options.addOption(OPTION_ENVIRONMENT);
options.addOption(OPTION_UPDATE);
return options;
}
......@@ -218,7 +234,8 @@ public class CliOptionsParser {
checkUrl(line, CliOptionsParser.OPTION_ENVIRONMENT),
checkUrl(line, CliOptionsParser.OPTION_DEFAULTS),
checkUrls(line, CliOptionsParser.OPTION_JAR),
checkUrls(line, CliOptionsParser.OPTION_LIBRARY)
checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt())
);
}
catch (ParseException e) {
......@@ -236,7 +253,8 @@ public class CliOptionsParser {
checkUrl(line, CliOptionsParser.OPTION_ENVIRONMENT),
null,
checkUrls(line, CliOptionsParser.OPTION_JAR),
checkUrls(line, CliOptionsParser.OPTION_LIBRARY)
checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt())
);
}
catch (ParseException e) {
......@@ -254,7 +272,8 @@ public class CliOptionsParser {
null,
checkUrl(line, CliOptionsParser.OPTION_DEFAULTS),
checkUrls(line, CliOptionsParser.OPTION_JAR),
checkUrls(line, CliOptionsParser.OPTION_LIBRARY)
checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
null
);
}
catch (ParseException e) {
......
......@@ -49,6 +49,7 @@ public final class CliStrings {
.append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
.append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
.append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
.append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
.append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>'. Use 'SET' for listing all properties."))
.append(formatCommand(SqlCommand.RESET, "Resets all session configuration properties."))
......@@ -122,12 +123,18 @@ public final class CliStrings {
public static final String MESSAGE_RESULT_QUIT = "Result retrieval cancelled.";
public static final String MESSAGE_SUBMITTING_STATEMENT = "Submitting SQL update statement to the cluster...";
public static final String MESSAGE_STATEMENT_SUBMITTED = "Table update statement has been successfully submitted to the cluster:";
public static final String MESSAGE_INVALID_PATH = "Path is invalid.";
public static final String MESSAGE_MAX_SIZE_EXCEEDED = "The given file exceeds the maximum number of characters.";
public static final String MESSAGE_WILL_EXECUTE = "Executing the following statement:";
public static final String MESSAGE_UNSUPPORTED_SQL = "Unsupported SQL statement.";
// --------------------------------------------------------------------------------------------
public static final String RESULT_TITLE = "SQL Query Result";
......
......@@ -18,6 +18,8 @@
package org.apache.flink.table.client.cli;
import java.util.Optional;
/**
* Simple parser for determining the type of command and its parameters.
*/
......@@ -27,7 +29,7 @@ public final class SqlCommandParser {
// private
}
public static SqlCommandCall parse(String stmt) {
public static Optional<SqlCommandCall> parse(String stmt) {
String trimmed = stmt.trim();
// remove ';' at the end because many people type it intuitively
if (trimmed.endsWith(";")) {
......@@ -43,10 +45,11 @@ public final class SqlCommandParser {
// match
if (tokenCount < cmd.tokens.length && token.equalsIgnoreCase(cmd.tokens[tokenCount])) {
if (tokenCount == cmd.tokens.length - 1) {
return new SqlCommandCall(
final SqlCommandCall call = new SqlCommandCall(
cmd,
splitOperands(cmd, trimmed, trimmed.substring(Math.min(pos, trimmed.length())))
);
return Optional.of(call);
}
} else {
// next sql command
......@@ -56,7 +59,7 @@ public final class SqlCommandParser {
}
}
}
return null;
return Optional.empty();
}
private static String[] splitOperands(SqlCommand cmd, String originalCall, String operands) {
......@@ -69,6 +72,7 @@ public final class SqlCommandParser {
return new String[] {operands.substring(0, delimiter), operands.substring(delimiter + 1)};
}
case SELECT:
case INSERT_INTO:
return new String[] {originalCall};
default:
return new String[] {operands};
......@@ -90,6 +94,7 @@ public final class SqlCommandParser {
DESCRIBE("describe"),
EXPLAIN("explain"),
SELECT("select"),
INSERT_INTO("insert into"),
SET("set"),
RESET("reset"),
SOURCE("source");
......
......@@ -87,6 +87,15 @@ public interface Executor {
*/
void cancelQuery(SessionContext session, String resultId) throws SqlExecutionException;
/**
* Submits a Flink SQL update statement such as INSERT INTO.
*
* @param session context in with the statement is executed
* @param statement SQL update statement (currently only INSERT INTO is supported)
* @return information about the target of the submitted Flink job
*/
ProgramTargetDescriptor executeUpdate(SessionContext session, String statement) throws SqlExecutionException;
/**
* Stops the executor.
*/
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.client.gateway;
import org.apache.flink.api.common.JobID;
/**
* Describes the target where a table program has been submitted to.
*/
public class ProgramTargetDescriptor {
private final String clusterId;
private final String jobId;
private final String webInterfaceUrl;
public ProgramTargetDescriptor(String clusterId, String jobId, String webInterfaceUrl) {
this.clusterId = clusterId;
this.jobId = jobId;
this.webInterfaceUrl = webInterfaceUrl;
}
public String getClusterId() {
return clusterId;
}
public String getJobId() {
return jobId;
}
public String getWebInterfaceUrl() {
return webInterfaceUrl;
}
@Override
public String toString() {
return String.format(
"Cluster ID: %s\n" +
"Job ID: %s\n" +
"Web interface: %s",
clusterId, jobId, webInterfaceUrl);
}
/**
* Creates a program target description from deployment classes.
*
* @param clusterId cluster id
* @param jobId job id
* @param <C> cluster id type
* @return program target descriptor
*/
public static <C> ProgramTargetDescriptor of(C clusterId, JobID jobId, String webInterfaceUrl) {
String clusterIdString;
try {
// check if cluster id has a toString method
clusterId.getClass().getDeclaredMethod("toString");
clusterIdString = clusterId.toString();
} catch (NoSuchMethodException e) {
clusterIdString = clusterId.getClass().getSimpleName();
}
return new ProgramTargetDescriptor(clusterIdString, jobId.toString(), webInterfaceUrl);
}
}
......@@ -169,6 +169,14 @@ public class ExecutionContext<T> {
return new EnvironmentInstance();
}
public Map<String, TableSource<?>> getTableSources() {
return tableSources;
}
public Map<String, TableSink<?>> getTableSinks() {
return tableSinks;
}
// --------------------------------------------------------------------------------------------
private static CommandLine createCommandLine(Deployment deployment, Options commandLineOptions) {
......
......@@ -31,16 +31,22 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.api.QueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.local.result.BasicResult;
import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
import org.apache.flink.table.client.gateway.local.result.DynamicResult;
import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
......@@ -276,6 +282,12 @@ public class LocalExecutor implements Executor {
cancelQueryInternal(context, resultId);
}
@Override
public ProgramTargetDescriptor executeUpdate(SessionContext session, String statement) throws SqlExecutionException {
final ExecutionContext<?> context = getOrCreateExecutionContext(session);
return executeUpdateInternal(context, statement);
}
@Override
public void stop(SessionContext session) {
resultStore.getResults().forEach((resultId) -> {
......@@ -329,14 +341,43 @@ public class LocalExecutor implements Executor {
}
}
private <T> ResultDescriptor executeQueryInternal(ExecutionContext<T> context, String query) {
private <C> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<C> context, String statement) {
final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
applyUpdate(envInst.getTableEnvironment(), envInst.getQueryConfig(), statement);
// create job graph with dependencies
final String jobName = context.getSessionContext().getName() + ": " + statement;
final JobGraph jobGraph;
try {
jobGraph = envInst.createJobGraph(jobName);
} catch (Throwable t) {
// catch everything such that the statement does not crash the executor
throw new SqlExecutionException("Invalid SQL statement.", t);
}
// create execution
final BasicResult<C> result = new BasicResult<>();
final ProgramDeployer<C> deployer = new ProgramDeployer<>(
context, jobName, jobGraph, result, false);
// blocking deployment
deployer.run();
return ProgramTargetDescriptor.of(
result.getClusterId(),
jobGraph.getJobID(),
result.getWebInterfaceUrl());
}
private <C> ResultDescriptor executeQueryInternal(ExecutionContext<C> context, String query) {
final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
// create table
final Table table = createTable(envInst.getTableEnvironment(), query);
// initialize result
final DynamicResult<T> result = resultStore.createResult(
final DynamicResult<C> result = resultStore.createResult(
context.getMergedEnvironment(),
table.getSchema().withoutTimeAttributes(),
envInst.getExecutionConfig());
......@@ -352,7 +393,7 @@ public class LocalExecutor implements Executor {
// it not stored in the result store
result.close();
// catch everything such that the query does not crash the executor
throw new SqlExecutionException("Invalid SQL statement.", t);
throw new SqlExecutionException("Invalid SQL query.", t);
}
// store the result with a unique id (the job id for now)
......@@ -360,7 +401,8 @@ public class LocalExecutor implements Executor {
resultStore.storeResult(resultId, result);
// create execution
final ProgramDeployer<T> deployer = new ProgramDeployer<>(context, jobName, jobGraph, result);
final ProgramDeployer<C> deployer = new ProgramDeployer<>(
context, jobName, jobGraph, result, true);
// start result retrieval
result.startRetrieval(deployer);
......@@ -371,16 +413,32 @@ public class LocalExecutor implements Executor {
result.isMaterialized());
}
private Table createTable(TableEnvironment tableEnv, String query) {
/**
* Creates a table using the given query in the given table environment.
*/
private Table createTable(TableEnvironment tableEnv, String selectQuery) {
// parse and validate query
try {
return tableEnv.sqlQuery(query);
return tableEnv.sqlQuery(selectQuery);
} catch (Throwable t) {
// catch everything such that the query does not crash the executor
throw new SqlExecutionException("Invalid SQL statement.", t);
}
}
/**
* Applies the given update statement to the given table environment with query configuration.
*/
private void applyUpdate(TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) {
// parse and validate statement
try {
tableEnv.sqlUpdate(updateStatement, queryConfig);
} catch (Throwable t) {
// catch everything such that the statement does not crash the executor
throw new SqlExecutionException("Invalid SQL update statement.", t);
}
}
/**
* Creates or reuses the execution context.
*/
......
......@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.local.result.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -40,18 +41,30 @@ public class ProgramDeployer<C> implements Runnable {
private final ExecutionContext<C> context;
private final JobGraph jobGraph;
private final String jobName;
private final DynamicResult<C> result;
private final Result<C> result;
private final boolean awaitJobResult;
private final BlockingQueue<JobExecutionResult> executionResultBucket;
/**
* Deploys a table program on the cluster.
*
* @param context context with deployment information
* @param jobName job name of the Flink job to be submitted
* @param jobGraph Flink job graph
* @param result result that receives information about the target cluster
* @param awaitJobResult block for a job execution result from the cluster
*/
public ProgramDeployer(
ExecutionContext<C> context,
String jobName,
JobGraph jobGraph,
DynamicResult<C> result) {
Result<C> result,
boolean awaitJobResult) {
this.context = context;
this.jobGraph = jobGraph;
this.jobName = jobName;
this.result = result;
this.awaitJobResult = awaitJobResult;
executionResultBucket = new LinkedBlockingDeque<>(1);
}
......@@ -62,7 +75,7 @@ public class ProgramDeployer<C> implements Runnable {
LOG.debug("Submitting job {} with the following environment: \n{}",
jobGraph.getJobID(), context.getMergedEnvironment());
}
executionResultBucket.add(deployJob(context, jobGraph, result));
deployJob(context, jobGraph, result);
}
public JobExecutionResult fetchExecutionResult() {
......@@ -73,45 +86,20 @@ public class ProgramDeployer<C> implements Runnable {
* Deploys a job. Depending on the deployment creates a new job cluster. It saves the cluster id in
* the result and blocks until job completion.
*/
private <T> JobExecutionResult deployJob(ExecutionContext<T> context, JobGraph jobGraph, DynamicResult<T> result) {
private <T> void deployJob(ExecutionContext<T> context, JobGraph jobGraph, Result<T> result) {
// create or retrieve cluster and deploy job
try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
ClusterClient<T> clusterClient = null;
try {
// new cluster
if (context.getClusterId() == null) {
// deploy job cluster with job attached
clusterClient = clusterDescriptor.deployJobCluster(context.getClusterSpec(), jobGraph, false);
// save the new cluster id
result.setClusterId(clusterClient.getClusterId());
// we need to hard cast for now
return ((RestClusterClient<T>) clusterClient)
.requestJobResult(jobGraph.getJobID())
.get()
.toJobExecutionResult(context.getClassLoader()); // throws exception if job fails
deployJobOnNewCluster(clusterDescriptor, jobGraph, result, context.getClassLoader());
}
// reuse existing cluster
else {
// retrieve existing cluster
clusterClient = clusterDescriptor.retrieve(context.getClusterId());
// save the cluster id
result.setClusterId(clusterClient.getClusterId());
// submit the job
clusterClient.setDetached(false);
return clusterClient
.submitJob(jobGraph, context.getClassLoader())
.getJobExecutionResult(); // throws exception if job fails
deployJobOnExistingCluster(context.getClusterId(), clusterDescriptor, jobGraph, result);
}
} catch (Exception e) {
throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
} finally {
try {
if (clusterClient != null) {
clusterClient.shutdown();
}
} catch (Exception e) {
// ignore
}
}
} catch (SqlExecutionException e) {
throw e;
......@@ -119,5 +107,77 @@ public class ProgramDeployer<C> implements Runnable {
throw new SqlExecutionException("Could not locate a cluster.", e);
}
}
private <T> void deployJobOnNewCluster(
ClusterDescriptor<T> clusterDescriptor,
JobGraph jobGraph,
Result<T> result,
ClassLoader classLoader) throws Exception {
ClusterClient<T> clusterClient = null;
try {
// deploy job cluster with job attached
clusterClient = clusterDescriptor.deployJobCluster(context.getClusterSpec(), jobGraph, false);
// save information about the new cluster
result.setClusterInformation(clusterClient.getClusterId(), clusterClient.getWebInterfaceURL());
// get result
if (awaitJobResult) {
// we need to hard cast for now
final JobExecutionResult jobResult = ((RestClusterClient<T>) clusterClient)
.requestJobResult(jobGraph.getJobID())
.get()
.toJobExecutionResult(context.getClassLoader()); // throws exception if job fails
executionResultBucket.add(jobResult);
}
} finally {
try {
if (clusterClient != null) {
clusterClient.shutdown();
}
} catch (Exception e) {
// ignore
}
}
}
private <T> void deployJobOnExistingCluster(
T clusterId,
ClusterDescriptor<T> clusterDescriptor,
JobGraph jobGraph,
Result<T> result) throws Exception {
ClusterClient<T> clusterClient = null;
try {
// retrieve existing cluster
clusterClient = clusterDescriptor.retrieve(clusterId);
String webInterfaceUrl;
// retrieving the web interface URL might fail on legacy pre-FLIP-6 code paths
// TODO remove this once we drop support for legacy deployment code
try {
webInterfaceUrl = clusterClient.getWebInterfaceURL();
} catch (Exception e) {
webInterfaceUrl = "N/A";
}
// save the cluster information
result.setClusterInformation(clusterClient.getClusterId(), webInterfaceUrl);
// submit job (and get result)
if (awaitJobResult) {
clusterClient.setDetached(false);
final JobExecutionResult jobResult = clusterClient
.submitJob(jobGraph, context.getClassLoader())
.getJobExecutionResult(); // throws exception if job fails
executionResultBucket.add(jobResult);
} else {
clusterClient.setDetached(true);
clusterClient.submitJob(jobGraph, context.getClassLoader());
}
} finally {
try {
if (clusterClient != null) {
clusterClient.shutdown();
}
} catch (Exception e) {
// ignore
}
}
}
}
......@@ -29,6 +29,10 @@ import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.config.Deployment;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.local.result.ChangelogCollectStreamResult;
import org.apache.flink.table.client.gateway.local.result.DynamicResult;
import org.apache.flink.table.client.gateway.local.result.MaterializedCollectBatchResult;
import org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult;
import org.apache.flink.types.Row;
import java.net.InetAddress;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.client.gateway.local.result;
/**
* Basic result of a table program that has been submitted to a cluster.
*
* @param <C> cluster id to which this result belongs to
*/
public class BasicResult<C> implements Result<C> {
protected C clusterId;
protected String webInterfaceUrl;
@Override
public void setClusterInformation(C clusterId, String webInterfaceUrl) {
if (this.clusterId != null || this.webInterfaceUrl != null) {
throw new IllegalStateException("Cluster information is already present.");
}
this.clusterId = clusterId;
this.webInterfaceUrl = webInterfaceUrl;
}
public C getClusterId() {
if (this.clusterId == null) {
throw new IllegalStateException("Cluster ID has not been set.");
}
return clusterId;
}
public String getWebInterfaceUrl() {
if (this.webInterfaceUrl == null) {
throw new IllegalStateException("Cluster web interface URL has not been set.");
}
return webInterfaceUrl;
}
}
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.table.client.gateway.local;
package org.apache.flink.table.client.gateway.local.result;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.table.client.gateway.local;
package org.apache.flink.table.client.gateway.local.result;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.client.gateway.TypedResult;
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.table.client.gateway.local;
package org.apache.flink.table.client.gateway.local.result;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -29,6 +29,8 @@ import org.apache.flink.streaming.experimental.SocketStreamIterator;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.local.CollectStreamTableSink;
import org.apache.flink.table.client.gateway.local.ProgramDeployer;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
......@@ -40,7 +42,7 @@ import java.net.InetAddress;
*
* @param <C> cluster id to which this result belongs to
*/
public abstract class CollectStreamResult<C> implements DynamicResult<C> {
public abstract class CollectStreamResult<C> extends BasicResult<C> implements DynamicResult<C> {
private final TypeInformation<Row> outputType;
private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
......@@ -48,7 +50,6 @@ public abstract class CollectStreamResult<C> implements DynamicResult<C> {
private final ResultRetrievalThread retrievalThread;
private final JobMonitoringThread monitoringThread;
private ProgramDeployer<C> deployer;
private C clusterId;
protected final Object resultLock;
protected SqlExecutionException executionException;
......@@ -76,14 +77,6 @@ public abstract class CollectStreamResult<C> implements DynamicResult<C> {
monitoringThread = new JobMonitoringThread();
}
@Override
public void setClusterId(C clusterId) {
if (this.clusterId != null) {
throw new IllegalStateException("Cluster id is already present.");
}
this.clusterId = clusterId;
}
@Override
public TypeInformation<Row> getOutputType() {
return outputType;
......
......@@ -16,9 +16,10 @@
* limitations under the License.
*/
package org.apache.flink.table.client.gateway.local;
package org.apache.flink.table.client.gateway.local.result;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.client.gateway.local.ProgramDeployer;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
......@@ -29,12 +30,7 @@ import org.apache.flink.types.Row;
*
* @param <C> type of the cluster id to which this result belongs to
*/
public interface DynamicResult<C> {
/**
* Sets the cluster id of the cluster this result comes from. This method should only be called once.
*/
void setClusterId(C clusterId);
public interface DynamicResult<C> extends Result<C> {
/**
* Returns whether this result is materialized such that snapshots can be taken or results
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.table.client.gateway.local;
package org.apache.flink.table.client.gateway.local.result;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
......@@ -24,6 +24,8 @@ import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.local.CollectBatchTableSink;
import org.apache.flink.table.client.gateway.local.ProgramDeployer;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.flink.util.AbstractID;
......@@ -35,7 +37,7 @@ import java.util.List;
/**
* Collects results using accumulators and returns them as table snapshots.
*/
public class MaterializedCollectBatchResult<C> implements MaterializedResult<C> {
public class MaterializedCollectBatchResult<C> extends BasicResult<C> implements MaterializedResult<C> {
private final TypeInformation<Row> outputType;
private final String accumulatorName;
......@@ -44,7 +46,6 @@ public class MaterializedCollectBatchResult<C> implements MaterializedResult<C>
private final Thread retrievalThread;
private ProgramDeployer<C> deployer;
private C clusterId;
private int pageSize;
private int pageCount;
private SqlExecutionException executionException;
......@@ -63,14 +64,6 @@ public class MaterializedCollectBatchResult<C> implements MaterializedResult<C>
pageCount = 0;
}
@Override
public void setClusterId(C clusterId) {
if (this.clusterId != null) {
throw new IllegalStateException("Cluster id is already present.");
}
this.clusterId = clusterId;
}
@Override
public boolean isMaterialized() {
return true;
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.table.client.gateway.local;
package org.apache.flink.table.client.gateway.local.result;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.table.client.gateway.local;
package org.apache.flink.table.client.gateway.local.result;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.types.Row;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.client.gateway.local.result;
/**
* A result of a table program submission to a cluster.
*
* @param <C> type of the cluster id to which this result belongs to
*/
public interface Result<C> {
/**
* Sets the cluster information of the cluster this result comes from. This method should only be called once.
*/
void setClusterInformation(C clusterId, String webInterfaceUrl);
}
......@@ -32,6 +32,8 @@ under the License.
<includes>
<include>org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.class</include>
<include>org/apache/flink/table/client/gateway/utils/TestTableSourceFactory$*.class</include>
<include>org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.class</include>
<include>org/apache/flink/table/client/gateway/utils/TestTableSinkFactory$*.class</include>
</includes>
</fileSet>
</fileSets>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.client.cli;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Tests for the {@link CliClient}.
*/
public class CliClientTest extends TestLogger {
private static final String INSERT_INTO_STATEMENT = "INSERT INTO MyTable SELECT * FROM MyOtherTable";
private static final String SELECT_STATEMENT = "SELECT * FROM MyOtherTable";
@Test
public void testUpdateSubmission() {
verifyUpdateSubmission(INSERT_INTO_STATEMENT, false, false);
}
@Test
public void testFailedUpdateSubmission() {
// fail at executor
verifyUpdateSubmission(INSERT_INTO_STATEMENT, true, true);
// fail early in client
verifyUpdateSubmission(SELECT_STATEMENT, false, true);
}
// --------------------------------------------------------------------------------------------
private void verifyUpdateSubmission(String statement, boolean failExecution, boolean testFailure) {
final SessionContext context = new SessionContext("test-session", new Environment());
final MockExecutor mockExecutor = new MockExecutor();
mockExecutor.failExecution = failExecution;
final CliClient client = new CliClient(context, mockExecutor);
if (testFailure) {
assertFalse(client.submitUpdate(statement));
} else {
assertTrue(client.submitUpdate(statement));
assertEquals(statement, mockExecutor.receivedStatement);
assertEquals(context, mockExecutor.receivedContext);
}
}
// --------------------------------------------------------------------------------------------
private static class MockExecutor implements Executor {
public boolean failExecution;
public SessionContext receivedContext;
public String receivedStatement;
@Override
public void start() throws SqlExecutionException {
// nothing to do
}
@Override
public Map<String, String> getSessionProperties(SessionContext session) throws SqlExecutionException {
return null;
}
@Override
public List<String> listTables(SessionContext session) throws SqlExecutionException {
return null;
}
@Override
public List<String> listUserDefinedFunctions(SessionContext session) throws SqlExecutionException {
return null;
}
@Override
public TableSchema getTableSchema(SessionContext session, String name) throws SqlExecutionException {
return null;
}
@Override
public String explainStatement(SessionContext session, String statement) throws SqlExecutionException {
return null;
}
@Override
public ResultDescriptor executeQuery(SessionContext session, String query) throws SqlExecutionException {
return null;
}
@Override
public TypedResult<List<Tuple2<Boolean, Row>>> retrieveResultChanges(SessionContext session, String resultId) throws SqlExecutionException {
return null;
}
@Override
public TypedResult<Integer> snapshotResult(SessionContext session, String resultId, int pageSize) throws SqlExecutionException {
return null;
}
@Override
public List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException {
return null;
}
@Override
public void cancelQuery(SessionContext session, String resultId) throws SqlExecutionException {
// nothing to do
}
@Override
public ProgramTargetDescriptor executeUpdate(SessionContext session, String statement) throws SqlExecutionException {
receivedContext = session;
receivedStatement = statement;
if (failExecution) {
throw new SqlExecutionException("Fail execution.");
}
return new ProgramTargetDescriptor("testClusterId", "testJobId", "http://testcluster:1234");
}
@Override
public void stop(SessionContext session) {
// nothing to do
}
}
}
......@@ -41,20 +41,24 @@ import static org.junit.Assert.assertEquals;
*/
public class DependencyTest {
public static final String CONNECTOR_TYPE_VALUE = "test-connector";
public static final String TEST_PROPERTY = "test-property";
public static final String CONNECTOR_TEST_PROPERTY = "connector.test-property";
private static final String FACTORY_ENVIRONMENT_FILE = "test-sql-client-factory.yaml";
private static final String TABLE_SOURCE_FACTORY_JAR_FILE = "table-source-factory-test-jar.jar";
private static final String TABLE_FACTORY_JAR_FILE = "table-factories-test-jar.jar";
@Test
public void testTableSourceFactoryDiscovery() throws Exception {
public void testTableFactoryDiscovery() throws Exception {
// create environment
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_0", "test-table-source-factory");
replaceVars.put("$VAR_1", "test-property");
replaceVars.put("$VAR_0", CONNECTOR_TYPE_VALUE);
replaceVars.put("$VAR_1", TEST_PROPERTY);
replaceVars.put("$VAR_2", "test-value");
final Environment env = EnvironmentFileUtil.parseModified(FACTORY_ENVIRONMENT_FILE, replaceVars);
// create executor with dependencies
final URL dependency = Paths.get("target", TABLE_SOURCE_FACTORY_JAR_FILE).toUri().toURL();
final URL dependency = Paths.get("target", TABLE_FACTORY_JAR_FILE).toUri().toURL();
final LocalExecutor executor = new LocalExecutor(
env,
Collections.singletonList(dependency),
......
......@@ -51,6 +51,7 @@ public class EnvironmentTest {
tables.add("TableNumber1");
tables.add("TableNumber2");
tables.add("NewTable");
tables.add("TableSourceSink");
assertEquals(tables, merged.getTables().keySet());
assertTrue(merged.getExecution().isStreamingExecution());
......
......@@ -19,18 +19,24 @@
package org.apache.flink.table.client.gateway.local;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.commons.cli.Options;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
......@@ -60,6 +66,51 @@ public class ExecutionContextTest {
assertArrayEquals(expected, actual);
}
@Test
public void testSourceSinks() throws Exception {
final ExecutionContext<?> context = createExecutionContext();
final Map<String, TableSource<?>> sources = context.getTableSources();
final Map<String, TableSink<?>> sinks = context.getTableSinks();
assertEquals(
new HashSet<>(Arrays.asList("TableSourceSink", "TableNumber1", "TableNumber2")),
sources.keySet());
assertEquals(
new HashSet<>(Collections.singletonList("TableSourceSink")),
sinks.keySet());
assertArrayEquals(
new String[]{"IntegerField1", "StringField1"},
sources.get("TableNumber1").getTableSchema().getColumnNames());
assertArrayEquals(
new TypeInformation[]{Types.INT(), Types.STRING()},
sources.get("TableNumber1").getTableSchema().getTypes());
assertArrayEquals(
new String[]{"IntegerField2", "StringField2"},
sources.get("TableNumber2").getTableSchema().getColumnNames());
assertArrayEquals(
new TypeInformation[]{Types.INT(), Types.STRING()},
sources.get("TableNumber2").getTableSchema().getTypes());
assertArrayEquals(
new String[]{"BooleanField", "StringField"},
sinks.get("TableSourceSink").getFieldNames());
assertArrayEquals(
new TypeInformation[]{Types.BOOLEAN(), Types.STRING()},
sinks.get("TableSourceSink").getFieldTypes());
final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();
assertArrayEquals(
new String[]{"TableNumber1", "TableNumber2", "TableSourceSink"},
tableEnv.listTables());
}
private <T> ExecutionContext<T> createExecutionContext() throws Exception {
final Environment env = EnvironmentFileUtil.parseModified(
DEFAULTS_ENVIRONMENT_FILE,
......
......@@ -19,6 +19,7 @@
package org.apache.flink.table.client.gateway.local;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -28,9 +29,11 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.TypedResult;
......@@ -44,7 +47,10 @@ import org.apache.flink.util.TestLogger;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
......@@ -59,6 +65,7 @@ import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Contains basic tests for the {@link LocalExecutor}.
......@@ -70,6 +77,9 @@ public class LocalExecutorITCase extends TestLogger {
private static final int NUM_TMS = 2;
private static final int NUM_SLOTS_PER_TM = 2;
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
new MiniClusterResourceConfiguration.Builder()
......@@ -101,7 +111,7 @@ public class LocalExecutorITCase extends TestLogger {
final List<String> actualTables = executor.listTables(session);
final List<String> expectedTables = Arrays.asList("TableNumber1", "TableNumber2");
final List<String> expectedTables = Arrays.asList("TableNumber1", "TableNumber2", "TableSourceSink");
assertEquals(expectedTables, actualTables);
}
......@@ -268,6 +278,60 @@ public class LocalExecutorITCase extends TestLogger {
}
}
@Test(timeout = 30_000L)
public void testStreamQueryExecutionSink() throws Exception {
final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString();
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_0", url.getPath());
replaceVars.put("$VAR_2", "streaming");
replaceVars.put("$VAR_4", csvOutputPath);
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
try {
// start job
final ProgramTargetDescriptor targetDescriptor = executor.executeUpdate(
session,
"INSERT INTO TableSourceSink SELECT IntegerField1 = 42, StringField1 FROM TableNumber1");
// wait for job completion and verify result
boolean isRunning = true;
while (isRunning) {
Thread.sleep(50); // slow the processing down
final JobStatus jobStatus = clusterClient.getJobStatus(JobID.fromHexString(targetDescriptor.getJobId())).get();
switch (jobStatus) {
case CREATED:
case RUNNING:
continue;
case FINISHED:
isRunning = false;
verifySinkResult(csvOutputPath);
break;
default:
fail("Unexpected job status.");
}
}
} finally {
executor.stop(session);
}
}
private void verifySinkResult(String path) throws IOException {
final List<String> actualResults = new ArrayList<>();
TestBaseUtils.readAllResultLines(actualResults, path);
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("true,Hello World");
expectedResults.add("false,Hello World");
expectedResults.add("false,Hello World");
expectedResults.add("false,Hello World");
expectedResults.add("true,Hello World");
expectedResults.add("false,Hello World!!!!");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
}
private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> clusterClient) throws Exception {
return new LocalExecutor(
EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, Collections.singletonMap("$VAR_2", "batch")),
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.client.gateway.utils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.client.gateway.local.DependencyTest;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.table.client.gateway.local.DependencyTest.CONNECTOR_TEST_PROPERTY;
import static org.apache.flink.table.client.gateway.local.DependencyTest.CONNECTOR_TYPE_VALUE;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
/**
* Table sink factory for testing the classloading in {@link DependencyTest}.
*/
public class TestTableSinkFactory implements StreamTableSinkFactory<Row> {
@Override
public Map<String, String> requiredContext() {
final Map<String, String> context = new HashMap<>();
context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE);
return context;
}
@Override
public List<String> supportedProperties() {
final List<String> properties = new ArrayList<>();
properties.add(CONNECTOR_TEST_PROPERTY);
properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
return properties;
}
@Override
public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
final DescriptorProperties params = new DescriptorProperties(true);
params.putProperties(properties);
return new TestTableSink(
SchemaValidator.deriveTableSinkSchema(params),
properties.get(CONNECTOR_TEST_PROPERTY));
}
// --------------------------------------------------------------------------------------------
/**
* Test table sink.
*/
public static class TestTableSink implements TableSink<Row>, AppendStreamTableSink<Row> {
private final TableSchema schema;
private final String property;
public TestTableSink(TableSchema schema, String property) {
this.schema = schema;
this.property = property;
}
public String getProperty() {
return property;
}
@Override
public TypeInformation<Row> getOutputType() {
return Types.ROW(schema.getColumnNames(), schema.getTypes());
}
@Override
public String[] getFieldNames() {
return schema.getColumnNames();
}
@Override
public TypeInformation<?>[] getFieldTypes() {
return schema.getTypes();
}
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return new TestTableSink(new TableSchema(fieldNames, fieldTypes), property);
}
@Override
public void emitDataStream(DataStream<Row> dataStream) {
// do nothing
}
}
}
......@@ -39,6 +39,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.flink.table.client.gateway.local.DependencyTest.CONNECTOR_TEST_PROPERTY;
import static org.apache.flink.table.client.gateway.local.DependencyTest.CONNECTOR_TYPE_VALUE;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
......@@ -55,14 +57,14 @@ public class TestTableSourceFactory implements StreamTableSourceFactory<Row> {
@Override
public Map<String, String> requiredContext() {
final Map<String, String> context = new HashMap<>();
context.put(CONNECTOR_TYPE(), "test-table-source-factory");
context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE);
return context;
}
@Override
public List<String> supportedProperties() {
final List<String> properties = new ArrayList<>();
properties.add("connector.test-property");
properties.add(CONNECTOR_TEST_PROPERTY);
properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
......@@ -79,7 +81,7 @@ public class TestTableSourceFactory implements StreamTableSourceFactory<Row> {
final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
return new TestTableSource(
SchemaValidator.deriveTableSourceSchema(params),
properties.get("connector.test-property"),
properties.get(CONNECTOR_TEST_PROPERTY),
proctime.orElse(null),
rowtime);
}
......
......@@ -18,3 +18,4 @@
#==============================================================================
org.apache.flink.table.client.gateway.utils.TestTableSourceFactory
org.apache.flink.table.client.gateway.utils.TestTableSinkFactory
......@@ -62,6 +62,23 @@ tables:
type: VARCHAR
line-delimiter: "\n"
comment-prefix: "#"
- name: TableSourceSink
type: both
schema:
- name: BooleanField
type: BOOLEAN
- name: StringField
type: VARCHAR
connector:
type: filesystem
path: "$VAR_4"
format:
type: csv
fields:
- name: BooleanField
type: BOOLEAN
- name: StringField
type: VARCHAR
functions:
- name: scalarUDF
......
......@@ -25,7 +25,7 @@
tables:
- name: TableNumber1
type: source
type: both
schema:
- name: IntegerField1
type: INT
......
......@@ -22,9 +22,8 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet}
import org.apache.calcite.rel.RelWriter
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan
import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceSinkTable, TableSourceTable}
import org.apache.flink.table.plan.schema.TableSourceSinkTable
import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
import scala.collection.JavaConverters._
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册