Commit 85167511 authored by Dawid Wysakowicz, committed by Dawid Wysakowicz

[FLINK-11476][table] Integrated CatalogManager with TableEnvironment

In this commit both registration and table lookup go through the CatalogManager, with respect to the current default catalog and database. It is not decoupled from Calcite yet. The next step will be to register CatalogTables built exclusively in the Table API, rather than tables already converted to Calcite's Tables.

This closes #8404
Parent 55b4d0b2
......@@ -512,18 +512,25 @@ class Table(object):
"""
return Table(self._j_table.dropColumns(fields))
def insert_into(self, table_name):
def insert_into(self, table_path, *table_path_continued):
"""
Writes the :class:`Table` to a :class:`TableSink` that was registered under
the specified name.
the specified name. For the path resolution algorithm see
:func:`~pyflink.table.TableEnvironment.use_database`.
Example:
::
>>> tab.insert_into("print")
:param table_name: Name of the :class:`TableSink` to which the :class:`Table` is written.
:param table_path: The first part of the path of the registered :class:`TableSink` to which
the :class:`Table` is written. This is to ensure at least the name of the
:class:`TableSink` is provided.
:param table_path_continued: The remaining part of the path of the registered
:class:`TableSink` to which the :class:`Table` is written.
"""
self._j_table.insertInto(table_name)
gateway = get_gateway()
j_table_path = to_jarray(gateway.jvm.String, table_path_continued)
self._j_table.insertInto(table_path, j_table_path)
def print_schema(self):
"""
......
......@@ -185,6 +185,120 @@ class TableEnvironment(object):
else:
self._j_tenv.sqlUpdate(stmt)
def get_current_catalog(self):
"""
Gets the current default catalog name of the current session.
:return: The current default catalog name that is used for the path resolution.
.. seealso:: :func:`~pyflink.table.TableEnvironment.use_catalog`
"""
return self._j_tenv.getCurrentCatalog()
def use_catalog(self, catalog_name):
"""
Sets the current catalog to the given value. It also sets the default
database to the catalog's default one.
See also :func:`~TableEnvironment.use_database`.
This is used during the resolution of object paths. Both the catalog and database are
optional when referencing catalog objects such as tables, views etc. The algorithm looks for
requested objects in the following paths in that order:
* ``[current-catalog].[current-database].[requested-path]``
* ``[current-catalog].[requested-path]``
* ``[requested-path]``
Example:
Given structure with default catalog set to ``default_catalog`` and default database set to
``default_database``. ::
root:
|- default_catalog
|- default_database
|- tab1
|- db1
|- tab1
|- cat1
|- db1
|- tab1
The following table describes resolved paths:
+----------------+-----------------------------------------+
| Requested path | Resolved path |
+================+=========================================+
| tab1 | default_catalog.default_database.tab1 |
+----------------+-----------------------------------------+
| db1.tab1 | default_catalog.db1.tab1 |
+----------------+-----------------------------------------+
| cat1.db1.tab1 | cat1.db1.tab1 |
+----------------+-----------------------------------------+
:param catalog_name: The name of the catalog to set as the current default catalog.
:throws: CatalogException thrown if a catalog with given name could not be set as the
default one
.. seealso:: :func:`~pyflink.table.TableEnvironment.use_database`
"""
self._j_tenv.useCatalog(catalog_name)
def get_current_database(self):
"""
Gets the current default database name of the running session.
:return: The name of the current database of the current catalog.
.. seealso:: :func:`~pyflink.table.TableEnvironment.use_database`
"""
return self._j_tenv.getCurrentDatabase()
def use_database(self, database_name):
"""
Sets the current default database. It has to exist in the current catalog. That path will
be used as the default one when looking for unqualified object names.
This is used during the resolution of object paths. Both the catalog and database are
optional when referencing catalog objects such as tables, views etc. The algorithm looks for
requested objects in the following paths in that order:
* ``[current-catalog].[current-database].[requested-path]``
* ``[current-catalog].[requested-path]``
* ``[requested-path]``
Example:
Given structure with default catalog set to ``default_catalog`` and default database set to
``default_database``. ::
root:
|- default_catalog
|- default_database
|- tab1
|- db1
|- tab1
|- cat1
|- db1
|- tab1
The following table describes resolved paths:
+----------------+-----------------------------------------+
| Requested path | Resolved path |
+================+=========================================+
| tab1 | default_catalog.default_database.tab1 |
+----------------+-----------------------------------------+
| db1.tab1 | default_catalog.db1.tab1 |
+----------------+-----------------------------------------+
| cat1.db1.tab1 | cat1.db1.tab1 |
+----------------+-----------------------------------------+
:throws: CatalogException thrown if the given catalog and database could not be set as
the default ones
.. seealso:: :func:`~pyflink.table.TableEnvironment.use_catalog`
:param database_name: The name of the database to set as the current database.
"""
self._j_tenv.useDatabase(database_name)
def execute(self, job_name=None):
"""
Triggers the program execution.
......
......@@ -38,11 +38,12 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te
@classmethod
def excluded_methods(cls):
# registerFunction and listUserDefinedFunctions should be supported when UDFs supported.
# registerExternalCatalog, getRegisteredExternalCatalog and listTables
# should be supported when catalog supported in python.
# registerExternalCatalog, getRegisteredExternalCatalog, registerCatalog, getCatalog and
# listTables should be supported when catalog supported in python.
# getCompletionHints has been deprecated. It will be removed in the next release.
return {'registerExternalCatalog', 'getRegisteredExternalCatalog',
'registerFunction', 'listUserDefinedFunctions', 'listTables', 'getCompletionHints'}
return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'registerCatalog',
'getCatalog', 'registerFunction', 'listUserDefinedFunctions', 'listTables',
'getCompletionHints'}
if __name__ == '__main__':
......
......@@ -74,7 +74,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import java.net.URL;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
......@@ -115,8 +115,8 @@ public class ExecutionContext<T> {
this.getClass().getClassLoader());
// create table sources & sinks.
tableSources = new HashMap<>();
tableSinks = new HashMap<>();
tableSources = new LinkedHashMap<>();
tableSinks = new LinkedHashMap<>();
mergedEnv.getTables().forEach((name, entry) -> {
if (entry instanceof SourceTableEntry || entry instanceof SourceSinkTableEntry) {
tableSources.put(name, createTableSource(mergedEnv.getExecution(), entry.asMap(), classLoader));
......@@ -127,7 +127,7 @@ public class ExecutionContext<T> {
});
// create user-defined functions
functions = new HashMap<>();
functions = new LinkedHashMap<>();
mergedEnv.getFunctions().forEach((name, entry) -> {
final UserDefinedFunction function = FunctionService.createFunction(entry.getDescriptor(), classLoader, false);
functions.put(name, function);
......
......@@ -119,13 +119,13 @@ public class LocalExecutorITCase extends TestLogger {
List<String> actualTables = executor.listTables(session);
List<String> expectedTables = Arrays.asList(
"AdditionalView1",
"AdditionalView2",
"TableNumber1",
"TableNumber2",
"TableSourceSink",
"TestView1",
"TestView2");
"TestView2",
"AdditionalView1",
"AdditionalView2");
assertEquals(expectedTables, actualTables);
session.removeView("AdditionalView1");
......@@ -229,9 +229,9 @@ public class LocalExecutorITCase extends TestLogger {
final SessionContext session = new SessionContext("test-session", new Environment());
final List<String> expectedTableHints = Arrays.asList(
"TableNumber1",
"TableNumber2",
"TableSourceSink");
"default_catalog.default_database.TableNumber1",
"default_catalog.default_database.TableNumber2",
"default_catalog.default_database.TableSourceSink");
assertEquals(expectedTableHints, executor.completeStatement(session, "SELECT * FROM Ta", 16));
final List<String> expectedClause = Collections.singletonList("WHERE");
......
......@@ -27,6 +27,8 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.descriptors.BatchTableDescriptor;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.functions.AggregateFunction;
......@@ -283,9 +285,13 @@ public interface BatchTableEnvironment extends TableEnvironment {
*/
static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
try {
Class clazz = Class.forName("org.apache.flink.table.api.java.BatchTableEnvImpl");
Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class);
return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig);
Class<?> clazz = Class.forName("org.apache.flink.table.api.java.BatchTableEnvImpl");
Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class);
CatalogManager catalogManager = new CatalogManager(
tableConfig.getBuiltInCatalogName(),
new GenericInMemoryCatalog(tableConfig.getBuiltInCatalogName(), tableConfig.getBuiltInDatabaseName())
);
return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager);
} catch (Throwable t) {
throw new TableException("Create BatchTableEnvironment failed.", t);
}
......
......@@ -28,6 +28,8 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.StreamTableDescriptor;
import org.apache.flink.table.functions.AggregateFunction;
......@@ -400,9 +402,15 @@ public interface StreamTableEnvironment extends TableEnvironment {
*/
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
try {
Class clazz = Class.forName("org.apache.flink.table.api.java.StreamTableEnvImpl");
Constructor con = clazz.getConstructor(StreamExecutionEnvironment.class, TableConfig.class);
return (StreamTableEnvironment) con.newInstance(executionEnvironment, tableConfig);
Class<?> clazz = Class.forName("org.apache.flink.table.api.java.StreamTableEnvImpl");
Constructor con = clazz.getConstructor(
StreamExecutionEnvironment.class,
TableConfig.class,
CatalogManager.class);
CatalogManager catalogManager = new CatalogManager(
tableConfig.getBuiltInCatalogName(),
new GenericInMemoryCatalog(tableConfig.getBuiltInCatalogName(), tableConfig.getBuiltInDatabaseName()));
return (StreamTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager);
} catch (Throwable t) {
throw new TableException("Create StreamTableEnvironment failed.", t);
}
......
......@@ -812,7 +812,8 @@ public interface Table {
Table fetch(int fetch);
/**
* Writes the {@link Table} to a {@link TableSink} that was registered under the specified name.
* Writes the {@link Table} to a {@link TableSink} that was registered under the specified path.
* For the path resolution algorithm see {@link TableEnvironment#useDatabase(String)}.
*
* <p>A batch {@link Table} can only be written to a
* {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
......@@ -820,13 +821,16 @@ public interface Table {
* {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an
* {@code org.apache.flink.table.sinks.UpsertStreamTableSink}.
*
* @param tableName Name of the registered {@link TableSink} to which the {@link Table} is
* written.
* @param tablePath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
* written. This is to ensure at least the name of the {@link TableSink} is provided.
* @param tablePathContinued The remaining part of the path of the registered {@link TableSink} to which the
* {@link Table} is written.
*/
void insertInto(String tableName);
void insertInto(String tablePath, String... tablePathContinued);
/**
* Writes the {@link Table} to a {@link TableSink} that was registered under the specified name.
* Writes the {@link Table} to a {@link TableSink} that was registered under the specified name
* in the initial default catalog.
*
* <p>A batch {@link Table} can only be written to a
* {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
......@@ -834,11 +838,31 @@ public interface Table {
* {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an
* {@code org.apache.flink.table.sinks.UpsertStreamTableSink}.
*
* @param tableName Name of the {@link TableSink} to which the {@link Table} is written.
* @param tableName The name of the {@link TableSink} to which the {@link Table} is written.
* @param conf The {@link QueryConfig} to use.
* @deprecated use {@link #insertInto(QueryConfig, String, String...)}
*/
@Deprecated
void insertInto(String tableName, QueryConfig conf);
/**
* Writes the {@link Table} to a {@link TableSink} that was registered under the specified path.
* For the path resolution algorithm see {@link TableEnvironment#useDatabase(String)}.
*
* <p>A batch {@link Table} can only be written to a
* {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
* {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a
* {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an
* {@code org.apache.flink.table.sinks.UpsertStreamTableSink}.
*
* @param conf The {@link QueryConfig} to use.
* @param tablePath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
* written. This is to ensure at least the name of the {@link TableSink} is provided.
* @param tablePathContinued The remaining part of the path of the registered {@link TableSink} to which the
* {@link Table} is written.
*/
void insertInto(QueryConfig conf, String tablePath, String... tablePathContinued);
/**
* Groups the records of a table by assigning them to windows defined by a time or row interval.
*
......
......@@ -57,6 +57,18 @@ public class TableConfig {
*/
private Integer maxGeneratedCodeLength = 64000; // just an estimate
/**
* Specifies the name of the initial catalog to be created when instantiating
* TableEnvironment.
*/
private String builtInCatalogName = "default_catalog";
/**
* Specifies the name of the default database in the initial catalog to be created when instantiating
* TableEnvironment.
*/
private String builtInDatabaseName = "default_database";
/**
* Returns the timezone for date/time/timestamp conversions.
*/
......@@ -134,6 +146,38 @@ public class TableConfig {
this.maxGeneratedCodeLength = Preconditions.checkNotNull(maxGeneratedCodeLength);
}
/**
* Gets the specified name of the initial catalog to be created when instantiating
* a {@link TableEnvironment}.
*/
public String getBuiltInCatalogName() {
return builtInCatalogName;
}
/**
* Specifies the name of the initial catalog to be created when instantiating
* a {@link TableEnvironment}. This method has no effect if called on the {@link TableEnvironment#getConfig()}.
*/
public void setBuiltInCatalogName(String builtInCatalogName) {
this.builtInCatalogName = builtInCatalogName;
}
/**
* Gets the specified name of the default database in the initial catalog to be created when instantiating
* a {@link TableEnvironment}.
*/
public String getBuiltInDatabaseName() {
return builtInDatabaseName;
}
/**
* Specifies the name of the default database in the initial catalog to be created when instantiating
* a {@link TableEnvironment}. This method has no effect if called on the {@link TableEnvironment#getConfig()}.
*/
public void setBuiltInDatabaseName(String builtInDatabaseName) {
this.builtInDatabaseName = builtInDatabaseName;
}
public static TableConfig getDefault() {
return new TableConfig();
}
......
......@@ -20,6 +20,7 @@ package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ExternalCatalog;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.TableDescriptor;
......@@ -27,6 +28,8 @@ import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import java.util.Optional;
/**
* The base class for batch and stream TableEnvironments.
*
......@@ -55,19 +58,44 @@ public interface TableEnvironment {
* Registers an {@link ExternalCatalog} under a unique name in the TableEnvironment's schema.
* All tables registered in the {@link ExternalCatalog} can be accessed.
*
* @param name The name under which the externalCatalog will be registered
* @param externalCatalog The externalCatalog to register
* @param name The name under which the externalCatalog will be registered.
* @param externalCatalog The externalCatalog to register.
* @see TableEnvironment#getCatalog(String)
* @see TableEnvironment#registerCatalog(String, Catalog)
* @deprecated the {@link ExternalCatalog} API is deprecated. Use the corresponding {@link Catalog} API.
*/
@Deprecated
void registerExternalCatalog(String name, ExternalCatalog externalCatalog);
/**
* Gets a registered {@link ExternalCatalog} by name.
*
* @param name The name to look up the {@link ExternalCatalog}
* @return The {@link ExternalCatalog}
* @param name The name to look up the {@link ExternalCatalog}.
* @return The {@link ExternalCatalog}.
* @see TableEnvironment#getCatalog(String)
* @see TableEnvironment#registerCatalog(String, Catalog)
* @deprecated the {@link ExternalCatalog} API is deprecated. Use the corresponding {@link Catalog} API.
*/
@Deprecated
ExternalCatalog getRegisteredExternalCatalog(String name);
/**
* Registers a {@link Catalog} under a unique name.
* All tables registered in the {@link Catalog} can be accessed.
*
* @param catalogName The name under which the catalog will be registered.
* @param catalog The catalog to register.
*/
void registerCatalog(String catalogName, Catalog catalog);
/**
* Gets a registered {@link Catalog} by name.
*
* @param catalogName The name to look up the {@link Catalog}.
* @return The requested catalog, empty if there is no registered catalog with given name.
*/
Optional<Catalog> getCatalog(String catalogName);
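A hedged usage sketch of the two new methods above; the catalog and database names are illustrative, and "tEnv" is an assumed TableEnvironment:

    Catalog catalog = new GenericInMemoryCatalog("cat1", "db1");
    tEnv.registerCatalog("cat1", catalog);
    Optional<Catalog> maybeCatalog = tEnv.getCatalog("cat1");  // empty if no catalog named "cat1" was registered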
/**
* Registers a {@link ScalarFunction} under a unique name. Replaces already existing
* user-defined functions under this name.
......@@ -119,8 +147,11 @@ public interface TableEnvironment {
/**
* Scans a registered table and returns the resulting {@link Table}.
*
* <p>A table to scan must be registered in the TableEnvironment. It can be either directly
* registered or be a member of an {@link ExternalCatalog}.
* <p>A table to scan must be registered in the {@link TableEnvironment}. It can be either directly
* registered or be a member of a {@link Catalog}.
*
* <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
* {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
*
* <p>Examples:
*
......@@ -140,9 +171,10 @@ public interface TableEnvironment {
*
* @param tablePath The path of the table to scan.
* @return The resulting {@link Table}.
* @throws TableException if no table is found using the given table path.
* @see TableEnvironment#useCatalog(String)
* @see TableEnvironment#useDatabase(String)
*/
Table scan(String... tablePath) throws TableException;
Table scan(String... tablePath);
/**
* Creates a table source and/or table sink from a descriptor.
......@@ -280,6 +312,138 @@ public interface TableEnvironment {
*/
void sqlUpdate(String stmt, QueryConfig config);
/**
* Gets the current default catalog name of the current session.
*
* @return The current default catalog name that is used for the path resolution.
* @see TableEnvironment#useCatalog(String)
*/
String getCurrentCatalog();
/**
* Sets the current catalog to the given value. It also sets the default
* database to the catalog's default one. See also {@link TableEnvironment#useDatabase(String)}.
*
* <p>This is used during the resolution of object paths. Both the catalog and database are optional
* when referencing catalog objects such as tables, views etc. The algorithm looks for requested objects in
* the following paths in that order:
* <ol>
* <li>{@code [current-catalog].[current-database].[requested-path]}</li>
* <li>{@code [current-catalog].[requested-path]}</li>
* <li>{@code [requested-path]}</li>
* </ol>
*
* <p>Example:
*
* <p>Given structure with default catalog set to {@code default_catalog} and default database set to
* {@code default_database}.
* <pre>
* root:
* |- default_catalog
* |- default_database
* |- tab1
* |- db1
* |- tab1
* |- cat1
* |- db1
* |- tab1
* </pre>
*
* <p>The following table describes resolved paths:
* <table>
* <thead>
* <tr>
* <th>Requested path</th>
* <th>Resolved path</th>
* </tr>
* </thead>
* <tbody>
* <tr>
* <td>tab1</td>
* <td>default_catalog.default_database.tab1</td>
* </tr>
* <tr>
* <td>db1.tab1</td>
* <td>default_catalog.db1.tab1</td>
* </tr>
* <tr>
* <td>cat1.db1.tab1</td>
* <td>cat1.db1.tab1</td>
* </tr>
* </tbody>
* </table>
*
* @param catalogName The name of the catalog to set as the current default catalog.
* @see TableEnvironment#useDatabase(String)
*/
void useCatalog(String catalogName);
/**
* Gets the current default database name of the running session.
*
* @return The name of the current database of the current catalog.
* @see TableEnvironment#useDatabase(String)
*/
String getCurrentDatabase();
/**
* Sets the current default database. It has to exist in the current catalog. That path will be used as
* the default one when looking for unqualified object names.
*
* <p>This is used during the resolution of object paths. Both the catalog and database are optional
* when referencing catalog objects such as tables, views etc. The algorithm looks for requested objects in
* the following paths in that order:
* <ol>
* <li>{@code [current-catalog].[current-database].[requested-path]}</li>
* <li>{@code [current-catalog].[requested-path]}</li>
* <li>{@code [requested-path]}</li>
* </ol>
*
* <p>Example:
*
* <p>Given structure with default catalog set to {@code default_catalog} and default database set to
* {@code default_database}.
* <pre>
* root:
* |- default_catalog
* |- default_database
* |- tab1
* |- db1
* |- tab1
* |- cat1
* |- db1
* |- tab1
* </pre>
*
* <p>The following table describes resolved paths:
* <table>
* <thead>
* <tr>
* <th>Requested path</th>
* <th>Resolved path</th>
* </tr>
* </thead>
* <tbody>
* <tr>
* <td>tab1</td>
* <td>default_catalog.default_database.tab1</td>
* </tr>
* <tr>
* <td>db1.tab1</td>
* <td>default_catalog.db1.tab1</td>
* </tr>
* <tr>
* <td>cat1.db1.tab1</td>
* <td>cat1.db1.tab1</td>
* </tr>
* </tbody>
* </table>
*
* @param databaseName The name of the database to set as the current database.
* @see TableEnvironment#useCatalog(String)
*/
void useDatabase(String databaseName);
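Following the example structure above, a short sketch of how the defaults interact with scan(...); "tEnv" is an assumed TableEnvironment:

    tEnv.useCatalog("default_catalog");
    tEnv.useDatabase("db1");
    Table a = tEnv.scan("tab1");                  // resolves to default_catalog.db1.tab1
    Table b = tEnv.scan("cat1", "db1", "tab1");   // fully qualified, independent of the current defaults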
/**
* Returns the table config that defines the runtime behavior of the Table API.
*/
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.operations.CatalogTableOperation;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A CatalogManager that encapsulates all available catalogs. It also implements the logic of
* table path resolution. It supports both the new {@link Catalog} API and the legacy {@link ExternalCatalog} API.
*/
@Internal
public class CatalogManager {
private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);
// A map between names and catalogs.
private Map<String, Catalog> catalogs;
// TO BE REMOVED along with ExternalCatalog API
private Map<String, ExternalCatalog> externalCatalogs;
// The name of the default catalog and schema
private String currentCatalogName;
private String currentDatabaseName;
public CatalogManager(String defaultCatalogName, Catalog defaultCatalog) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
"Default catalog name cannot be null or empty");
checkNotNull(defaultCatalog, "Default catalog cannot be null");
catalogs = new LinkedHashMap<>();
externalCatalogs = new LinkedHashMap<>();
catalogs.put(defaultCatalogName, defaultCatalog);
this.currentCatalogName = defaultCatalogName;
this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
}
/**
* Registers a catalog under the given name. The catalog name must be unique across both
* {@link Catalog}s and {@link ExternalCatalog}s.
*
* @param catalogName name under which to register the given catalog
* @param catalog catalog to register
* @throws CatalogException if the registration of the catalog under the given name failed
*/
public void registerCatalog(String catalogName, Catalog catalog) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty.");
checkNotNull(catalog, "Catalog cannot be null");
if (catalogs.containsKey(catalogName) || externalCatalogs.containsKey(catalogName)) {
throw new CatalogException(format("Catalog %s already exists.", catalogName));
}
catalogs.put(catalogName, catalog);
catalog.open();
}
/**
* Gets a catalog by name.
*
* @param catalogName name of the catalog to retrieve
* @return the requested catalog or empty if it does not exist
* @see CatalogManager#getExternalCatalog(String)
*/
public Optional<Catalog> getCatalog(String catalogName) {
return Optional.ofNullable(catalogs.get(catalogName));
}
/**
* Registers an external catalog under the given name. The catalog name must be unique across both
* {@link Catalog}s and {@link ExternalCatalog}s.
*
* @param catalogName name under which to register the given catalog
* @param catalog catalog to register
* @throws CatalogException thrown if the name is already taken
* @deprecated {@link ExternalCatalog} APIs will be dropped
*/
@Deprecated
public void registerExternalCatalog(String catalogName, ExternalCatalog catalog) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "The catalog name cannot be null or empty.");
checkNotNull(catalog, "The catalog cannot be null.");
if (externalCatalogs.containsKey(catalogName) || catalogs.containsKey(catalogName)) {
throw new CatalogException(format("An external catalog named [%s] already exists.", catalogName));
}
externalCatalogs.put(catalogName, catalog);
}
/**
* Gets an external catalog by name.
*
* @param externalCatalogName name of the catalog to retrieve
* @return the requested external catalog or empty if it does not exist
* @see CatalogManager#getCatalog(String)
* @deprecated {@link ExternalCatalog} APIs will be dropped
*/
@Deprecated
public Optional<ExternalCatalog> getExternalCatalog(String externalCatalogName) {
return Optional.ofNullable(externalCatalogs.get(externalCatalogName));
}
/**
* Retrieves names of all registered catalogs. It does not include {@link ExternalCatalog}s.
*
* @return a set of names of registered catalogs
* @see CatalogManager#getExternalCatalogs()
*/
public Set<String> getCatalogs() {
return catalogs.keySet();
}
/**
* Retrieves names of all registered external catalogs. It does not include {@link Catalog}s.
*
* @return a set of names of registered catalogs
* @see CatalogManager#getCatalogs()
* @deprecated {@link ExternalCatalog} APIs will be dropped
*/
@Deprecated
public Set<String> getExternalCatalogs() {
return externalCatalogs.keySet();
}
/**
* Gets the current default catalog that will be used when resolving table path.
*
* @return the current default catalog
* @see CatalogManager#resolveTable(String...)
*/
public String getCurrentCatalog() {
return currentCatalogName;
}
/**
* Sets the current default catalog name that will be used when resolving table path.
*
* @param catalogName catalog name to set as current default catalog
* @throws CatalogNotExistException thrown if the catalog doesn't exist
* @see CatalogManager#resolveTable(String...)
*/
public void setCurrentCatalog(String catalogName) throws CatalogNotExistException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty.");
if (externalCatalogs.containsKey(catalogName)) {
throw new CatalogException("An external catalog cannot be set as the default one.");
}
Catalog potentialCurrentCatalog = catalogs.get(catalogName);
if (potentialCurrentCatalog == null) {
throw new CatalogException(format("A catalog with name [%s] does not exist.", catalogName));
}
if (!currentCatalogName.equals(catalogName)) {
currentCatalogName = catalogName;
currentDatabaseName = potentialCurrentCatalog.getDefaultDatabase();
LOG.info(
"Set the current default catalog as [{}] and the current default database as [{}].",
currentCatalogName,
currentDatabaseName);
}
}
/**
* Gets the current default database name that will be used when resolving table path.
*
* @return the current default database
* @see CatalogManager#resolveTable(String...)
*/
public String getCurrentDatabase() {
return currentDatabaseName;
}
/**
* Sets the current default database name that will be used when resolving a table path.
* The database has to exist in the current catalog.
*
* @param databaseName database name to set as current default database name
* @throws CatalogException thrown if the database doesn't exist in the current catalog
* @see CatalogManager#resolveTable(String...)
* @see CatalogManager#setCurrentCatalog(String)
*/
public void setCurrentDatabase(String databaseName) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "The database name cannot be null or empty.");
if (!catalogs.get(currentCatalogName).databaseExists(databaseName)) {
throw new CatalogException(format(
"A database with name [%s] does not exist in the catalog: [%s].",
databaseName,
currentCatalogName));
}
if (!currentDatabaseName.equals(databaseName)) {
currentDatabaseName = databaseName;
LOG.info(
"Set the current default database as [{}] in the current default catalog [{}].",
currentDatabaseName,
currentCatalogName);
}
}
/**
* Tries to resolve a table path to a {@link CatalogTableOperation}. The algorithm looks for the requested table
* in the following paths in that order:
* <ol>
* <li>{@code [current-catalog].[current-database].[tablePath]}</li>
* <li>{@code [current-catalog].[tablePath]}</li>
* <li>{@code [tablePath]}</li>
* </ol>
*
* @param tablePath table path to look for
* @return {@link CatalogTableOperation} containing both fully qualified table identifier and its
* {@link TableSchema}.
*/
public Optional<CatalogTableOperation> resolveTable(String... tablePath) {
checkArgument(tablePath != null && tablePath.length != 0, "Table path must not be null or empty.");
List<String> userPath = asList(tablePath);
List<List<String>> prefixes = asList(
asList(currentCatalogName, currentDatabaseName),
singletonList(currentCatalogName),
emptyList()
);
for (List<String> prefix : prefixes) {
Optional<CatalogTableOperation> potentialTable = lookupPath(prefix, userPath);
if (potentialTable.isPresent()) {
return potentialTable;
}
}
return Optional.empty();
}
private Optional<CatalogTableOperation> lookupPath(List<String> prefix, List<String> userPath) {
try {
List<String> path = new ArrayList<>(prefix);
path.addAll(userPath);
Optional<CatalogTableOperation> potentialTable = lookupCatalogTable(path);
if (!potentialTable.isPresent()) {
potentialTable = lookupExternalTable(path);
}
return potentialTable;
} catch (TableNotExistException e) {
return Optional.empty();
}
}
private Optional<CatalogTableOperation> lookupCatalogTable(List<String> path) throws TableNotExistException {
if (path.size() == 3) {
Catalog currentCatalog = catalogs.get(path.get(0));
String currentDatabaseName = path.get(1);
String tableName = String.join(".", path.subList(2, path.size()));
ObjectPath objectPath = new ObjectPath(currentDatabaseName, tableName);
if (currentCatalog != null && currentCatalog.tableExists(objectPath)) {
TableSchema tableSchema = currentCatalog.getTable(objectPath).getSchema();
return Optional.of(new CatalogTableOperation(
asList(path.get(0), currentDatabaseName, tableName),
tableSchema));
}
}
return Optional.empty();
}
private Optional<CatalogTableOperation> lookupExternalTable(List<String> path) {
ExternalCatalog currentCatalog = externalCatalogs.get(path.get(0));
return Optional.ofNullable(currentCatalog)
.flatMap(externalCatalog -> extractPath(externalCatalog, path.subList(1, path.size() - 1)))
.map(finalCatalog -> finalCatalog.getTable(path.get(path.size() - 1)))
.map(table -> new CatalogTableOperation(path, getTableSchema(table)));
}
private Optional<ExternalCatalog> extractPath(ExternalCatalog rootExternalCatalog, List<String> path) {
ExternalCatalog schema = rootExternalCatalog;
for (String pathPart : path) {
schema = schema.getSubCatalog(pathPart);
if (schema == null) {
return Optional.empty();
}
}
return Optional.of(schema);
}
private static TableSchema getTableSchema(ExternalCatalogTable externalTable) {
if (externalTable.isTableSource()) {
return TableFactoryUtil.findAndCreateTableSource(externalTable).getTableSchema();
} else {
TableSink tableSink = TableFactoryUtil.findAndCreateTableSink(externalTable);
return new TableSchema(tableSink.getFieldNames(), tableSink.getFieldTypes());
}
}
}
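A hedged sketch of how the manager could be driven directly; all names are illustrative:

    CatalogManager manager = new CatalogManager(
        "default_catalog",
        new GenericInMemoryCatalog("default_catalog", "default_database"));
    manager.registerCatalog("cat1", new GenericInMemoryCatalog("cat1", "db1"));
    manager.setCurrentCatalog("cat1");   // also switches the current database to cat1's default ("db1")
    Optional<CatalogTableOperation> tab = manager.resolveTable("tab1");
    // present only if "tab1" is found under cat1.db1 or one of the fallback paths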
......@@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.{TableEnvironment, _}
import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
......@@ -226,8 +227,19 @@ object BatchTableEnvironment {
: BatchTableEnvironment = {
try {
val clazz = Class.forName("org.apache.flink.table.api.scala.BatchTableEnvImpl")
val const = clazz.getConstructor(classOf[ExecutionEnvironment], classOf[TableConfig])
const.newInstance(executionEnvironment, tableConfig).asInstanceOf[BatchTableEnvironment]
val const = clazz
.getConstructor(
classOf[ExecutionEnvironment],
classOf[TableConfig],
classOf[CatalogManager])
val catalogManager = new CatalogManager(
tableConfig.getBuiltInCatalogName,
new GenericInMemoryCatalog(
tableConfig.getBuiltInCatalogName,
tableConfig.getBuiltInDatabaseName)
)
const.newInstance(executionEnvironment, tableConfig, catalogManager)
.asInstanceOf[BatchTableEnvironment]
} catch {
case t: Throwable => throw new TableException("Create BatchTableEnvironment failed.", t)
}
......
......@@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{TableEnvironment, _}
import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor}
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction}
......@@ -281,8 +282,19 @@ object StreamTableEnvironment {
: StreamTableEnvironment = {
try {
val clazz = Class.forName("org.apache.flink.table.api.scala.StreamTableEnvImpl")
val const = clazz.getConstructor(classOf[StreamExecutionEnvironment], classOf[TableConfig])
const.newInstance(executionEnvironment, tableConfig).asInstanceOf[StreamTableEnvironment]
val const = clazz
.getConstructor(
classOf[StreamExecutionEnvironment],
classOf[TableConfig],
classOf[CatalogManager])
val catalogManager = new CatalogManager(
tableConfig.getBuiltInCatalogName,
new GenericInMemoryCatalog(
tableConfig.getBuiltInCatalogName,
tableConfig.getBuiltInDatabaseName)
)
const.newInstance(executionEnvironment, tableConfig, catalogManager)
.asInstanceOf[StreamTableEnvironment]
} catch {
case t: Throwable => throw new TableException("Create StreamTableEnvironment failed.", t)
}
......
......@@ -20,25 +20,15 @@ package org.apache.flink.table.catalog.exceptions;
/**
* Exception for trying to operate on a database that doesn't exist.
*
*/
public class DatabaseNotExistException extends Exception {
private static final String MSG = "Database %s does not exist in Catalog %s.";
/**
* @param catalog Catalog name
* @param database Database name
* @param cause The cause
*/
public DatabaseNotExistException(String catalog, String database, Throwable cause) {
super(String.format(MSG, database, catalog), cause);
public DatabaseNotExistException(String catalogName, String databaseName, Throwable cause) {
super(String.format(MSG, databaseName, catalogName), cause);
}
/**
* @param catalog Catalog name
* @param database Database name
*/
public DatabaseNotExistException(String catalog, String database) {
this(catalog, database, null);
public DatabaseNotExistException(String catalogName, String databaseName) {
this(catalogName, databaseName, null);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
import org.apache.flink.table.api.CatalogAlreadyExistsException;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.util.StringUtils;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.SchemaPlus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A CatalogManager implementation for Flink.
* TODO: [FLINK-11275] Decouple CatalogManager with Calcite
* Ideally FlinkCatalogManager should be in the flink-table-api-java module.
* But because it currently depends on Calcite, a dependency that flink-table-api-java doesn't have right now,
* we temporarily put FlinkCatalogManager in flink-table-planner-blink.
*/
public class FlinkCatalogManager implements CatalogManager {
private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalogManager.class);
public static final String BUILTIN_CATALOG_NAME = "builtin";
// The catalog to hold all registered and translated tables
// We disable caching here to prevent side effects
private CalciteSchema internalSchema = CalciteSchema.createRootSchema(false, false);
private SchemaPlus rootSchema = internalSchema.plus();
// A map between names and catalogs.
private Map<String, Catalog> catalogs;
// The name of the default catalog and schema
private String currentCatalogName;
public FlinkCatalogManager() {
LOG.info("Initializing FlinkCatalogManager");
catalogs = new HashMap<>();
GenericInMemoryCatalog inMemoryCatalog = new GenericInMemoryCatalog(BUILTIN_CATALOG_NAME);
catalogs.put(BUILTIN_CATALOG_NAME, inMemoryCatalog);
currentCatalogName = BUILTIN_CATALOG_NAME;
CatalogCalciteSchema.registerCatalog(rootSchema, BUILTIN_CATALOG_NAME, inMemoryCatalog);
}
@Override
public void registerCatalog(String catalogName, Catalog catalog) throws CatalogAlreadyExistsException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
checkNotNull(catalog, "catalog cannot be null");
if (catalogs.containsKey(catalogName)) {
throw new CatalogAlreadyExistsException(catalogName);
}
catalogs.put(catalogName, catalog);
catalog.open();
CatalogCalciteSchema.registerCatalog(rootSchema, catalogName, catalog);
}
@Override
public Catalog getCatalog(String catalogName) throws CatalogNotExistException {
if (!catalogs.keySet().contains(catalogName)) {
throw new CatalogNotExistException(catalogName);
}
return catalogs.get(catalogName);
}
@Override
public Set<String> getCatalogNames() {
return catalogs.keySet();
}
@Override
public Catalog getCurrentCatalog() {
return catalogs.get(currentCatalogName);
}
@Override
public void setCurrentCatalog(String catalogName) throws CatalogNotExistException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
if (!catalogs.keySet().contains(catalogName)) {
throw new CatalogNotExistException(catalogName);
}
if (!currentCatalogName.equals(catalogName)) {
currentCatalogName = catalogName;
LOG.info("Set default catalog as '{}' and default database as '{}'",
currentCatalogName, catalogs.get(currentCatalogName).getCurrentDatabase());
}
}
public SchemaPlus getRootSchema() {
return rootSchema;
}
}
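A short, hedged sketch of the Blink planner's manager, assumed to run inside a method that declares the checked catalog exceptions; the catalog name is illustrative:

    FlinkCatalogManager manager = new FlinkCatalogManager();            // creates and registers the "builtin" catalog
    manager.registerCatalog("cat1", new GenericInMemoryCatalog("cat1"));
    manager.setCurrentCatalog("cat1");
    SchemaPlus root = manager.getRootSchema();                          // "builtin" and "cat1" are now Calcite sub-schemas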
......@@ -18,6 +18,21 @@
package org.apache.flink.table.api
import _root_.java.lang.reflect.Modifier
import _root_.java.util.concurrent.atomic.AtomicInteger
import _root_.java.util.{Arrays => JArrays}
import org.apache.calcite.config.Lex
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.sql._
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable}
import org.apache.calcite.sql2rel.SqlToRelConverter
import org.apache.calcite.tools._
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.typeinfo.TypeInformation
......@@ -30,8 +45,7 @@ import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => Scala
import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnvironment, StreamTableEnvironment => JavaStreamTableEnv}
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnvironment, StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.table.calcite.{FlinkContextImpl, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.catalog.{CatalogManager, FlinkCatalogManager, Catalog}
import org.apache.flink.table.calcite._
import org.apache.flink.table.codegen.ExpressionReducer
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
......@@ -52,22 +66,6 @@ import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.typeutils.BaseRowTypeInfo
import org.apache.flink.table.validate.FunctionCatalog
import org.apache.flink.types.Row
import org.apache.calcite.config.Lex
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.sql._
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable}
import org.apache.calcite.sql2rel.SqlToRelConverter
import org.apache.calcite.tools._
import _root_.java.lang.reflect.Modifier
import _root_.java.util.concurrent.atomic.AtomicInteger
import _root_.java.util.{Arrays => JArrays}
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException
import _root_.scala.annotation.varargs
import _root_.scala.collection.JavaConversions._
......@@ -81,9 +79,6 @@ import _root_.scala.collection.mutable
*/
abstract class TableEnvironment(val config: TableConfig) {
// Note: The CatalogManager isn't hooked up to planner yet.
private val catalogManager: CatalogManager = new FlinkCatalogManager()
protected val DEFAULT_JOB_NAME = "Flink Exec Table Job"
// the catalog to hold all registered and translated tables
......@@ -321,74 +316,6 @@ abstract class TableEnvironment(val config: TableConfig) {
*/
private[flink] def optimize(root: RelNode): RelNode = optimize(Seq(root)).head
/**
* Register an [[Catalog]] under a unique name.
*
* @param name the name under which the catalog will be registered
* @param catalog the catalog to register
* @throws CatalogAlreadyExistsException thrown if the catalog already exists
*/
@throws[CatalogAlreadyExistsException]
def registerCatalog(name: String, catalog: Catalog): Unit = {
catalogManager.registerCatalog(name, catalog)
}
/**
* Get a registered [[Catalog]].
*
* @param catalogName name of the catalog to get
* @return the requested catalog
* @throws CatalogNotExistException thrown if the catalog doesn't exist
*/
@throws[CatalogNotExistException]
def getCatalog(catalogName: String): Catalog = {
catalogManager.getCatalog(catalogName)
}
/**
* Get the current catalog.
*
* @return the current catalog in CatalogManager
*/
def getCurrentCatalog(): Catalog = {
catalogManager.getCurrentCatalog
}
/**
* Get the current database name.
*
* @return the current database of the current catalog
*/
def getCurrentDatabaseName(): String = {
catalogManager.getCurrentCatalog.getCurrentDatabase
}
/**
* Set the current catalog.
*
* @param name name of the catalog to set as current catalog
* @throws CatalogNotExistException thrown if the catalog doesn't exist
*/
@throws[CatalogNotExistException]
def setCurrentCatalog(name: String): Unit = {
catalogManager.setCurrentCatalog(name)
}
/**
* Set the current catalog and current database.
*
* @param catalogName name of the catalog to set as current catalog
* @param databaseName name of the database to set as current database
* @throws CatalogNotExistException thrown if the catalog doesn't exist
* @throws DatabaseNotExistException thrown if the database doesn't exist
*/
@throws[CatalogNotExistException]
@throws[DatabaseNotExistException]
def setCurrentDatabase(catalogName: String, databaseName: String): Unit = {
catalogManager.setCurrentCatalog(catalogName)
catalogManager.getCurrentCatalog.setCurrentDatabase(databaseName)
}
/**
* Registers a [[Table]] under a unique name in the TableEnvironment's catalog.
* Registered tables can be referenced in SQL queries.
......
......@@ -169,7 +169,12 @@ class TableImpl(val tableEnv: TableEnvironment, relNode: RelNode) extends Table
override def fetch(fetch: Int): Table = ???
override def insertInto(tableName: String): Unit = ???
override def insertInto(tablePath: String, tablePathContinued: String*): Unit = ???
override def insertInto(
conf: QueryConfig,
tablePath: String,
tablePathContinued: String*): Unit = ???
override def insertInto(
tableName: String,
......
......@@ -18,18 +18,15 @@
package org.apache.flink.table.api
import _root_.java.util.HashMap
import org.apache.calcite.plan.RelOptUtil
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException
import org.apache.flink.table.catalog.{FlinkCatalogManager, GenericCatalogDatabase, GenericInMemoryCatalog}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.junit.{Rule, Test}
import org.junit.Assert.assertEquals
import org.junit.rules.ExpectedException
import org.junit.{Rule, Test}
class TableEnvironmentTest {
......@@ -74,21 +71,4 @@ class TableEnvironmentTest {
" LogicalTableScan(table=[[MyTable]])\n"
assertEquals(expected, actual)
}
@Test
def testSetCurrentDatabase(): Unit = {
assertEquals(GenericInMemoryCatalog.DEFAULT_DB, tableEnv.getCurrentDatabaseName)
val testDb = "test"
tableEnv.getCurrentCatalog
.createDatabase(testDb, new GenericCatalogDatabase(new HashMap[String, String]), false)
tableEnv.setCurrentDatabase(FlinkCatalogManager.BUILTIN_CATALOG_NAME, testDb)
assertEquals(testDb, tableEnv.getCurrentDatabaseName)
}
@Test
def testSetNonExistentCurrentDatabase(): Unit = {
thrown.expect(classOf[DatabaseNotExistException])
thrown.expectMessage("Database nonexist does not exist in Catalog builtin")
tableEnv.setCurrentDatabase(FlinkCatalogManager.BUILTIN_CATALOG_NAME, "nonexist")
}
}
......@@ -16,18 +16,25 @@
* limitations under the License.
*/
package org.apache.flink.table.api;
package org.apache.calcite.jdbc;
import org.apache.calcite.schema.Schema;
/**
* Exception for adding an already existent catalog.
* This class is used to create a {@link CalciteSchema} with a given {@link Schema} as the root.
*/
public class CatalogAlreadyExistsException extends Exception {
public class CalciteSchemaBuilder {
public CatalogAlreadyExistsException(String catalogName) {
this(catalogName, null);
/**
* Creates a {@link CalciteSchema} with a given {@link Schema} as the root.
*
* @param root schema to use as a root schema
* @return calcite schema with given schema as the root
*/
public static CalciteSchema asRootSchema(Schema root) {
return new SimpleCalciteSchema(null, root, "");
}
public CatalogAlreadyExistsException(String catalogName, Throwable cause) {
super(String.format("Catalog %s already exists.", catalogName), cause);
private CalciteSchemaBuilder() {
}
}
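A hedged sketch of the intended use; the CatalogCalciteSchema constructor arguments are assumed here for illustration:

    Schema catalogSchema = new CatalogCalciteSchema("cat1", new GenericInMemoryCatalog("cat1"));
    CalciteSchema calciteSchema = CalciteSchemaBuilder.asRootSchema(catalogSchema);
    SchemaPlus rootSchema = calciteSchema.plus();   // usable wherever Calcite expects a SchemaPlus root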
......@@ -18,58 +18,71 @@
package org.apache.flink.table.catalog;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.calcite.FlinkTypeFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.junit.Before;
import org.junit.Test;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.Table;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
/**
* Tests for FlinkCatalogManager.
* Thin wrapper around a Calcite-specific {@link Table}. This is a temporary solution
* that allows registering those tables in the {@link CatalogManager}.
* TODO remove once we decouple TableEnvironment from Calcite.
*/
public class FlinkCatalogManagerTest {
private static final String TEST_CATALOG_NAME = "test";
private static final Catalog TEST_CATALOG = new GenericInMemoryCatalog(TEST_CATALOG_NAME);
@Internal
public class CalciteCatalogTable implements CatalogBaseTable {
private final Table table;
private final FlinkTypeFactory typeFactory;
private FlinkCatalogManager manager;
private SchemaPlus rootSchema;
public CalciteCatalogTable(Table table, FlinkTypeFactory typeFactory) {
this.table = table;
this.typeFactory = typeFactory;
}
@Before
public void init() {
manager = new FlinkCatalogManager();
rootSchema = manager.getRootSchema();
public Table getTable() {
return table;
}
@Test
public void testRegisterCatalog() throws Exception {
assertEquals(1, manager.getCatalogNames().size());
assertFalse(manager.getCatalogNames().contains(TEST_CATALOG_NAME));
assertFalse(rootSchema.getSubSchemaNames().contains(TEST_CATALOG_NAME));
@Override
public Map<String, String> getProperties() {
return Collections.emptyMap();
}
manager.registerCatalog(TEST_CATALOG_NAME, TEST_CATALOG);
@Override
public TableSchema getSchema() {
RelDataType relDataType = table.getRowType(typeFactory);
assertEquals(2, manager.getCatalogNames().size());
assertTrue(manager.getCatalogNames().contains(TEST_CATALOG_NAME));
assertTrue(rootSchema.getSubSchemaNames().contains(TEST_CATALOG_NAME));
}
String[] fieldNames = relDataType.getFieldNames().toArray(new String[0]);
TypeInformation[] fieldTypes = relDataType.getFieldList()
.stream()
.map(field -> FlinkTypeFactory.toTypeInfo(field.getType())).toArray(TypeInformation[]::new);
@Test
public void testSetCurrentCatalog() throws Exception {
manager.registerCatalog(TEST_CATALOG_NAME, TEST_CATALOG);
return new TableSchema(fieldNames, fieldTypes);
}
assertEquals(manager.getCatalog(FlinkCatalogManager.BUILTIN_CATALOG_NAME), manager.getCurrentCatalog());
@Override
public String getComment() {
return null;
}
manager.setCurrentCatalog(TEST_CATALOG_NAME);
@Override
public CatalogBaseTable copy() {
return this;
}
assertEquals(manager.getCatalog(TEST_CATALOG_NAME), manager.getCurrentCatalog());
@Override
public Optional<String> getDescription() {
return Optional.empty();
}
@Test(expected = CatalogNotExistException.class)
public void testSetNonExistCurrentCatalog() throws Exception {
manager.setCurrentCatalog("nonexist");
@Override
public Optional<String> getDetailedDescription() {
return Optional.empty();
}
}
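A hedged sketch of the wrapper's use; "calciteTable" and "typeFactory" are assumed to be an existing org.apache.calcite.schema.Table and FlinkTypeFactory:

    CatalogBaseTable wrapped = new CalciteCatalogTable(calciteTable, typeFactory);
    TableSchema schema = wrapped.getSchema();   // field names and types derived from the Calcite row type
    // the wrapped table can then be registered through the CatalogManager like any other CatalogBaseTable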
......@@ -19,9 +19,6 @@
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
......@@ -31,22 +28,18 @@ import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
/**
* A mapping between Flink's catalog and Calcite's schema. This enables to look up and access tables
* in SQL queries without registering tables in advance. Databases are registered as sub-schemas in the schema.
* This mapping is modeled as a strict two-level reference structure for Flink in Calcite,
* the full path of tables and views is of format [catalog_name].[db_name].[meta-object_name].
* A mapping between Flink's catalog and Calcite's schema. This enables looking up and accessing objects (tables, views,
* functions, types) in SQL queries without registering them in advance. Databases are registered as sub-schemas
* in the schema.
*/
@Internal
public class CatalogCalciteSchema implements Schema {
private static final Logger LOGGER = LoggerFactory.getLogger(CatalogCalciteSchema.class);
private final String catalogName;
private final Catalog catalog;
......@@ -60,17 +53,15 @@ public class CatalogCalciteSchema implements Schema {
* Look up a sub-schema (database) by the given sub-schema name.
*
* @param schemaName name of sub-schema to look up
* @return the sub-schema with a given dbName, or null
* @return the sub-schema with a given database name, or null
*/
@Override
public Schema getSubSchema(String schemaName) {
if (catalog.databaseExists(schemaName)) {
return new DatabaseCalciteSchema(schemaName, catalog);
return new DatabaseCalciteSchema(schemaName, catalogName, catalog);
} else {
LOGGER.error(String.format("Schema %s does not exist in catalog %s", schemaName, catalogName));
throw new CatalogException(
new DatabaseNotExistException(catalogName, schemaName));
return null;
}
}
......@@ -123,109 +114,4 @@ public class CatalogCalciteSchema implements Schema {
public Schema snapshot(SchemaVersion schemaVersion) {
return this;
}
/**
* Register a ReadableCatalog to Calcite schema with given name.
*
* @param parentSchema the parent schema
* @param catalogName name of the catalog to register
* @param catalog the catalog to register
*/
public static void registerCatalog(
SchemaPlus parentSchema,
String catalogName,
Catalog catalog) {
CatalogCalciteSchema newCatalog = new CatalogCalciteSchema(catalogName, catalog);
parentSchema.add(catalogName, newCatalog);
LOGGER.info("Registered catalog '{}' to Calcite", catalogName);
}
/**
* A mapping between Flink catalog's database and Calcite's schema.
* Tables are registered as tables in the schema.
*/
private class DatabaseCalciteSchema implements Schema {
private final String dbName;
private final Catalog catalog;
public DatabaseCalciteSchema(String dbName, Catalog catalog) {
this.dbName = dbName;
this.catalog = catalog;
}
@Override
public Table getTable(String tableName) {
LOGGER.info("Getting table '{}' from catalog '{}'", tableName, catalogName);
ObjectPath tablePath = new ObjectPath(dbName, tableName);
try {
CatalogBaseTable table = catalog.getTable(tablePath);
LOGGER.info("Successfully got table '{}' from catalog '{}'", tableName, catalogName);
// TODO: [FLINK-12257] Convert CatalogBaseTable to org.apache.calcite.schema.Table
// so that planner can use unified catalog APIs
return null;
} catch (TableNotExistException e) {
throw new CatalogException(e);
}
}
@Override
public Set<String> getTableNames() {
try {
return new HashSet<>(catalog.listTables(dbName));
} catch (DatabaseNotExistException e) {
throw new CatalogException(e);
}
}
@Override
public RelProtoDataType getType(String name) {
return null;
}
@Override
public Set<String> getTypeNames() {
return new HashSet<>();
}
@Override
public Collection<Function> getFunctions(String s) {
return new HashSet<>();
}
@Override
public Set<String> getFunctionNames() {
return new HashSet<>();
}
@Override
public Schema getSubSchema(String s) {
return null;
}
@Override
public Set<String> getSubSchemaNames() {
return new HashSet<>();
}
@Override
public Expression getExpression(SchemaPlus parentSchema, String name) {
return Schemas.subSchemaExpression(parentSchema, name, getClass());
}
@Override
public boolean isMutable() {
return true;
}
@Override
public Schema snapshot(SchemaVersion schemaVersion) {
return this;
}
}
}
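As a quick orientation for the class above, here is a minimal sketch (not part of this change) of how a catalog wrapped in CatalogCalciteSchema behaves after the commit: databases show up as Calcite sub-schemas, and an unknown database resolves to null rather than raising an exception. The sketch assumes it lives in org.apache.flink.table.catalog and that GenericInMemoryCatalog creates a default database named "default"; all names are illustrative.

import org.apache.calcite.schema.Schema;

public class CatalogCalciteSchemaSketch {
    public static void main(String[] args) {
        Catalog catalog = new GenericInMemoryCatalog("cat1");
        Schema calciteSchema = new CatalogCalciteSchema("cat1", catalog);

        // Databases of the catalog appear as Calcite sub-schemas.
        Schema db = calciteSchema.getSubSchema("default");
        // A database that does not exist now resolves to null instead of throwing.
        Schema missing = calciteSchema.getSubSchema("does_not_exist");

        System.out.println(db != null);      // true
        System.out.println(missing == null); // true
    }
}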
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Table;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Bridge between the {@link CatalogManager} and the {@link Schema}. This way we can query Flink's specific catalogs
* from Calcite.
*
 * <p>The mapping for {@link Catalog}s is modeled as a strict two-level reference structure for Flink in Calcite;
 * the full path of objects is of the format [catalog_name].[db_name].[meta-object_name].
 *
 * <p>It also supports {@link ExternalCatalog}s. An external catalog maps 1:1 to a Calcite schema.
*/
@Internal
public class CatalogManagerCalciteSchema implements Schema {
private final CatalogManager catalogManager;
private boolean isBatch;
public CatalogManagerCalciteSchema(CatalogManager catalogManager, boolean isBatch) {
this.catalogManager = catalogManager;
this.isBatch = isBatch;
}
@Override
public Table getTable(String name) {
return null;
}
@Override
public Set<String> getTableNames() {
return Collections.emptySet();
}
@Override
public RelProtoDataType getType(String name) {
return null;
}
@Override
public Set<String> getTypeNames() {
return Collections.emptySet();
}
@Override
public Collection<Function> getFunctions(String name) {
return Collections.emptyList();
}
@Override
public Set<String> getFunctionNames() {
return Collections.emptySet();
}
@Override
public Schema getSubSchema(String name) {
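// An external catalog registered under this name takes precedence; otherwise the name is
// resolved against the catalogs known to the CatalogManager, and null is returned if neither exists.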
Optional<Schema> externalSchema = catalogManager.getExternalCatalog(name)
.map(externalCatalog -> new ExternalCatalogSchema(isBatch, name, externalCatalog));
return externalSchema.orElseGet(() ->
catalogManager.getCatalog(name)
.map(catalog -> new CatalogCalciteSchema(name, catalog))
.orElse(null)
);
}
@Override
public Set<String> getSubSchemaNames() {
return Stream.concat(
catalogManager.getCatalogs().stream(),
catalogManager.getExternalCatalogs().stream())
.collect(Collectors.toCollection(LinkedHashSet::new));
}
@Override
public Expression getExpression(SchemaPlus parentSchema, String name) {
return null;
}
@Override
public boolean isMutable() {
return false;
}
@Override
public Schema snapshot(SchemaVersion version) {
return this;
}
}
......@@ -18,56 +18,37 @@
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.CatalogAlreadyExistsException;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.annotation.Internal;
import java.util.Set;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.validate.SqlNameMatchers;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* CatalogManager manages all the registered ReadableCatalog instances with unique names.
 * It has a concept of a current catalog, which is used when no catalog is given when referencing meta-objects.
* A thin wrapper around {@link CalciteCatalogReader} that enables providing multiple
* default paths to look in.
*/
@PublicEvolving
public interface CatalogManager {
/**
* Register a catalog with a unique name.
*
* @param catalogName catalog name to register
* @param catalog catalog to register
 * @throws CatalogAlreadyExistsException thrown if the name is already taken
*/
void registerCatalog(String catalogName, Catalog catalog) throws CatalogAlreadyExistsException;
/**
* Get a catalog by name.
*
* @param catalogName catalog name
* @return the requested catalog
* @throws CatalogNotExistException thrown if the catalog doesn't exist
*/
Catalog getCatalog(String catalogName) throws CatalogNotExistException;
/**
 * Get names of all registered catalogs.
*
* @return a set of names of registered catalogs
*/
Set<String> getCatalogNames();
/**
* Get the current catalog.
*
* @return the current catalog
*/
Catalog getCurrentCatalog();
/**
* Set the current catalog name.
*
* @param catalogName catalog name to set as current catalog
* @throws CatalogNotExistException thrown if the catalog doesn't exist
*/
void setCurrentCatalog(String catalogName) throws CatalogNotExistException;
@Internal
public class CatalogReader extends CalciteCatalogReader {
public CatalogReader(
CalciteSchema rootSchema,
List<List<String>> defaultSchema,
RelDataTypeFactory typeFactory,
CalciteConnectionConfig config) {
super(rootSchema,
SqlNameMatchers.withCaseSensitive(config != null && config.caseSensitive()),
Stream.concat(
defaultSchema.stream(),
Stream.of(Collections.<String>emptyList())
).collect(Collectors.toList()),
typeFactory,
config);
}
}
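To make the extra empty path concrete, the following illustrative sketch shows the list of default schema paths that PlanningConfigurationBuilder (further down in this commit) hands to the CatalogReader for a session whose current catalog is default_catalog and current database is default_database; the names are examples only.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CatalogReaderPathsSketch {
    public static void main(String[] args) {
        // Lookup order for an identifier:
        //   1. [default_catalog, default_database]  resolves "tab1"
        //   2. [default_catalog]                    resolves "db1.tab1"
        //   3. []                                   resolves "cat1.db1.tab1"
        List<List<String>> defaultPaths = Arrays.asList(
            Arrays.asList("default_catalog", "default_database"),
            Collections.singletonList("default_catalog"));
        // The CatalogReader constructor appends Collections.emptyList() itself,
        // so fully qualified paths keep resolving without being listed here.
        System.out.println(defaultPaths);
    }
}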
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import static java.lang.String.format;
/**
* A mapping between Flink catalog's database and Calcite's schema.
* Tables are registered as tables in the schema.
*/
class DatabaseCalciteSchema implements Schema {
private final String databaseName;
private final String catalogName;
private final Catalog catalog;
public DatabaseCalciteSchema(String databaseName, String catalogName, Catalog catalog) {
this.databaseName = databaseName;
this.catalogName = catalogName;
this.catalog = catalog;
}
@Override
public Table getTable(String tableName) {
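// Only tables stored as CalciteCatalogTable can currently be handed to the planner;
// anything else is rejected until FLINK-12257 converts CatalogBaseTable directly.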
ObjectPath tablePath = new ObjectPath(databaseName, tableName);
try {
if (!catalog.tableExists(tablePath)) {
return null;
}
CatalogBaseTable table = catalog.getTable(tablePath);
if (table instanceof CalciteCatalogTable) {
return ((CalciteCatalogTable) table).getTable();
} else {
throw new TableException("Unsupported table type: " + table);
}
} catch (TableNotExistException | CatalogException e) {
// TableNotExistException should never happen, because we are checking it exists
// via catalog.tableExists
throw new TableException(format(
"A failure occured when accesing table. Table path [%s, %s, %s]",
catalogName,
databaseName,
tableName), e);
}
}
@Override
public Set<String> getTableNames() {
try {
return new HashSet<>(catalog.listTables(databaseName));
} catch (DatabaseNotExistException e) {
throw new CatalogException(e);
}
}
@Override
public RelProtoDataType getType(String name) {
return null;
}
@Override
public Set<String> getTypeNames() {
return new HashSet<>();
}
@Override
public Collection<Function> getFunctions(String s) {
return new HashSet<>();
}
@Override
public Set<String> getFunctionNames() {
return new HashSet<>();
}
@Override
public Schema getSubSchema(String s) {
return null;
}
@Override
public Set<String> getSubSchemaNames() {
return new HashSet<>();
}
@Override
public Expression getExpression(SchemaPlus parentSchema, String name) {
return Schemas.subSchemaExpression(parentSchema, name, getClass());
}
@Override
public boolean isMutable() {
return true;
}
@Override
public Schema snapshot(SchemaVersion schemaVersion) {
return this;
}
}
......@@ -21,11 +21,13 @@ package org.apache.flink.table.planner;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.calcite.CalciteConfig;
import org.apache.flink.table.calcite.FlinkPlannerImpl;
import org.apache.flink.table.calcite.FlinkRelBuilder;
import org.apache.flink.table.calcite.FlinkRelBuilderFactory;
import org.apache.flink.table.calcite.FlinkRelOptClusterFactory;
import org.apache.flink.table.calcite.FlinkTypeFactory;
import org.apache.flink.table.calcite.FlinkTypeSystem;
import org.apache.flink.table.catalog.CatalogReader;
import org.apache.flink.table.codegen.ExpressionReducer;
import org.apache.flink.table.expressions.ExpressionBridge;
import org.apache.flink.table.expressions.PlannerExpression;
......@@ -44,10 +46,8 @@ import org.apache.calcite.plan.RelOptCostFactory;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
......@@ -55,7 +55,8 @@ import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import java.util.List;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
/**
* Utility class to create {@link org.apache.calcite.tools.RelBuilder} or {@link FrameworkConfig} used to create
......@@ -100,20 +101,34 @@ public class PlanningConfigurationBuilder {
/**
* Creates a configured {@link FlinkRelBuilder} for a planning session.
*
* @param defaultSchema the default schema to look for first during planning.
* @param currentCatalog the current default catalog to look for first during planning.
* @param currentDatabase the current default database to look for first during planning.
* @return configured rel builder
*/
public FlinkRelBuilder createRelBuilder(List<String> defaultSchema) {
RelOptCluster cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory));
RelOptSchema relOptSchema = new CalciteCatalogReader(
rootSchema,
defaultSchema,
typeFactory,
CalciteConfig.connectionConfig(getSqlParserConfig(calciteConfig(tableConfig))));
public FlinkRelBuilder createRelBuilder(String currentCatalog, String currentDatabase) {
RelOptCluster cluster = FlinkRelOptClusterFactory.create(
planner,
new RexBuilder(typeFactory));
RelOptSchema relOptSchema = createCatalogReader(false, currentCatalog, currentDatabase);
return new FlinkRelBuilder(context, cluster, relOptSchema, expressionBridge);
}
/**
* Creates a configured {@link FlinkPlannerImpl} for a planning session.
*
* @param currentCatalog the current default catalog to look for first during planning.
* @param currentDatabase the current default database to look for first during planning.
* @return configured flink planner
*/
public FlinkPlannerImpl createFlinkPlanner(String currentCatalog, String currentDatabase) {
return new FlinkPlannerImpl(
createFrameworkConfig(),
isLenient -> createCatalogReader(isLenient, currentCatalog, currentDatabase),
planner,
typeFactory);
}
/** Returns the Calcite {@link org.apache.calcite.plan.RelOptPlanner} that will be used. */
public RelOptPlanner getPlanner() {
return planner;
......@@ -129,20 +144,55 @@ public class PlanningConfigurationBuilder {
}
/**
* Creates a configured {@link FrameworkConfig} for a planning session.
*
* @param defaultSchema the default schema to look for first during planning
* @return configured framework config
* Returns the SQL parser config for this environment including a custom Calcite configuration.
*/
public FrameworkConfig createFrameworkConfig(SchemaPlus defaultSchema) {
public SqlParser.Config getSqlParserConfig() {
return JavaScalaConversionUtil.toJava(calciteConfig(tableConfig).sqlParserConfig()).orElseGet(() ->
// we use Java lex because back ticks are easier than double quotes in programming
// and cases are preserved
SqlParser
.configBuilder()
.setLex(Lex.JAVA)
.build());
}
private CatalogReader createCatalogReader(
boolean lenientCaseSensitivity,
String currentCatalog,
String currentDatabase) {
SqlParser.Config sqlParserConfig = getSqlParserConfig();
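// Lenient (case-insensitive) lookup is only requested for SQL completion hints; regular
// planning keeps the case sensitivity configured in the parser config.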
final boolean caseSensitive;
if (lenientCaseSensitivity) {
caseSensitive = false;
} else {
caseSensitive = sqlParserConfig.caseSensitive();
}
SqlParser.Config parserConfig = SqlParser.configBuilder(sqlParserConfig)
.setCaseSensitive(caseSensitive)
.build();
return new CatalogReader(
rootSchema,
asList(
asList(currentCatalog, currentDatabase),
singletonList(currentCatalog)
),
typeFactory,
CalciteConfig.connectionConfig(parserConfig));
}
private FrameworkConfig createFrameworkConfig() {
return Frameworks
.newConfigBuilder()
.defaultSchema(defaultSchema)
.parserConfig(getSqlParserConfig(calciteConfig(tableConfig)))
.parserConfig(getSqlParserConfig())
.costFactory(costFactory)
.typeSystem(typeSystem)
.operatorTable(getSqlOperatorTable(calciteConfig(tableConfig), functionCatalog))
.sqlToRelConverterConfig(getSqlToRelConverterConfig(calciteConfig(tableConfig), expressionBridge))
.sqlToRelConverterConfig(
getSqlToRelConverterConfig(
calciteConfig(tableConfig),
expressionBridge))
// the converter is needed when calling temporal table functions from SQL, because
// they reference a history table represented with a tree of table operations
.context(context)
......@@ -186,17 +236,4 @@ public class PlanningConfigurationBuilder {
}
).orElseGet(functionCatalog::getSqlOperatorTable);
}
/**
* Returns the SQL parser config for this environment including a custom Calcite configuration.
*/
private SqlParser.Config getSqlParserConfig(CalciteConfig calciteConfig) {
return JavaScalaConversionUtil.toJava(calciteConfig.sqlParserConfig()).orElseGet(() ->
// we use Java lex because back ticks are easier than double quotes in programming
// and cases are preserved
SqlParser
.configBuilder()
.setLex(Lex.JAVA)
.build());
}
}
......@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.catalog.CatalogManager
import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.{Expression, TimeAttribute}
......@@ -50,8 +51,9 @@ import org.apache.flink.types.Row
*/
abstract class BatchTableEnvImpl(
private[flink] val execEnv: ExecutionEnvironment,
config: TableConfig)
extends TableEnvImpl(config) {
config: TableConfig,
catalogManager: CatalogManager)
extends TableEnvImpl(config, catalogManager) {
// a counter for unique table names.
private val nameCntr: AtomicInteger = new AtomicInteger(0)
......@@ -97,7 +99,7 @@ abstract class BatchTableEnvImpl(
// check for proper batch table source
case batchTableSource: BatchTableSource[_] =>
// check if a table (source or sink) is registered
getTable(name) match {
getTable(defaultCatalogName, defaultDatabaseName, name) match {
// table source and/or sink is registered
case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
......
......@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
import org.apache.flink.table.catalog.CatalogManager
import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor}
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions._
......@@ -63,8 +64,9 @@ import _root_.scala.collection.JavaConverters._
*/
abstract class StreamTableEnvImpl(
private[flink] val execEnv: StreamExecutionEnvironment,
config: TableConfig)
extends TableEnvImpl(config) {
config: TableConfig,
catalogManager: CatalogManager)
extends TableEnvImpl(config, catalogManager) {
// a counter for unique table names
private val nameCntr: AtomicInteger = new AtomicInteger(0)
......@@ -118,7 +120,7 @@ abstract class StreamTableEnvImpl(
}
// register
getTable(name) match {
getTable(defaultCatalogName, defaultDatabaseName, name) match {
// check if a table (source or sink) is registered
case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
......
......@@ -19,26 +19,29 @@
package org.apache.flink.table.api
import _root_.java.lang.reflect.Modifier
import _root_.java.util.Optional
import _root_.java.util.concurrent.atomic.AtomicInteger
import com.google.common.collect.ImmutableList
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan._
import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgram, HepProgramBuilder}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.schema
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.sql._
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.tools._
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{RowTypeInfo, _}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.table.calcite._
import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
import org.apache.flink.table.catalog._
import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
......@@ -51,6 +54,7 @@ import org.apache.flink.table.planner.PlanningConfigurationBuilder
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
import org.apache.flink.table.util.JavaScalaConversionUtil
import org.apache.flink.table.validate.FunctionCatalog
import org.apache.flink.types.Row
......@@ -62,16 +66,20 @@ import _root_.scala.collection.mutable
*
* @param config The configuration of the TableEnvironment
*/
abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
// the catalog to hold all registered and translated tables
// we disable caching here to prevent side effects
private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false)
private val rootSchema: SchemaPlus = internalSchema.plus()
abstract class TableEnvImpl(
val config: TableConfig,
private val catalogManager: CatalogManager)
extends TableEnvironment {
// Table API/SQL function catalog
private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog()
protected val defaultCatalogName: String = config.getBuiltInCatalogName
protected val defaultDatabaseName: String = config.getBuiltInDatabaseName
private val internalSchema: CalciteSchema =
asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isBatch))
// temporary bridge between API and planner
private[flink] val expressionBridge: ExpressionBridge[PlannerExpression] =
new ExpressionBridge[PlannerExpression](functionCatalog, PlannerExpressionConverter.INSTANCE)
......@@ -84,11 +92,12 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
private[flink] val operationTreeBuilder = new OperationTreeBuilder(this)
private val planningSession: PlanningConfigurationBuilder = new PlanningConfigurationBuilder(
config,
functionCatalog,
internalSchema,
expressionBridge)
private val planningConfigurationBuilder: PlanningConfigurationBuilder =
new PlanningConfigurationBuilder(
config,
functionCatalog,
internalSchema,
expressionBridge)
protected def calciteConfig: CalciteConfig = config.getPlannerConfig
.unwrap(classOf[CalciteConfig])
......@@ -96,6 +105,11 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
def getConfig: TableConfig = config
private def isBatch: Boolean = this match {
case _: BatchTableEnvImpl => true
case _ => false
}
private[flink] def queryConfig: QueryConfig = this match {
case _: BatchTableEnvImpl => new BatchQueryConfig
case _: StreamTableEnvImpl => new StreamQueryConfig
......@@ -274,7 +288,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
input: RelNode,
targetTraits: RelTraitSet): RelNode = {
val planner = new HepPlanner(hepProgram, planningSession.getContext)
val planner = new HepPlanner(hepProgram, planningConfigurationBuilder.getContext)
planner.setRoot(input)
if (input.getTraitSet != targetTraits) {
planner.changeTraits(input, targetTraits.simplify)
......@@ -321,12 +335,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
}
override def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = {
if (rootSchema.getSubSchema(name) != null) {
throw new ExternalCatalogAlreadyExistException(name)
}
this.externalCatalogs.put(name, externalCatalog)
// create an external catalog Calcite schema, register it on the root schema
ExternalCatalogSchema.registerCatalog(this, rootSchema, name, externalCatalog)
catalogManager.registerExternalCatalog(name, externalCatalog)
}
override def getRegisteredExternalCatalog(name: String): ExternalCatalog = {
......@@ -343,7 +352,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
functionCatalog.registerScalarFunction(
name,
function,
planningSession.getTypeFactory)
planningConfigurationBuilder.getTypeFactory)
}
/**
......@@ -367,7 +376,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
name,
function,
typeInfo,
planningSession.getTypeFactory)
planningConfigurationBuilder.getTypeFactory)
}
/**
......@@ -394,7 +403,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
function,
resultTypeInfo,
accTypeInfo,
planningSession.getTypeFactory)
planningConfigurationBuilder.getTypeFactory)
}
override def registerTable(name: String, table: Table): Unit = {
......@@ -415,6 +424,30 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
registerTableSourceInternal(name, tableSource)
}
override def registerCatalog(catalogName: String, catalog: Catalog): Unit = {
catalogManager.registerCatalog(catalogName, catalog)
}
override def getCatalog(catalogName: String): Optional[Catalog] = {
catalogManager.getCatalog(catalogName)
}
override def getCurrentCatalog: String = {
catalogManager.getCurrentCatalog
}
override def getCurrentDatabase: String = {
catalogManager.getCurrentDatabase
}
override def useCatalog(catalogName: String): Unit = {
catalogManager.setCurrentCatalog(catalogName)
}
override def useDatabase(databaseName: String): Unit = {
catalogManager.setCurrentDatabase(databaseName)
}
/**
* Registers an internal [[TableSource]] in this [[TableEnvironment]]'s catalog without
* name checking. Registered tables can be referenced in SQL queries.
......@@ -424,14 +457,6 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
*/
protected def registerTableSourceInternal(name: String, tableSource: TableSource[_]): Unit
override def registerTableSink(
name: String,
fieldNames: Array[String],
fieldTypes: Array[TypeInformation[_]],
tableSink: TableSink[_]): Unit
override def registerTableSink(name: String, configuredSink: TableSink[_]): Unit
/**
* Replaces a registered Table with another Table under the same name.
* We use this method to replace a [[org.apache.flink.table.plan.schema.DataStreamTable]]
......@@ -441,58 +466,37 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
* @param table The table that replaces the previous table.
*/
protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
if (isRegistered(name)) {
rootSchema.add(name, table)
} else {
throw new TableException(s"Table \'$name\' is not registered.")
val path = new ObjectPath(defaultDatabaseName, name)
JavaScalaConversionUtil.toScala(catalogManager.getCatalog(defaultCatalogName)) match {
case Some(catalog) =>
catalog.alterTable(
path,
new CalciteCatalogTable(table, planningConfigurationBuilder.getTypeFactory),
false)
case None => throw new TableException("The default catalog does not exist.")
}
}
@throws[TableException]
override def scan(tablePath: String*): Table = {
scanInternal(tablePath.toArray) match {
case Some(table) => table
case Some(table) => new TableImpl(this, table)
case None => throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.")
}
}
private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = {
require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
val schemaPaths = tablePath.slice(0, tablePath.length - 1)
val schema = getSchema(schemaPaths)
if (schema != null) {
val tableName = tablePath(tablePath.length - 1)
val table = schema.getTable(tableName)
if (table != null) {
return Some(new TableImpl(this,
new CatalogTableOperation(tablePath.toList.asJava, extractTableSchema(table))))
}
}
None
}
private def extractTableSchema(table: schema.Table): TableSchema = {
val relDataType = table.getRowType(planningSession.getTypeFactory)
val fieldNames = relDataType.getFieldNames
val fieldTypes = relDataType.getFieldList.asScala
.map(field => FlinkTypeFactory.toTypeInfo(field.getType))
new TableSchema(fieldNames.asScala.toArray, fieldTypes.toArray)
}
private def getSchema(schemaPath: Array[String]): SchemaPlus = {
var schema = rootSchema
for (schemaName <- schemaPath) {
schema = schema.getSubSchema(schemaName)
if (schema == null) {
return schema
}
}
schema
private[flink] def scanInternal(tablePath: Array[String]): Option[CatalogTableOperation] = {
JavaScalaConversionUtil.toScala(catalogManager.resolveTable(tablePath : _*))
}
override def listTables(): Array[String] = {
rootSchema.getTableNames.asScala.toArray
val currentCatalogName = catalogManager.getCurrentCatalog
val currentCatalog = catalogManager.getCatalog(currentCatalogName)
JavaScalaConversionUtil.toScala(currentCatalog) match {
case Some(catalog) => catalog.listTables(catalogManager.getCurrentDatabase).asScala.toArray
case None =>
throw new TableException(s"The current catalog ($currentCatalogName) does not exist.")
}
}
override def listUserDefinedFunctions(): Array[String] = {
......@@ -502,15 +506,12 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
override def explain(table: Table): String
override def getCompletionHints(statement: String, position: Int): Array[String] = {
val planner = new FlinkPlannerImpl(
getFrameworkConfig,
getPlanner,
getTypeFactory)
val planner = getFlinkPlanner
planner.getCompletionHints(statement, position)
}
override def sqlQuery(query: String): Table = {
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
val planner = getFlinkPlanner
// parse the sql query
val parsed = planner.parse(query)
if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
......@@ -531,7 +532,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
}
override def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
val planner = getFlinkPlanner
// parse the sql query
val parsed = planner.parse(stmt)
parsed match {
......@@ -545,10 +546,10 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
new PlannerTableOperation(planner.rel(validatedQuery).rel))
// get name of sink table
val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
// insert query result into sink table
insertInto(queryResult, targetTableName, config)
insertInto(queryResult, config, targetTablePath.asScala:_*)
case _ =>
throw new TableException(
"Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
......@@ -569,19 +570,19 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
* Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
*
* @param table The table to write to the TableSink.
* @param sinkTableName The name of the registered TableSink.
* @param sinkTablePath The name of the registered TableSink.
* @param conf The query configuration to use.
*/
private[flink] def insertInto(table: Table, sinkTableName: String, conf: QueryConfig): Unit = {
private[flink] def insertInto(table: Table, conf: QueryConfig, sinkTablePath: String*): Unit = {
// check that sink table exists
if (null == sinkTableName) throw new TableException("Name of TableSink must not be null.")
if (sinkTableName.isEmpty) throw new TableException("Name of TableSink must not be empty.")
if (null == sinkTablePath) throw new TableException("Name of TableSink must not be null.")
if (sinkTablePath.isEmpty) throw new TableException("Name of TableSink must not be empty.")
getTable(sinkTableName) match {
getTable(sinkTablePath: _*) match {
case None =>
throw new TableException(s"No table was registered under the name $sinkTableName.")
throw new TableException(s"No table was registered under the name $sinkTablePath.")
case Some(s: TableSourceSinkTable[_, _]) if s.tableSinkTable.isDefined =>
val tableSink = s.tableSinkTable.get.tableSink
......@@ -605,7 +606,7 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
throw new ValidationException(
s"Field types of query result and registered TableSink " +
s"$sinkTableName do not match.\n" +
s"$sinkTablePath do not match.\n" +
s"Query result schema: $srcSchema\n" +
s"TableSink schema: $sinkSchema")
}
......@@ -613,13 +614,13 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
writeToSink(table, tableSink, conf)
case Some(_) =>
throw new TableException(s"The table registered as $sinkTableName is not a TableSink. " +
throw new TableException(s"The table registered as $sinkTablePath is not a TableSink. " +
s"You can only emit query results to a registered TableSink.")
}
}
/**
* Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog.
* Registers a Calcite [[AbstractTable]] in the TableEnvironment's default catalog.
*
* @param name The name under which the table will be registered.
* @param table The table to register in the catalog
......@@ -627,12 +628,14 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
*/
@throws[TableException]
protected def registerTableInternal(name: String, table: AbstractTable): Unit = {
if (isRegistered(name)) {
throw new TableException(s"Table \'$name\' already exists. " +
s"Please, choose a different name.")
} else {
rootSchema.add(name, table)
val path = new ObjectPath(defaultDatabaseName, name)
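// Tables registered through the Table API land in the built-in (default) catalog,
// wrapped as CalciteCatalogTable so the planner can keep working with Calcite tables for now.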
JavaScalaConversionUtil.toScala(catalogManager.getCatalog(defaultCatalogName)) match {
case Some(catalog) =>
catalog.createTable(
path,
new CalciteCatalogTable(table, planningConfigurationBuilder.getTypeFactory),
false)
case None => throw new TableException("The default catalog does not exist.")
}
}
......@@ -646,23 +649,13 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
*/
protected def checkValidTableName(name: String): Unit
/**
* Checks if a table is registered under the given name.
*
* @param name The table name to check.
* @return true, if a table is registered under the name, false otherwise.
*/
protected[flink] def isRegistered(name: String): Boolean = {
rootSchema.getTableNames.contains(name)
}
/**
* Get a table from either internal or external catalogs.
*
* @param name The name of the table.
* @return The table registered either internally or externally, None otherwise.
*/
protected def getTable(name: String): Option[org.apache.calcite.schema.Table] = {
protected def getTable(name: String*): Option[org.apache.calcite.schema.Table] = {
// recursively fetches a table from a schema.
def getTableFromSchema(
......@@ -687,8 +680,10 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
}
}
val pathNames = name.split('.').toList
getTableFromSchema(rootSchema, pathNames)
JavaScalaConversionUtil.toScala(catalogManager.resolveTable(name: _*))
.flatMap(t =>
getTableFromSchema(internalSchema.plus(), t.getTablePath.asScala.toList)
)
}
/** Returns a unique temporary attribute name. */
......@@ -698,26 +693,31 @@ abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
/** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
private[flink] def getRelBuilder: FlinkRelBuilder = {
planningSession.createRelBuilder(List().asJava)
}
val currentCatalogName = catalogManager.getCurrentCatalog
val currentDatabase = catalogManager.getCurrentDatabase
/** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */
private[flink] def getPlanner: RelOptPlanner = {
planningSession.getPlanner
planningConfigurationBuilder.createRelBuilder(currentCatalogName, currentDatabase)
}
/** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */
private[flink] def getTypeFactory: FlinkTypeFactory = {
planningSession.getTypeFactory
/** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */
private def getPlanner: RelOptPlanner = {
planningConfigurationBuilder.getPlanner
}
private[flink] def getFunctionCatalog: FunctionCatalog = {
functionCatalog
}
private[flink] def getParserConfig: SqlParser.Config = planningConfigurationBuilder
.getSqlParserConfig
/** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */
private[flink] def getFrameworkConfig: FrameworkConfig = {
planningSession.createFrameworkConfig(rootSchema)
@VisibleForTesting
private[flink] def getFlinkPlanner: FlinkPlannerImpl = {
val currentCatalogName = catalogManager.getCurrentCatalog
val currentDatabase = catalogManager.getCurrentDatabase
planningConfigurationBuilder.createFlinkPlanner(currentCatalogName, currentDatabase)
}
/**
......
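The overrides above wire the TableEnvironment's catalog API straight through to the CatalogManager. A hypothetical Java usage sketch follows (how the TableEnvironment instance itself is obtained is left out; the default database name of GenericInMemoryCatalog is assumed to be "default"):

static void switchToCustomCatalog(TableEnvironment tableEnv) {
    tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("cat1"));
    // cat1 becomes the current default catalog, its default database the current database.
    tableEnv.useCatalog("cat1");
    tableEnv.useDatabase("default");
    // Unqualified names such as "tab1" now resolve as cat1.default.tab1 first.
}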
......@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api._
import org.apache.flink.table.catalog.CatalogManager
import org.apache.flink.table.expressions.ExpressionParser
import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
......@@ -35,9 +36,13 @@ import _root_.scala.collection.JavaConverters._
*/
class BatchTableEnvImpl(
execEnv: ExecutionEnvironment,
config: TableConfig)
extends org.apache.flink.table.api.BatchTableEnvImpl(execEnv, config)
with org.apache.flink.table.api.java.BatchTableEnvironment {
config: TableConfig,
catalogManager: CatalogManager)
extends org.apache.flink.table.api.BatchTableEnvImpl(
execEnv,
config,
catalogManager)
with org.apache.flink.table.api.java.BatchTableEnvironment {
override def fromDataSet[T](dataSet: DataSet[T]): Table = {
......
......@@ -21,11 +21,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.table.api._
import org.apache.flink.table.functions.{AggregateFunction, TableFunction, TableAggregateFunction, UserDefinedAggregateFunction}
import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedAggregateFunction}
import org.apache.flink.table.expressions.ExpressionParser
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import _root_.java.lang.{Boolean => JBool}
import org.apache.flink.table.catalog.CatalogManager
import _root_.scala.collection.JavaConverters._
/**
......@@ -37,9 +40,13 @@ import _root_.scala.collection.JavaConverters._
*/
class StreamTableEnvImpl(
execEnv: StreamExecutionEnvironment,
config: TableConfig)
extends org.apache.flink.table.api.StreamTableEnvImpl(execEnv, config)
with org.apache.flink.table.api.java.StreamTableEnvironment {
config: TableConfig,
catalogManager: CatalogManager)
extends org.apache.flink.table.api.StreamTableEnvImpl(
execEnv,
config,
catalogManager)
with org.apache.flink.table.api.java.StreamTableEnvironment {
override def fromDataStream[T](dataStream: DataStream[T]): Table = {
......
......@@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.catalog.CatalogManager
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
......@@ -34,9 +35,13 @@ import _root_.scala.reflect.ClassTag
*/
class BatchTableEnvImpl(
execEnv: ExecutionEnvironment,
config: TableConfig)
extends org.apache.flink.table.api.BatchTableEnvImpl(execEnv.getJavaEnv, config)
with org.apache.flink.table.api.scala.BatchTableEnvironment {
config: TableConfig,
catalogManager: CatalogManager)
extends org.apache.flink.table.api.BatchTableEnvImpl(
execEnv.getJavaEnv,
config,
catalogManager)
with org.apache.flink.table.api.scala.BatchTableEnvironment {
override def fromDataSet[T](dataSet: DataSet[T]): Table = {
......
......@@ -24,6 +24,7 @@ import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.asScalaStream
import org.apache.flink.table.catalog.CatalogManager
/**
* The implementation for a Scala [[StreamTableEnvironment]].
......@@ -33,11 +34,13 @@ import org.apache.flink.streaming.api.scala.asScalaStream
*/
class StreamTableEnvImpl(
execEnv: StreamExecutionEnvironment,
config: TableConfig)
config: TableConfig,
catalogManager: CatalogManager)
extends org.apache.flink.table.api.StreamTableEnvImpl(
execEnv.getWrappedStreamExecutionEnvironment,
config)
with org.apache.flink.table.api.scala.StreamTableEnvironment {
config,
catalogManager)
with org.apache.flink.table.api.scala.StreamTableEnvironment {
override def fromDataStream[T](dataStream: DataStream[T]): Table = {
......
......@@ -355,12 +355,20 @@ class TableImpl(
wrap(operationTreeBuilder.limitWithFetch(fetch, operationTree))
}
override def insertInto(tableName: String): Unit = {
insertInto(tableName, tableEnv.queryConfig)
override def insertInto(tablePath: String, tablePathContinued: String*): Unit = {
insertInto(tableEnv.queryConfig, tablePath, tablePathContinued: _*)
}
override def insertInto(tableName: String, conf: QueryConfig): Unit = {
tableEnv.insertInto(this, tableName, conf)
insertInto(conf, tableName)
}
override def insertInto(
conf: QueryConfig,
tablePath: String,
tablePathContinued: String*)
: Unit = {
tableEnv.insertInto(this, conf, tablePath +: tablePathContinued:_*)
}
override def window(window: GroupWindow): GroupWindowedTable = {
......
......@@ -36,8 +36,11 @@ import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException}
import org.apache.flink.table.catalog.CatalogReader
import scala.collection.JavaConversions._
import java.util.function.{Function => JFunction}
import java.lang.{Boolean => JBoolean}
/**
* NOTE: this is heavily inspired by Calcite's PlannerImpl.
......@@ -47,6 +50,7 @@ import scala.collection.JavaConversions._
*/
class FlinkPlannerImpl(
config: FrameworkConfig,
catalogReaderSupplier: JFunction[JBoolean, CatalogReader],
planner: RelOptPlanner,
typeFactory: FlinkTypeFactory) {
......@@ -55,7 +59,6 @@ class FlinkPlannerImpl(
val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
val parserConfig: SqlParser.Config = config.getParserConfig
val convertletTable: SqlRexConvertletTable = config.getConvertletTable
val defaultSchema: SchemaPlus = config.getDefaultSchema
val sqlToRelConverterConfig: SqlToRelConverter.Config = config.getSqlToRelConverterConfig
var validator: FlinkCalciteSqlValidator = _
......@@ -73,7 +76,7 @@ class FlinkPlannerImpl(
def getCompletionHints(sql: String, cursor: Int): Array[String] = {
val advisorValidator = new SqlAdvisorValidator(
operatorTable,
createCatalogReader(true), // ignore cases for lenient completion
catalogReaderSupplier.apply(true), // ignore cases for lenient completion
typeFactory,
config.getParserConfig.conformance())
val advisor = new SqlAdvisor(advisorValidator, config.getParserConfig)
......@@ -98,7 +101,7 @@ class FlinkPlannerImpl(
def validate(sqlNode: SqlNode): SqlNode = {
validator = new FlinkCalciteSqlValidator(
operatorTable,
createCatalogReader(false),
catalogReaderSupplier.apply(false),
typeFactory)
validator.setIdentifierExpansion(true)
try {
......@@ -118,7 +121,7 @@ class FlinkPlannerImpl(
val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
new ViewExpanderImpl,
validator,
createCatalogReader(false),
catalogReaderSupplier.apply(false),
cluster,
convertletTable,
sqlToRelConverterConfig)
......@@ -156,7 +159,7 @@ class FlinkPlannerImpl(
case e: CSqlParseException =>
throw new SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
}
val catalogReader: CalciteCatalogReader = createCatalogReader(false)
val catalogReader: CalciteCatalogReader = catalogReaderSupplier.apply(false)
.withSchemaPath(schemaPath)
val validator: SqlValidator =
new FlinkCalciteSqlValidator(operatorTable, catalogReader, typeFactory)
......@@ -178,27 +181,6 @@ class FlinkPlannerImpl(
}
}
private def createCatalogReader(lenientCaseSensitivity: Boolean): CalciteCatalogReader = {
val rootSchema: SchemaPlus = FlinkPlannerImpl.rootSchema(defaultSchema)
val caseSensitive = if (lenientCaseSensitivity) {
false
} else {
this.parserConfig.caseSensitive()
}
val parserConfig = SqlParser.configBuilder(this.parserConfig)
.setCaseSensitive(caseSensitive)
.build()
new CalciteCatalogReader(
CalciteSchema.from(rootSchema),
CalciteSchema.from(defaultSchema).path(null),
typeFactory,
CalciteConfig.connectionConfig(parserConfig)
)
}
private def createRexBuilder: RexBuilder = {
new RexBuilder(typeFactory)
}
......
......@@ -71,11 +71,11 @@ trait CrudExternalCatalog extends ExternalCatalog {
* @param ignoreIfExists Flag to specify behavior if a sub catalog with the given name already
* exists: if set to false, it throws a CatalogAlreadyExistsException,
* if set to true, nothing happens.
* @throws CatalogAlreadyExistsException
* @throws CatalogAlreadyExistException
* thrown if the sub catalog does already exist in the catalog
* and ignoreIfExists is false
*/
@throws[CatalogAlreadyExistsException]
@throws[CatalogAlreadyExistException]
def createSubCatalog(name: String, catalog: ExternalCatalog, ignoreIfExists: Boolean): Unit
/**
......
......@@ -35,12 +35,11 @@ import scala.collection.JavaConverters._
 * The external catalog and all included sub-catalogs and tables are registered as
* sub-schemas and tables in Calcite.
*
* @param tableEnv the environment for this schema
* @param catalogIdentifier external catalog name
* @param catalog external catalog
*/
class ExternalCatalogSchema(
tableEnv: TableEnvironment,
isBatch: Boolean,
catalogIdentifier: String,
catalog: ExternalCatalog) extends Schema with Logging {
......@@ -54,7 +53,7 @@ class ExternalCatalogSchema(
override def getSubSchema(name: String): Schema = {
try {
val db = catalog.getSubCatalog(name)
new ExternalCatalogSchema(tableEnv, name, db)
new ExternalCatalogSchema(isBatch, name, db)
} catch {
case _: CatalogNotExistException =>
LOG.warn(s"Sub-catalog $name does not exist in externalCatalog $catalogIdentifier")
......@@ -79,7 +78,7 @@ class ExternalCatalogSchema(
*/
override def getTable(name: String): Table = try {
val externalCatalogTable = catalog.getTable(name)
ExternalTableUtil.fromExternalCatalogTable(tableEnv, externalCatalogTable)
ExternalTableUtil.fromExternalCatalogTable(isBatch, externalCatalogTable)
} catch {
case _: TableNotExistException => {
LOG.warn(s"Table $name does not exist in externalCatalog $catalogIdentifier")
......@@ -119,17 +118,16 @@ object ExternalCatalogSchema {
/**
* Registers an external catalog in a Calcite schema.
*
* @param tableEnv The environment the catalog will be part of.
* @param parentSchema Parent schema into which the catalog is registered
* @param externalCatalogIdentifier Identifier of the external catalog
* @param externalCatalog The external catalog to register
*/
def registerCatalog(
tableEnv: TableEnvironment,
isBatch: Boolean,
parentSchema: SchemaPlus,
externalCatalogIdentifier: String,
externalCatalog: ExternalCatalog): Unit = {
val newSchema = new ExternalCatalogSchema(tableEnv, externalCatalogIdentifier, externalCatalog)
val newSchema = new ExternalCatalogSchema(isBatch, externalCatalogIdentifier, externalCatalog)
val schemaPlusOfNewSchema = parentSchema.add(externalCatalogIdentifier, newSchema)
newSchema.registerSubSchemas(schemaPlusOfNewSchema)
}
......
......@@ -40,21 +40,19 @@ object ExternalTableUtil extends Logging {
* @param externalTable the [[ExternalCatalogTable]] instance which to convert
* @return converted [[TableSourceTable]] instance from the input catalog table
*/
def fromExternalCatalogTable[T1, T2](
tableEnv: TableEnvironment,
externalTable: ExternalCatalogTable)
def fromExternalCatalogTable[T1, T2](isBatch: Boolean, externalTable: ExternalCatalogTable)
: TableSourceSinkTable[T1, T2] = {
val statistics = new FlinkStatistic(toScala(externalTable.getTableStats))
val source: Option[TableSourceTable[T1]] = if (externalTable.isTableSource) {
Some(createTableSource(tableEnv, externalTable, statistics))
Some(createTableSource(isBatch, externalTable, statistics))
} else {
None
}
val sink: Option[TableSinkTable[T2]] = if (externalTable.isTableSink) {
Some(createTableSink(tableEnv, externalTable, statistics))
Some(createTableSink(isBatch, externalTable, statistics))
} else {
None
}
......@@ -63,40 +61,37 @@ object ExternalTableUtil extends Logging {
}
private def createTableSource[T](
tableEnv: TableEnvironment,
isBatch: Boolean,
externalTable: ExternalCatalogTable,
statistics: FlinkStatistic)
: TableSourceTable[T] = tableEnv match {
case _: BatchTableEnvImpl if externalTable.isBatchTable =>
: TableSourceTable[T] = {
if (isBatch && externalTable.isBatchTable) {
val source = TableFactoryUtil.findAndCreateTableSource(externalTable)
new BatchTableSourceTable[T](source.asInstanceOf[BatchTableSource[T]], statistics)
case _: StreamTableEnvImpl if externalTable.isStreamTable =>
} else if (!isBatch && externalTable.isStreamTable) {
val source = TableFactoryUtil.findAndCreateTableSource(externalTable)
new StreamTableSourceTable[T](source.asInstanceOf[StreamTableSource[T]], statistics)
case _ =>
} else {
throw new ValidationException(
"External catalog table does not support the current environment for a table source.")
}
}
private def createTableSink[T](
tableEnv: TableEnvironment,
isBatch: Boolean,
externalTable: ExternalCatalogTable,
statistics: FlinkStatistic)
: TableSinkTable[T] = tableEnv match {
: TableSinkTable[T] = {
case _: BatchTableEnvImpl if externalTable.isBatchTable =>
if (isBatch && externalTable.isBatchTable) {
val sink = TableFactoryUtil.findAndCreateTableSink(externalTable)
new TableSinkTable[T](sink.asInstanceOf[BatchTableSink[T]], statistics)
case _: StreamTableEnvImpl if externalTable.isStreamTable =>
} else if (!isBatch && externalTable.isStreamTable) {
val sink = TableFactoryUtil.findAndCreateTableSink(externalTable)
new TableSinkTable[T](sink.asInstanceOf[StreamTableSink[T]], statistics)
case _ =>
} else {
throw new ValidationException(
"External catalog table does not support the current environment for a table sink.")
}
}
}
......@@ -65,7 +65,7 @@ class OperationTreeBuilder(private val tableEnv: TableEnvImpl) {
override def lookupTable(name: String): Optional[TableReferenceExpression] =
JavaScalaConversionUtil
.toJava(tableEnv.scanInternal(Array(name))
.map(op => new TableReferenceExpression(name, op.getTableOperation)))
.map(op => new TableReferenceExpression(name, op)))
}
def project(
......
......@@ -113,7 +113,7 @@ class DataSetWindowAggregate(
val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
// whether identifiers are matched case-sensitively
val caseSensitive = tableEnv.getFrameworkConfig.getParserConfig.caseSensitive()
val caseSensitive = tableEnv.getParserConfig.caseSensitive()
window match {
case TumblingGroupWindow(_, timeField, size)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.runtime.utils.CommonTestData;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import static org.apache.flink.table.catalog.CatalogStructureBuilder.database;
import static org.apache.flink.table.catalog.CatalogStructureBuilder.root;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Tests for {@link CatalogManager}. See also {@link CatalogManagerPathResolutionTest}.
*/
public class CatalogManagerTest extends TestLogger {
private static final String TEST_CATALOG_NAME = "test";
private static final String TEST_CATALOG_DEFAULT_DB_NAME = "test";
private static final String BUILTIN_DEFAULT_DATABASE_NAME = "default";
@Rule
public ExpectedException thrown = ExpectedException.none();
@Test
public void testRegisterCatalog() throws Exception {
CatalogManager manager = root()
.builtin(
database(BUILTIN_DEFAULT_DATABASE_NAME))
.build();
assertEquals(1, manager.getCatalogs().size());
assertFalse(manager.getCatalogs().contains(TEST_CATALOG_NAME));
manager.registerCatalog(TEST_CATALOG_NAME, new GenericInMemoryCatalog(TEST_CATALOG_NAME));
assertEquals(2, manager.getCatalogs().size());
assertTrue(manager.getCatalogs().contains(TEST_CATALOG_NAME));
}
@Test
public void testSetCurrentCatalog() throws Exception {
CatalogManager manager = root()
.builtin(
database(BUILTIN_DEFAULT_DATABASE_NAME))
.catalog(
TEST_CATALOG_NAME,
database(TEST_CATALOG_DEFAULT_DB_NAME))
.build();
assertEquals(CatalogStructureBuilder.BUILTIN_CATALOG_NAME, manager.getCurrentCatalog());
assertEquals(BUILTIN_DEFAULT_DATABASE_NAME, manager.getCurrentDatabase());
manager.setCurrentCatalog(TEST_CATALOG_NAME);
assertEquals(TEST_CATALOG_NAME, manager.getCurrentCatalog());
assertEquals(TEST_CATALOG_DEFAULT_DB_NAME, manager.getCurrentDatabase());
}
@Test
public void testRegisterCatalogWithExistingName() throws Exception {
thrown.expect(CatalogException.class);
CatalogManager manager = root()
.builtin(
database(BUILTIN_DEFAULT_DATABASE_NAME))
.catalog(TEST_CATALOG_NAME, database(TEST_CATALOG_DEFAULT_DB_NAME))
.build();
manager.registerCatalog(TEST_CATALOG_NAME, new GenericInMemoryCatalog(TEST_CATALOG_NAME));
}
@Test
public void testRegisterCatalogWithExistingExternalCatalog() throws Exception {
thrown.expect(CatalogException.class);
CatalogManager manager = root()
.builtin(
database(BUILTIN_DEFAULT_DATABASE_NAME))
.externalCatalog(TEST_CATALOG_NAME)
.build();
manager.registerCatalog(TEST_CATALOG_NAME, new GenericInMemoryCatalog(TEST_CATALOG_NAME));
}
@Test
public void testRegisterExternalCatalogWithExistingName() throws Exception {
thrown.expect(CatalogException.class);
thrown.expectMessage("An external catalog named [test] already exists.");
CatalogManager manager = root()
.builtin(
database(BUILTIN_DEFAULT_DATABASE_NAME))
.catalog(TEST_CATALOG_NAME, database(TEST_CATALOG_DEFAULT_DB_NAME))
.build();
manager.registerExternalCatalog(TEST_CATALOG_NAME, CommonTestData.getInMemoryTestCatalog(false));
}
@Test
public void testCannotSetExternalCatalogAsDefault() throws Exception {
thrown.expect(CatalogException.class);
thrown.expectMessage("An external catalog cannot be set as the default one.");
CatalogManager manager = root()
.externalCatalog("ext")
.build();
manager.setCurrentCatalog("ext");
}
@Test
public void testSetNonExistingCurrentCatalog() throws Exception {
thrown.expect(CatalogException.class);
thrown.expectMessage("A catalog with name [nonexistent] does not exist.");
CatalogManager manager = root().build();
manager.setCurrentCatalog("nonexistent");
}
@Test
public void testSetNonExistingCurrentDatabase() throws Exception {
thrown.expect(CatalogException.class);
thrown.expectMessage("A database with name [nonexistent] does not exist in the catalog: [builtin].");
CatalogManager manager = root().build();
		// This database does not exist in the builtin catalog
manager.setCurrentDatabase("nonexistent");
}
}
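For orientation, the following is a minimal, illustrative sketch (not part of this change set) that drives the CatalogManager directly with the same calls the tests above exercise; the catalog and database names ("cat1", "db1") are made up for the example.

import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

public class CatalogManagerSketch {
    public static void main(String[] args) throws Exception {
        // the builtin catalog and its default database form the initial current path
        CatalogManager manager = new CatalogManager(
            "builtin", new GenericInMemoryCatalog("builtin", "default"));

        // register a second catalog; its tables become addressable as cat1.<db>.<table>
        manager.registerCatalog("cat1", new GenericInMemoryCatalog("cat1", "db1"));

        // switch the current path so that unqualified names resolve against cat1.db1 first
        manager.setCurrentCatalog("cat1");
        manager.setCurrentDatabase("db1");

        System.out.println(manager.getCurrentCatalog() + "." + manager.getCurrentDatabase());
    }
}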
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.calcite.FlinkTypeFactory;
import org.apache.flink.table.calcite.FlinkTypeSystem;
import org.apache.flink.table.plan.schema.StreamTableSourceTable;
import org.apache.flink.table.plan.schema.TableSourceSinkTable;
import org.apache.flink.table.plan.stats.FlinkStatistic;
import org.apache.flink.table.sources.StreamTableSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import scala.Option;
import scala.Some;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
/**
 * Utility class for constructing a {@link CatalogManager} with a given structure.
 * It creates tables ({@link TestTable} and {@link ExternalTestTable}) whose
 * {@link Object#equals(Object)} method compares the fully qualified paths.
*
* <p>Example:
* <pre>{@code
* root()
* .builtin(
* database(
* "default",
* table("tab1"),
* table("tab2")
* )
* )
* .externalCatalog(
* "extCat1",
* table("tab1"),
* extCatalog(
* "extCat2",
* extCatalog("extCat3",
* table("tab1")
* ),
* table("tab1")
* )
* ).build();
* }</pre>
*/
public class CatalogStructureBuilder {
public static final String BUILTIN_CATALOG_NAME = "builtin";
private CatalogManager catalogManager = new CatalogManager(
BUILTIN_CATALOG_NAME,
new GenericInMemoryCatalog(BUILTIN_CATALOG_NAME));
public static CatalogStructureBuilder root() {
return new CatalogStructureBuilder();
}
public static DatabaseBuilder database(String name, TableBuilder... tables) {
return new DatabaseBuilder(name, tables);
}
public static TableBuilder table(String name) {
return new TableBuilder(name);
}
public static ExternalCatalogEntry extCatalog(String name, ExternalCatalogEntry... entries) {
return new ExternalCatalogBuilder(name, entries);
}
public CatalogStructureBuilder builtin(DatabaseBuilder defaultDb, DatabaseBuilder... databases) throws Exception {
GenericInMemoryCatalog catalog = buildCatalog(BUILTIN_CATALOG_NAME, defaultDb, databases);
this.catalogManager = new CatalogManager(BUILTIN_CATALOG_NAME, catalog);
return this;
}
public CatalogStructureBuilder catalog(
String name,
DatabaseBuilder defaultDatabase,
DatabaseBuilder... databases) throws Exception {
GenericInMemoryCatalog catalog = buildCatalog(name, defaultDatabase, databases);
catalogManager.registerCatalog(name, catalog);
return this;
}
private GenericInMemoryCatalog buildCatalog(
String name,
DatabaseBuilder defaultDatabase,
DatabaseBuilder[] databases) throws Exception {
GenericInMemoryCatalog catalog = new GenericInMemoryCatalog(
name,
defaultDatabase.getName());
defaultDatabase.build(catalog, name);
registerDatabases(name, catalog, databases);
return catalog;
}
private void registerDatabases(
String name,
Catalog catalog,
DatabaseBuilder[] databases) throws Exception {
for (DatabaseBuilder database : databases) {
catalog.createDatabase(database.getName(), new GenericCatalogDatabase(new HashMap<>(), ""), false);
database.build(catalog, name);
}
}
public CatalogStructureBuilder externalCatalog(String name, ExternalCatalogEntry... entries) throws Exception {
new ExternalCatalogBuilder(name, entries).build(catalogManager);
return this;
}
public CatalogManager build() {
return catalogManager;
}
/**
* Helper class for creating mock {@link ExternalCatalog} in a {@link CatalogStructureBuilder}.
*/
public static class ExternalCatalogBuilder implements ExternalCatalogEntry {
private final String name;
private final ExternalCatalogEntry[] entries;
private ExternalCatalogBuilder(String name, ExternalCatalogEntry[] entries) {
this.entries = entries;
this.name = name;
}
public void build(CrudExternalCatalog catalog, String path) throws Exception {
catalog.createSubCatalog(name, buildCurrentCatalog(path), false);
}
private InMemoryExternalCatalog buildCurrentCatalog(String path) throws Exception {
InMemoryExternalCatalog thisCatalog = new InMemoryExternalCatalog(name);
final String currentPath;
if (path != null) {
currentPath = path + "." + name;
} else {
currentPath = name;
}
for (ExternalCatalogEntry entry : entries) {
if (entry instanceof ExternalCatalogBuilder) {
((ExternalCatalogBuilder) entry).build(thisCatalog, currentPath);
} else if (entry instanceof TableBuilder){
TableBuilder tableBuilder = (TableBuilder) entry;
thisCatalog.createTable(
tableBuilder.getName(),
tableBuilder.buildExternalTable(currentPath),
false);
}
}
return thisCatalog;
}
public void build(CatalogManager catalogManager) throws Exception {
catalogManager.registerExternalCatalog(name, buildCurrentCatalog(null));
}
}
/**
* Helper class for creating mock {@link CatalogDatabase} in a {@link CatalogStructureBuilder}.
*/
public static class DatabaseBuilder {
private final TableBuilder[] tables;
private final String name;
public DatabaseBuilder(String name, TableBuilder[] tables) {
this.tables = tables;
this.name = name;
}
public String getName() {
return name;
}
public void build(Catalog catalog, String catalogName) throws Exception {
for (TableBuilder tableBuilder : tables) {
catalog.createTable(
new ObjectPath(name, tableBuilder.getName()),
tableBuilder.build(catalogName + "." + name),
false);
}
}
}
/**
* Helper class for creating mock {@link CatalogTable} & {@link ExternalCatalogTable}
* in a {@link CatalogStructureBuilder}.
*/
public static class TableBuilder implements ExternalCatalogEntry {
private final String name;
TableBuilder(String name) {
this.name = name;
}
public String getName() {
return name;
}
public TestTable build(String path) {
return new TestTable(path + "." + name);
}
public ExternalTestTable buildExternalTable(String path) {
return new ExternalTestTable(path + "." + name);
}
}
/**
 * Marker interface to make {@link CatalogStructureBuilder#extCatalog(String, ExternalCatalogEntry...)}
 * accept both {@link ExternalCatalogBuilder} and {@link TableBuilder}.
*/
public interface ExternalCatalogEntry {
}
private static class ExternalTestTable extends ExternalCatalogTable {
private final String fullyQualifiedName;
public ExternalTestTable(String fullyQualifiedName) {
super(false, true, true, false, new HashMap<>());
this.fullyQualifiedName = fullyQualifiedName;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ExternalTestTable that = (ExternalTestTable) o;
return Objects.equals(fullyQualifiedName, that.fullyQualifiedName);
}
@Override
public Map<String, String> toProperties() {
Map<String, String> properties = new HashMap<>();
properties.put(CONNECTOR_TYPE, TestExternalTableSourceFactory.TEST_EXTERNAL_CONNECTOR_TYPE);
return properties;
}
@Override
public int hashCode() {
return Objects.hash(fullyQualifiedName);
}
}
private static class TestTable extends CalciteCatalogTable {
private final String fullyQualifiedPath;
private static final StreamTableSourceTable<Object> tableSourceTable = new StreamTableSourceTable<>(
new StreamTableSource<Object>() {
@Override
public DataStream<Object> getDataStream(StreamExecutionEnvironment execEnv) {
return null;
}
@Override
public TypeInformation<Object> getReturnType() {
return null;
}
@Override
public TableSchema getTableSchema() {
return new TableSchema(new String[] {}, new TypeInformation[] {});
}
}, FlinkStatistic.UNKNOWN());
private TestTable(String fullyQualifiedPath) {
super(new TableSourceSinkTable<>(
new Some<>(tableSourceTable),
Option.empty()
), new FlinkTypeFactory(new FlinkTypeSystem()));
this.fullyQualifiedPath = fullyQualifiedPath;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TestTable testTable = (TestTable) o;
return Objects.equals(fullyQualifiedPath, testTable.fullyQualifiedPath);
}
@Override
public int hashCode() {
return Objects.hash(fullyQualifiedPath);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
import org.apache.flink.table.api.java.StreamTableEnvImpl;
import org.apache.flink.table.operations.CatalogTableOperation;
import org.apache.flink.table.utils.StreamTableTestUtil;
import org.apache.flink.util.Preconditions;
import org.hamcrest.CoreMatchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import scala.Some;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static org.apache.flink.table.catalog.CatalogStructureBuilder.BUILTIN_CATALOG_NAME;
import static org.apache.flink.table.catalog.CatalogStructureBuilder.database;
import static org.apache.flink.table.catalog.CatalogStructureBuilder.extCatalog;
import static org.apache.flink.table.catalog.CatalogStructureBuilder.root;
import static org.apache.flink.table.catalog.CatalogStructureBuilder.table;
import static org.apache.flink.table.catalog.PathResolutionTest.TestSpec.testSpec;
import static org.junit.Assert.assertThat;
/**
* Tests for {@link CatalogManager#resolveTable(String...)}.
*/
@RunWith(Parameterized.class)
public class PathResolutionTest {
@Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() throws Exception {
return asList(
testSpec("simpleInDefaultPath")
.withCatalogManager(simpleCatalog())
.tableApiLookupPath("tab1")
.sqlLookupPath("tab1")
.expectPath(BUILTIN_CATALOG_NAME, "default", "tab1"),
testSpec("simpleInChangedDefaultCatalog")
.withCatalogManager(simpleCatalog())
.withDefaultPath("cat1")
.tableApiLookupPath("tab1")
.sqlLookupPath("tab1")
.expectPath("cat1", "db1", "tab1"),
testSpec("simpleInChangedDefaultPath")
.withCatalogManager(simpleCatalog())
.withDefaultPath("cat1", "db2")
.tableApiLookupPath("tab1")
.sqlLookupPath("tab1")
.expectPath("cat1", "db2", "tab1"),
testSpec("qualifiedWithDatabase")
.withCatalogManager(simpleCatalog())
.withDefaultPath(BUILTIN_CATALOG_NAME, "default")
.tableApiLookupPath("db1", "tab1")
.sqlLookupPath("db1.tab1")
.expectPath(BUILTIN_CATALOG_NAME, "db1", "tab1"),
testSpec("fullyQualifiedName")
.withCatalogManager(simpleCatalog())
.withDefaultPath(BUILTIN_CATALOG_NAME, "default")
.tableApiLookupPath("cat1", "db1", "tab1")
.sqlLookupPath("cat1.db1.tab1")
.expectPath("cat1", "db1", "tab1"),
testSpec("externalCatalogTopLevelTable")
.withCatalogManager(externalCatalog())
.tableApiLookupPath("extCat1", "tab1")
.sqlLookupPath("extCat1.tab1")
.expectPath("extCat1", "tab1"),
testSpec("externalCatalogMultiLevelNesting")
.withCatalogManager(externalCatalog())
.tableApiLookupPath("extCat1", "extCat2", "extCat3", "tab1")
.sqlLookupPath("extCat1.extCat2.extCat3.tab1")
.expectPath("extCat1", "extCat2", "extCat3", "tab1"),
testSpec("dotInUnqualifiedTableName")
.withCatalogManager(catalogWithSpecialCharacters())
.tableApiLookupPath("tab.1")
.sqlLookupPath("`tab.1`")
.expectPath(BUILTIN_CATALOG_NAME, "default", "tab.1"),
testSpec("dotInDatabaseName")
.withCatalogManager(catalogWithSpecialCharacters())
.tableApiLookupPath("default.db", "tab1")
.sqlLookupPath("`default.db`.tab1")
.expectPath(BUILTIN_CATALOG_NAME, "default.db", "tab1"),
testSpec("dotInDefaultDatabaseName")
.withCatalogManager(catalogWithSpecialCharacters())
.withDefaultPath(BUILTIN_CATALOG_NAME, "default.db")
.tableApiLookupPath("tab1")
.sqlLookupPath("tab1")
.expectPath(BUILTIN_CATALOG_NAME, "default.db", "tab1"),
testSpec("spaceInNames")
.withCatalogManager(catalogWithSpecialCharacters())
.tableApiLookupPath("default db", "tab 1")
.sqlLookupPath("`default db`.`tab 1`")
.expectPath(BUILTIN_CATALOG_NAME, "default db", "tab 1")
);
}
private static CatalogManager simpleCatalog() throws Exception {
return root()
.builtin(
database(
"default",
table("tab1")
),
database(
"db1",
table("tab1")
)
)
.catalog(
"cat1",
database(
"db1",
table("tab1")
),
database(
"db2",
table("tab1")
)
).build();
}
private static CatalogManager externalCatalog() throws Exception {
return root()
.builtin(
database(
"default",
table("tab1"),
table("tab2")
)
)
.externalCatalog(
"extCat1",
table("tab1"),
extCatalog(
"extCat2",
extCatalog("extCat3",
table("tab1")
),
table("tab1"))
).build();
}
private static CatalogManager catalogWithSpecialCharacters() throws Exception {
return root()
.builtin(
database(
"default",
table("tab.1")
),
database(
"default.db",
table("tab1"),
table("tab.1")
),
database(
"default db",
table("tab 1")
)
).build();
}
@Parameter
public TestSpec testSpec;
@Test
public void testTableApiPathResolution() {
List<String> lookupPath = testSpec.getTableApiLookupPath();
CatalogManager catalogManager = testSpec.getCatalogManager();
testSpec.getDefaultCatalog().ifPresent(catalogManager::setCurrentCatalog);
testSpec.getDefaultDatabase().ifPresent(catalogManager::setCurrentDatabase);
CatalogTableOperation tab = catalogManager.resolveTable(lookupPath.toArray(new String[0])).get();
assertThat(tab.getTablePath(), CoreMatchers.equalTo(testSpec.getExpectedPath()));
}
@Test
public void testStreamSqlPathResolution() {
StreamTableTestUtil util = new StreamTableTestUtil(new Some<>(testSpec.getCatalogManager()));
StreamTableEnvImpl tEnv = util.javaTableEnv();
testSpec.getDefaultCatalog().ifPresent(tEnv::useCatalog);
testSpec.getDefaultDatabase().ifPresent(tEnv::useDatabase);
util.verifyJavaSql(
format("SELECT * FROM %s", testSpec.getSqlPathToLookup()),
format(
"StreamTableSourceScan(table=[[%s]], fields=[], source=[()])",
String.join(", ", testSpec.getExpectedPath()))
);
}
private static class DatabasePath {
private final String catalogName;
private final String databaseName;
DatabasePath(String catalogName, String databaseName) {
this.catalogName = catalogName;
this.databaseName = databaseName;
}
public String getCatalogName() {
return catalogName;
}
public String getDatabaseName() {
return databaseName;
}
}
static class TestSpec {
private String label;
private String sqlPathToLookup;
private List<String> tableApiLookupPath;
private List<String> expectedPath;
private String defaultCatalog;
private String defaultDatabase;
private CatalogManager catalogManager;
public TestSpec(String label) {
this.label = label;
}
public static TestSpec testSpec(String label) {
return new TestSpec(label);
}
public TestSpec withCatalogManager(CatalogManager catalogManager) {
this.catalogManager = catalogManager;
return this;
}
public TestSpec tableApiLookupPath(String... path) {
this.tableApiLookupPath = asList(path);
return this;
}
public TestSpec sqlLookupPath(String path) {
this.sqlPathToLookup = path;
return this;
}
public TestSpec expectPath(String... expectedPath) {
Preconditions.checkArgument(
sqlPathToLookup != null && tableApiLookupPath != null,
"Both sql & table API versions of path lookups required. Remember expectPath needs to be called last");
Preconditions.checkArgument(
catalogManager != null,
"A catalog manager needs to provided. Remember expectPath needs to be called last"
);
this.expectedPath = asList(expectedPath);
return this;
}
public TestSpec withDefaultPath(String defaultCatalog) {
this.defaultCatalog = defaultCatalog;
return this;
}
public TestSpec withDefaultPath(String defaultCatalog, String defaultDatabase) {
this.defaultCatalog = defaultCatalog;
this.defaultDatabase = defaultDatabase;
return this;
}
public String getSqlPathToLookup() {
return sqlPathToLookup;
}
public List<String> getTableApiLookupPath() {
return tableApiLookupPath;
}
public CatalogManager getCatalogManager() {
return catalogManager;
}
public List<String> getExpectedPath() {
return expectedPath;
}
public Optional<String> getDefaultCatalog() {
return Optional.ofNullable(defaultCatalog);
}
public Optional<String> getDefaultDatabase() {
return Optional.ofNullable(defaultDatabase);
}
@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
List<String> properties = new ArrayList<>();
if (defaultCatalog != null) {
properties.add("defaultCatalog: " + defaultCatalog);
}
if (defaultDatabase != null) {
properties.add("defaultDatabase: " + defaultDatabase);
}
properties.add("sqlPath: " + sqlPathToLookup);
properties.add("tableApiPath: " + tableApiLookupPath);
properties.add("expectedPath: " + expectedPath);
stringBuilder.append(format("%s=[%s]", label, String.join(", ", properties)));
return stringBuilder.toString();
}
}
}
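As a quick illustration of the resolution behavior verified above, here is a hedged sketch (not part of the change) that builds a tiny catalog structure with CatalogStructureBuilder and resolves an unqualified table name against the current builtin.default path:

import java.util.List;

import org.apache.flink.table.catalog.CatalogManager;

import static org.apache.flink.table.catalog.CatalogStructureBuilder.database;
import static org.apache.flink.table.catalog.CatalogStructureBuilder.root;
import static org.apache.flink.table.catalog.CatalogStructureBuilder.table;

public class PathResolutionSketch {
    public static void main(String[] args) throws Exception {
        CatalogManager manager = root()
            .builtin(database("default", table("tab1")))
            .build();

        // the current path is builtin.default, so "tab1" resolves to [builtin, default, tab1]
        List<String> path = manager.resolveTable("tab1").get().getTablePath();
        System.out.println(path);
    }
}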
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
/**
* Table source factory for testing. It creates a dummy {@link TableSource}
* that returns an empty {@link TableSchema}.
*/
public class TestExternalTableSourceFactory implements TableSourceFactory<Row> {
static final String TEST_EXTERNAL_CONNECTOR_TYPE = "test-external-connector";
@Override
public Map<String, String> requiredContext() {
Map<String, String> properties = new HashMap<>();
properties.put(CONNECTOR_TYPE, TEST_EXTERNAL_CONNECTOR_TYPE);
return properties;
}
@Override
public List<String> supportedProperties() {
return Collections.emptyList();
}
@Override
public TableSource<Row> createTableSource(Map<String, String> properties) {
return new TestExternalTableSource();
}
private static class TestExternalTableSource implements StreamTableSource<Row>, BatchTableSource<Row> {
private final TableSchema tableSchema = new TableSchema(new String[0], new TypeInformation[0]);
@Override
public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
return null;
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
return null;
}
@Override
public TypeInformation<Row> getReturnType() {
return tableSchema.toRowType();
}
@Override
public TableSchema getTableSchema() {
return tableSchema;
}
@Override
public String explainSource() {
return "()";
}
}
}
......@@ -33,6 +33,7 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.calcite.CalciteConfigBuilder;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase;
import org.apache.flink.table.runtime.utils.TableProgramsTestBase;
import org.apache.flink.test.operators.util.CollectionDataSets;
......@@ -114,7 +115,7 @@ public class JavaTableEnvironmentITCase extends TableProgramsCollectionTestBase
compareResultAsText(results, expected);
}
@Test(expected = TableException.class)
@Test(expected = TableAlreadyExistException.class)
public void testRegisterExistingDatasetTable() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, config());
......
......@@ -20,3 +20,4 @@ org.apache.flink.table.factories.utils.TestTableSourceFactory
org.apache.flink.table.factories.utils.TestTableFormatFactory
org.apache.flink.table.factories.utils.TestAmbiguousTableFormatFactory
org.apache.flink.table.factories.utils.TestExternalCatalogFactory
org.apache.flink.table.catalog.TestExternalTableSourceFactory
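The new factory is listed in the factory service file above (presumably the META-INF/services registration for org.apache.flink.table.factories.TableFactory), which is what makes it discoverable from table properties. Below is a hedged sketch of such a lookup, assuming Flink's TableFactoryService.find(factoryClass, properties) entry point and the connector.type value declared by TestExternalTableSourceFactory:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSourceFactory;

public class FactoryDiscoverySketch {
    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        // must match TestExternalTableSourceFactory#requiredContext()
        properties.put("connector.type", "test-external-connector");

        // the ServiceLoader-based lookup only works because the factory is listed
        // in the service file changed above
        TableSourceFactory factory = TableFactoryService.find(TableSourceFactory.class, properties);
        System.out.println(factory.getClass().getName());
    }
}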
......@@ -45,7 +45,7 @@ class ExternalCatalogInsertTest extends TableTestBase {
val table2 = tableBatchEnv.scan("test", "db2", "tb2")
table2.select('d * 2, 'e, 'g.upperCase())
.unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
.insertInto("test.db3.tb3")
.insertInto("test", "db3", "tb3")
}
@Test
......@@ -54,7 +54,7 @@ class ExternalCatalogInsertTest extends TableTestBase {
"test",
CommonTestData.getInMemoryTestCatalog(isStreaming = false))
val sqlInsert = "INSERT INTO `test.db3.tb3` SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " +
val sqlInsert = "INSERT INTO test.db3.tb3 SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " +
"UNION ALL (SELECT a * 2, b, c FROM test.db1.tb1)"
tableBatchEnv.sqlUpdate(sqlInsert)
......@@ -74,7 +74,7 @@ class ExternalCatalogInsertTest extends TableTestBase {
table2.where("d < 3")
.select('d * 2, 'e, 'g.upperCase())
.unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
.insertInto("test.db3.tb3")
.insertInto("test", "db3", "tb3")
}
@Test
......@@ -85,7 +85,7 @@ class ExternalCatalogInsertTest extends TableTestBase {
"test",
CommonTestData.getInMemoryTestCatalog(isStreaming = true))
val sqlInsert = "INSERT INTO `test.db3.tb3` SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " +
val sqlInsert = "INSERT INTO test.db3.tb3 SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " +
"UNION ALL (SELECT a * 2, b, c FROM test.db1.tb1)"
tableEnv.sqlUpdate(sqlInsert)
......@@ -103,6 +103,6 @@ class ExternalCatalogInsertTest extends TableTestBase {
val table2 = tableEnv.scan("test", "db2", "tb2")
table2.select('d * 2, 'e, 'g.upperCase())
.unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
.insertInto("test.tb3")
.insertInto("test", "tb3")
}
}
......@@ -30,6 +30,7 @@ import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEn
import org.apache.flink.table.api.java.{StreamTableEnvImpl => JStreamTableEnvImpl}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableConfig, Types}
import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test
......@@ -176,7 +177,13 @@ class StreamTableEnvironmentTest extends TableTestBase {
val jStreamExecEnv = mock(classOf[JStreamExecEnv])
when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
val jTEnv = new JStreamTableEnvImpl(jStreamExecEnv, new TableConfig)
val config = new TableConfig
val jTEnv = new JStreamTableEnvImpl(
jStreamExecEnv,
config,
new CatalogManager(
config.getBuiltInCatalogName,
new GenericInMemoryCatalog(config.getBuiltInCatalogName, config.getBuiltInDatabaseName)))
val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
.asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
......
......@@ -26,6 +26,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.table.api.TableEnvironmentTest.{CClass, PojoClass}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.TableException
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException
import org.apache.flink.table.runtime.types.CRowTypeInfo
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.types.Row
......@@ -110,7 +111,7 @@ class TableEnvironmentValidationTest extends TableTestBase {
util.addTable('first)(genericRowType)
}
@Test(expected = classOf[TableException])
@Test(expected = classOf[TableAlreadyExistException])
def testRegisterExistingDataSet(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
......@@ -130,7 +131,7 @@ class TableEnvironmentValidationTest extends TableTestBase {
tEnv.scan("someTable")
}
@Test(expected = classOf[TableException])
@Test(expected = classOf[TableAlreadyExistException])
def testRegisterExistingTable(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
......
......@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{TableException, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException
import org.apache.flink.table.runtime.stream.table.TestAppendSink
import org.apache.flink.table.utils.MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
import org.apache.flink.table.utils.TableTestBase
......@@ -42,7 +43,7 @@ class TableSinksValidationTest extends TableTestBase {
.insertInto("testSink")
}
@Test(expected = classOf[TableException])
@Test(expected = classOf[TableAlreadyExistException])
def testSinkTableRegistrationUsingExistedTableName(): Unit = {
val util = streamTestUtil()
util.addTable[(Int, String)]("TargetTable", 'id, 'text)
......
......@@ -48,8 +48,7 @@ class ExternalCatalogSchemaTest extends TableTestBase {
def setUp(): Unit = {
val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
val catalog = CommonTestData.getInMemoryTestCatalog(isStreaming = true)
ExternalCatalogSchema.registerCatalog(
streamTestUtil().tableEnv, rootSchemaPlus, schemaName, catalog)
ExternalCatalogSchema.registerCatalog(false, rootSchemaPlus, schemaName, catalog)
externalCatalogSchema = rootSchemaPlus.getSubSchema("schemaName")
val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
val prop = new Properties()
......
......@@ -25,7 +25,7 @@ import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.tools.{Programs, RelBuilder}
import org.apache.calcite.tools.Programs
import org.apache.flink.api.common.TaskInfo
import org.apache.flink.api.common.accumulators.Accumulator
import org.apache.flink.api.common.functions._
......@@ -37,10 +37,9 @@ import org.apache.flink.api.java.{DataSet => JDataSet}
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path
import org.apache.flink.table.api.scala.BatchTableEnvImpl
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.api.scala.{BatchTableEnvImpl, BatchTableEnvironment}
import org.apache.flink.table.api.{TableConfig, TableEnvImpl, TableImpl}
import org.apache.flink.table.calcite.FlinkPlannerImpl
import org.apache.flink.table.calcite.FlinkRelBuilder
import org.apache.flink.table.codegen.{Compiler, FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.expressions.{Expression, ExpressionParser}
import org.apache.flink.table.functions.ScalarFunction
......@@ -63,10 +62,7 @@ abstract class ExpressionTestBase {
// setup test utils
private val tableName = "testTable"
private val context = prepareContext(typeInfo)
private val planner = new FlinkPlannerImpl(
context._2.getFrameworkConfig,
context._2.getPlanner,
context._2.getTypeFactory)
private val planner = context._2.getFlinkPlanner
private val logicalOptProgram = Programs.ofRules(FlinkRuleSets.LOGICAL_OPT_RULES)
private val dataSetOptProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
......@@ -77,11 +73,11 @@ abstract class ExpressionTestBase {
while (it.hasNext) {
builder.addRuleInstance(it.next())
}
new HepPlanner(builder.build, context._2.getFrameworkConfig.getContext)
new HepPlanner(builder.build, context._1.getPlanner.getContext)
}
private def prepareContext(typeInfo: TypeInformation[Any])
: (RelBuilder, TableEnvImpl, ExecutionEnvironment) = {
: (FlinkRelBuilder, TableEnvImpl, ExecutionEnvironment) = {
// create DataSetTable
val dataSetMock = mock(classOf[DataSet[Any]])
val jDataSetMock = mock(classOf[JDataSet[Any]])
......
......@@ -24,9 +24,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.cep.pattern.Pattern
import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkPlannerImpl
import org.apache.flink.table.api.scala._
import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan}
import org.apache.flink.types.Row
import org.apache.flink.util.TestLogger
......@@ -46,10 +45,7 @@ abstract class PatternTranslatorTestBase extends TestLogger{
private val testTableTypeInfo = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO)
private val tableName = "testTable"
private val context = prepareContext(testTableTypeInfo)
private val planner = new FlinkPlannerImpl(
context._2.getFrameworkConfig,
context._2.getPlanner,
context._2.getTypeFactory)
private val planner = context._2.getFlinkPlanner
private def prepareContext(typeInfo: TypeInformation[Row])
: (RelBuilder, StreamTableEnvImpl, StreamExecutionEnvironment) = {
......
......@@ -18,9 +18,11 @@
package org.apache.flink.table.utils
import java.util.Optional
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment}
import org.apache.flink.table.catalog.ExternalCatalog
import org.apache.flink.table.catalog.{Catalog, ExternalCatalog}
import org.apache.flink.table.descriptors.{ConnectorDescriptor, TableDescriptor}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.sinks.TableSink
......@@ -66,4 +68,18 @@ class MockTableEnvironment extends TableEnvironment {
override def sqlUpdate(stmt: String, config: QueryConfig): Unit = ???
override def getConfig: TableConfig = ???
override def registerCatalog(
name: String,
catalog: Catalog): Unit = ???
override def getCatalog(catalogName: String): Optional[Catalog] = ???
override def getCurrentCatalog: String = ???
override def getCurrentDatabase: String = ???
override def useCatalog(catalogName: String): Unit = ???
override def useDatabase(databaseName: String): Unit = ???
}
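MockTableEnvironment above stubs out the catalog-related methods that this commit adds to the TableEnvironment interface. The following is a hedged sketch of how that surface would be used from a real environment; the catalog and database names ("myCatalog", "myDb") are illustrative, and the create(...) call mirrors the one used in JavaTableEnvironmentITCase:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

public class TableEnvironmentCatalogSketch {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, new TableConfig());

        // register an additional catalog and make it the current one
        tEnv.registerCatalog("myCatalog", new GenericInMemoryCatalog("myCatalog", "myDb"));
        tEnv.useCatalog("myCatalog");
        tEnv.useDatabase("myDb");

        // unqualified table lookups now resolve against myCatalog.myDb first
        System.out.println(tEnv.getCurrentCatalog() + "." + tEnv.getCurrentDatabase());
    }
}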
......@@ -32,8 +32,10 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
import org.apache.flink.table.api.scala.{BatchTableEnvImpl => ScalaBatchTableEnvImpl, StreamTableEnvImpl => ScalaStreamTableEnvImpl}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{Table, TableConfig, TableImpl, TableSchema}
import org.apache.flink.table.catalog.{CatalogManager, GenericCatalogDatabase, GenericInMemoryCatalog}
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
import org.apache.flink.table.utils.TableTestUtil.{createCatalogManager, extractBuiltinPath}
import org.junit.Assert.assertEquals
import org.junit.{ComparisonFailure, Rule}
import org.junit.rules.ExpectedException
......@@ -70,7 +72,7 @@ class TableTestBase {
}
}
abstract class TableTestUtil {
abstract class TableTestUtil(verifyCatalogPath: Boolean = false) {
private var counter = 0
......@@ -106,8 +108,16 @@ abstract class TableTestUtil {
// depends on the native machine (Little/Big Endian)
val actualNoCharset = actual.replace("_UTF-16LE'", "'").replace("_UTF-16BE'", "'")
    // The majority of tests did not assume the existence of the Catalog API,
    // therefore catalog path verification can be disabled here.
val actualWithAdjustedPath = if (!verifyCatalogPath) {
actualNoCharset.replaceAll("default_catalog, default_database, ", "")
} else {
actualNoCharset
}
val expectedLines = expected.split("\n").map(_.trim)
val actualLines = actualNoCharset.split("\n").map(_.trim)
val actualLines = actualWithAdjustedPath.split("\n").map(_.trim)
val expectedMessage = expectedLines.mkString("\n")
val actualMessage = actualLines.mkString("\n")
......@@ -118,8 +128,7 @@ abstract class TableTestUtil {
}
else if (expectedLine == TableTestUtil.ANY_SUBTREE) {
break
}
else if (expectedLine != actualLine) {
} else if (expectedLine != actualLine) {
throw new ComparisonFailure(null, expectedMessage, actualMessage)
}
}
......@@ -134,6 +143,27 @@ object TableTestUtil {
val ANY_SUBTREE = "%ANY_SUBTREE%"
/**
* Creates a [[CatalogManager]] with a builtin default catalog & database set to values
* specified in the [[TableConfig]].
*/
def createCatalogManager(config: TableConfig): CatalogManager = {
new CatalogManager(
config.getBuiltInCatalogName,
new GenericInMemoryCatalog(config.getBuiltInCatalogName, config.getBuiltInDatabaseName))
}
/**
    * Sets the builtin catalog & database configured in [[TableConfig]]
    * to the current catalog & database of the given [[CatalogManager]]. This should be used
    * to keep the configuration of a [[org.apache.flink.table.api.TableEnvironment]] consistent.
*/
def extractBuiltinPath(config: TableConfig, catalogManager: CatalogManager): TableConfig = {
config.setBuiltInCatalogName(catalogManager.getCurrentCatalog)
config.setBuiltInDatabaseName(catalogManager.getCurrentDatabase)
config
}
  // these methods currently just simplify string construction;
  // we could replace them with real logic later
......@@ -189,11 +219,27 @@ object TableTestUtil {
}
}
case class BatchTableTestUtil() extends TableTestUtil {
case class BatchTableTestUtil(
catalogManager: Option[CatalogManager] = None)
extends TableTestUtil {
val javaEnv = new LocalEnvironment()
val javaTableEnv = new JavaBatchTableEnvImpl(javaEnv, new TableConfig)
private def tableConfig = catalogManager match {
case Some(c) =>
TableTestUtil.extractBuiltinPath(new TableConfig, c)
case None =>
new TableConfig
}
val javaTableEnv = new JavaBatchTableEnvImpl(
javaEnv,
tableConfig,
catalogManager.getOrElse(createCatalogManager(new TableConfig)))
val env = new ExecutionEnvironment(javaEnv)
val tableEnv = ScalaBatchTableEnv.create(env).asInstanceOf[ScalaBatchTableEnvImpl]
val tableEnv = new ScalaBatchTableEnvImpl(
env,
tableConfig,
catalogManager.getOrElse(createCatalogManager(new TableConfig)))
def addTable[T: TypeInformation](
name: String,
......@@ -273,13 +319,28 @@ case class BatchTableTestUtil() extends TableTestUtil {
}
}
case class StreamTableTestUtil() extends TableTestUtil {
case class StreamTableTestUtil(
catalogManager: Option[CatalogManager] = None)
extends TableTestUtil {
val javaEnv = new LocalStreamEnvironment()
javaEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val javaTableEnv = new JavaStreamTableEnvImpl(javaEnv, new TableConfig)
private def tableConfig = catalogManager match {
case Some(c) =>
TableTestUtil.extractBuiltinPath(new TableConfig, c)
case None =>
new TableConfig
}
val javaTableEnv = new JavaStreamTableEnvImpl(
javaEnv,
tableConfig,
catalogManager.getOrElse(createCatalogManager(new TableConfig)))
val env = new StreamExecutionEnvironment(javaEnv)
val tableEnv = ScalaStreamTableEnv.create(env).asInstanceOf[StreamTableEnvImpl]
val tableEnv = new StreamTableEnvImpl(
env,
tableConfig,
catalogManager.getOrElse(createCatalogManager(new TableConfig)))
def addTable[T: TypeInformation](
name: String,
......
== Abstract Syntax Tree ==
LogicalFilter(condition=[=(MOD($0, 2), 0)])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[default_catalog, default_database, _DataSetTable_0]])
== Optimized Logical Plan ==
DataSetCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
DataSetScan(table=[[_DataSetTable_0]])
DataSetScan(table=[[default_catalog, default_database, _DataSetTable_0]])
== Physical Execution Plan ==
Stage 3 : Data Source
......
== Abstract Syntax Tree ==
LogicalFilter(condition=[=(MOD($0, 2), 0)])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[default_catalog, default_database, _DataSetTable_0]])
== Optimized Logical Plan ==
DataSetCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
DataSetScan(table=[[_DataSetTable_0]])
DataSetScan(table=[[default_catalog, default_database, _DataSetTable_0]])
== Physical Execution Plan ==
Stage 3 : Data Source
......
== Abstract Syntax Tree ==
LogicalFilter(condition=[=(MOD($0, 2), 0)])
LogicalTableScan(table=[[_DataStreamTable_0]])
LogicalTableScan(table=[[default_catalog, default_database, _DataStreamTable_0]])
== Optimized Logical Plan ==
DataStreamCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
DataStreamScan(table=[[_DataStreamTable_0]])
DataStreamScan(table=[[default_catalog, default_database, _DataStreamTable_0]])
== Physical Execution Plan ==
Stage 1 : Data Source
......
......@@ -2,14 +2,14 @@
LogicalProject(a=[$0], c=[$2])
LogicalFilter(condition=[=($1, $3)])
LogicalJoin(condition=[true], joinType=[inner])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[_DataSetTable_1]])
LogicalTableScan(table=[[default_catalog, default_database, _DataSetTable_0]])
LogicalTableScan(table=[[default_catalog, default_database, _DataSetTable_1]])
== Optimized Logical Plan ==
DataSetCalc(select=[a, c])
DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin])
DataSetScan(table=[[_DataSetTable_0]])
DataSetScan(table=[[_DataSetTable_1]])
DataSetScan(table=[[default_catalog, default_database, _DataSetTable_0]])
DataSetScan(table=[[default_catalog, default_database, _DataSetTable_1]])
== Physical Execution Plan ==
Stage 4 : Data Source
......
......@@ -2,14 +2,14 @@
LogicalProject(a=[$0], c=[$2])
LogicalFilter(condition=[=($1, $3)])
LogicalJoin(condition=[true], joinType=[inner])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[_DataSetTable_1]])
LogicalTableScan(table=[[default_catalog, default_database, _DataSetTable_0]])
LogicalTableScan(table=[[default_catalog, default_database, _DataSetTable_1]])
== Optimized Logical Plan ==
DataSetCalc(select=[a, c])
DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin])
DataSetScan(table=[[_DataSetTable_0]])
DataSetScan(table=[[_DataSetTable_1]])
DataSetScan(table=[[default_catalog, default_database, _DataSetTable_0]])
DataSetScan(table=[[default_catalog, default_database, _DataSetTable_1]])
== Physical Execution Plan ==
Stage 4 : Data Source
......
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[_DataSetTable_1]])
LogicalTableScan(table=[[default_catalog, default_database, _DataSetTable_0]])
LogicalTableScan(table=[[default_catalog, default_database, _DataSetTable_1]])
== Optimized Logical Plan ==
DataSetUnion(all=[true], union=[count, word])
DataSetScan(table=[[_DataSetTable_0]])
DataSetScan(table=[[_DataSetTable_1]])
DataSetScan(table=[[default_catalog, default_database, _DataSetTable_0]])
DataSetScan(table=[[default_catalog, default_database, _DataSetTable_1]])
== Physical Execution Plan ==
Stage 3 : Data Source
......
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[_DataSetTable_1]])
LogicalTableScan(table=[[default_catalog, default_database, _DataSetTable_0]])
LogicalTableScan(table=[[default_catalog, default_database, _DataSetTable_1]])
== Optimized Logical Plan ==
DataSetUnion(all=[true], union=[count, word])
DataSetScan(table=[[_DataSetTable_0]])
DataSetScan(table=[[_DataSetTable_1]])
DataSetScan(table=[[default_catalog, default_database, _DataSetTable_0]])
DataSetScan(table=[[default_catalog, default_database, _DataSetTable_1]])
== Physical Execution Plan ==
Stage 3 : Data Source
......
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataStreamTable_0]])
LogicalTableScan(table=[[_DataStreamTable_1]])
LogicalTableScan(table=[[default_catalog, default_database, _DataStreamTable_0]])
LogicalTableScan(table=[[default_catalog, default_database, _DataStreamTable_1]])
== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
DataStreamScan(table=[[_DataStreamTable_0]])
DataStreamScan(table=[[_DataStreamTable_1]])
DataStreamScan(table=[[default_catalog, default_database, _DataStreamTable_0]])
DataStreamScan(table=[[default_catalog, default_database, _DataStreamTable_1]])
== Physical Execution Plan ==
Stage 1 : Data Source
......