提交 55b4d0b2 编写于 作者: B bowen.li 提交者: Dawid Wysakowicz

[FLINK-11476] [SQL/TABLE] Create CatalogManager to manage multiple catalogs...

[FLINK-11476] [SQL/TABLE] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
上级 07efbde1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.api;
/**
 * Signals an attempt to add a catalog under a name that is already in use.
 */
public class CatalogAlreadyExistsException extends Exception {

	/**
	 * Creates the exception with no underlying cause.
	 *
	 * @param catalogName name of the catalog that already exists
	 */
	public CatalogAlreadyExistsException(String catalogName) {
		// The cause is passed explicitly as null (rather than left unset) on purpose.
		super(describe(catalogName), null);
	}

	/**
	 * Creates the exception and records the failure that triggered it.
	 *
	 * @param catalogName name of the catalog that already exists
	 * @param cause the underlying cause, may be {@code null}
	 */
	public CatalogAlreadyExistsException(String catalogName, Throwable cause) {
		super(describe(catalogName), cause);
	}

	/** Builds the exception message for the given catalog name. */
	private static String describe(String catalogName) {
		return String.format("Catalog %s already exists.", catalogName);
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.CatalogAlreadyExistsException;
import org.apache.flink.table.api.CatalogNotExistException;
import java.util.Set;
/**
 * CatalogManager manages all the registered {@link Catalog} instances, each under a unique name.
 * It also tracks a "current" catalog, which is the one used when a meta-object is referenced
 * without naming a catalog explicitly.
 */
@PublicEvolving
public interface CatalogManager {
/**
 * Registers a catalog under a unique name.
 *
 * @param catalogName catalog name to register
 * @param catalog catalog to register
 * @throws CatalogAlreadyExistsException thrown if the name is already taken
 */
void registerCatalog(String catalogName, Catalog catalog) throws CatalogAlreadyExistsException;
/**
 * Gets a catalog by name.
 *
 * @param catalogName catalog name
 * @return the requested catalog
 * @throws CatalogNotExistException thrown if the catalog doesn't exist
 */
Catalog getCatalog(String catalogName) throws CatalogNotExistException;
/**
 * Gets the names of all registered catalogs.
 *
 * @return a set of names of registered catalogs
 */
Set<String> getCatalogNames();
/**
 * Gets the current catalog.
 *
 * @return the current catalog
 */
Catalog getCurrentCatalog();
/**
 * Sets the current catalog by name.
 *
 * @param catalogName catalog name to set as current catalog
 * @throws CatalogNotExistException thrown if the catalog doesn't exist
 */
void setCurrentCatalog(String catalogName) throws CatalogNotExistException;
}
......@@ -25,10 +25,19 @@ package org.apache.flink.table.catalog.exceptions;
public class DatabaseNotExistException extends Exception {
private static final String MSG = "Database %s does not exist in Catalog %s.";
/**
 * Creates an exception whose message is {@code MSG} formatted with the database name
 * followed by the catalog name, and which carries the given cause.
 *
 * @param catalog Catalog name
 * @param database Database name
 * @param cause The cause
 */
public DatabaseNotExistException(String catalog, String database, Throwable cause) {
super(String.format(MSG, database, catalog), cause);
}
/**
 * Creates the exception without an underlying cause; delegates to the three-argument
 * constructor with a {@code null} cause.
 *
 * @param catalog Catalog name
 * @param database Database name
 */
public DatabaseNotExistException(String catalog, String database) {
this(catalog, database, null);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
/**
 * A mapping between Flink's catalog and Calcite's schema. This enables to look up and access tables
 * in SQL queries without registering tables in advance. Databases are registered as sub-schemas in the schema.
 * This mapping is modeled as a strict two-level reference structure for Flink in Calcite,
 * the full path of tables and views is of format [catalog_name].[db_name].[meta-object_name].
 */
@Internal
public class CatalogCalciteSchema implements Schema {

	private static final Logger LOGGER = LoggerFactory.getLogger(CatalogCalciteSchema.class);

	private final String catalogName;
	private final Catalog catalog;

	/**
	 * Creates a schema view over the given catalog.
	 *
	 * @param catalogName name under which the catalog is known
	 * @param catalog the catalog whose databases are exposed as sub-schemas
	 */
	public CatalogCalciteSchema(String catalogName, Catalog catalog) {
		this.catalogName = catalogName;
		this.catalog = catalog;
	}

	/**
	 * Look up a sub-schema (database) by the given sub-schema name.
	 *
	 * <p>An unknown database is reported by returning {@code null} rather than by throwing,
	 * which is what Calcite's {@link Schema#getSubSchema(String)} contract asks for.
	 *
	 * @param schemaName name of sub-schema to look up
	 * @return the sub-schema with a given dbName, or null
	 */
	@Override
	public Schema getSubSchema(String schemaName) {
		if (catalog.databaseExists(schemaName)) {
			return new DatabaseCalciteSchema(schemaName, catalog);
		}
		// A miss is an ordinary lookup outcome, not an error.
		LOGGER.debug("Schema {} does not exist in catalog {}", schemaName, catalogName);
		return null;
	}

	/**
	 * Lists the catalog's databases as sub-schema names.
	 *
	 * @return a fresh set holding the names returned by {@code catalog.listDatabases()}
	 */
	@Override
	public Set<String> getSubSchemaNames() {
		return new HashSet<>(catalog.listDatabases());
	}

	/** Tables live in databases, not directly in the catalog level; always {@code null}. */
	@Override
	public Table getTable(String name) {
		return null;
	}

	/** No tables are exposed at the catalog level; always an empty set. */
	@Override
	public Set<String> getTableNames() {
		return new HashSet<>();
	}

	/** No types are exposed at the catalog level; always {@code null}. */
	@Override
	public RelProtoDataType getType(String name) {
		return null;
	}

	/** No types are exposed at the catalog level; always an empty set. */
	@Override
	public Set<String> getTypeNames() {
		return new HashSet<>();
	}

	/** No functions are exposed at the catalog level; always an empty collection. */
	@Override
	public Collection<Function> getFunctions(String s) {
		return new HashSet<>();
	}

	/** No functions are exposed at the catalog level; always an empty set. */
	@Override
	public Set<String> getFunctionNames() {
		return new HashSet<>();
	}

	@Override
	public Expression getExpression(SchemaPlus parentSchema, String name) {
		return Schemas.subSchemaExpression(parentSchema, name, getClass());
	}

	@Override
	public boolean isMutable() {
		return true;
	}

	/** Returns this instance for every requested version. */
	@Override
	public Schema snapshot(SchemaVersion schemaVersion) {
		return this;
	}

	/**
	 * Register a Catalog to Calcite schema with given name.
	 *
	 * @param parentSchema the parent schema
	 * @param catalogName name of the catalog to register
	 * @param catalog the catalog to register
	 */
	public static void registerCatalog(
			SchemaPlus parentSchema,
			String catalogName,
			Catalog catalog) {
		CatalogCalciteSchema newCatalog = new CatalogCalciteSchema(catalogName, catalog);
		parentSchema.add(catalogName, newCatalog);
		LOGGER.info("Registered catalog '{}' to Calcite", catalogName);
	}

	/**
	 * A mapping between Flink catalog's database and Calcite's schema.
	 * Tables are registered as tables in the schema.
	 */
	private class DatabaseCalciteSchema implements Schema {

		private final String dbName;
		private final Catalog catalog;

		public DatabaseCalciteSchema(String dbName, Catalog catalog) {
			this.dbName = dbName;
			this.catalog = catalog;
		}

		/**
		 * Looks the table up in the catalog. Conversion to a Calcite table is not implemented
		 * yet, so {@code null} is returned even when the table exists.
		 *
		 * @param tableName name of the table inside this database
		 * @return currently always {@code null}
		 * @throws CatalogException wrapping a {@link TableNotExistException} if the catalog
		 *     reports that the table does not exist
		 */
		@Override
		public Table getTable(String tableName) {
			LOGGER.info("Getting table '{}' from catalog '{}'", tableName, catalogName);
			ObjectPath tablePath = new ObjectPath(dbName, tableName);
			try {
				// The result is not used yet; the call is kept so that a missing table is still reported.
				catalog.getTable(tablePath);
				LOGGER.info("Successfully got table '{}' from catalog '{}'", tableName, catalogName);
				// TODO: [FLINK-12257] Convert CatalogBaseTable to org.apache.calcite.schema.Table
				// so that planner can use unified catalog APIs
				return null;
			} catch (TableNotExistException e) {
				// NOTE(review): Calcite's Schema#getTable is documented to return null for a
				// missing table; confirm whether throwing here is intended once FLINK-12257 lands.
				throw new CatalogException(e);
			}
		}

		/**
		 * Lists the tables of this database.
		 *
		 * @return a fresh set holding the names returned by {@code catalog.listTables(dbName)}
		 * @throws CatalogException wrapping a {@link DatabaseNotExistException} if the catalog
		 *     reports that the database does not exist
		 */
		@Override
		public Set<String> getTableNames() {
			try {
				return new HashSet<>(catalog.listTables(dbName));
			} catch (DatabaseNotExistException e) {
				throw new CatalogException(e);
			}
		}

		@Override
		public RelProtoDataType getType(String name) {
			return null;
		}

		@Override
		public Set<String> getTypeNames() {
			return new HashSet<>();
		}

		@Override
		public Collection<Function> getFunctions(String s) {
			return new HashSet<>();
		}

		@Override
		public Set<String> getFunctionNames() {
			return new HashSet<>();
		}

		/** Databases have no nested schemas; always {@code null}. */
		@Override
		public Schema getSubSchema(String s) {
			return null;
		}

		/** Databases have no nested schemas; always an empty set. */
		@Override
		public Set<String> getSubSchemaNames() {
			return new HashSet<>();
		}

		@Override
		public Expression getExpression(SchemaPlus parentSchema, String name) {
			return Schemas.subSchemaExpression(parentSchema, name, getClass());
		}

		@Override
		public boolean isMutable() {
			return true;
		}

		@Override
		public Schema snapshot(SchemaVersion schemaVersion) {
			return this;
		}
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
import org.apache.flink.table.api.CatalogAlreadyExistsException;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.util.StringUtils;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.SchemaPlus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * A CatalogManager implementation for Flink.
 * TODO: [FLINK-11275] Decouple CatalogManager with Calcite
 * Ideally FlinkCatalogManager should be in flink-table-api-java module.
 * But due to that it currently depends on Calcite, a dependency that flink-table-api-java doesn't have right now.
 * We temporarily put FlinkCatalogManager in flink-table-planner-blink.
 */
public class FlinkCatalogManager implements CatalogManager {

	private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalogManager.class);

	public static final String BUILTIN_CATALOG_NAME = "builtin";

	// The catalog to hold all registered and translated tables
	// We disable caching here to prevent side effects
	private final CalciteSchema internalSchema = CalciteSchema.createRootSchema(false, false);
	private final SchemaPlus rootSchema = internalSchema.plus();

	// A map between names and catalogs.
	private final Map<String, Catalog> catalogs = new HashMap<>();

	// The name of the current catalog
	private String currentCatalogName;

	/**
	 * Creates a manager that starts with a single {@link GenericInMemoryCatalog} registered under
	 * {@link #BUILTIN_CATALOG_NAME}, which is also made the current catalog.
	 */
	public FlinkCatalogManager() {
		LOG.info("Initializing FlinkCatalogManager");
		GenericInMemoryCatalog inMemoryCatalog = new GenericInMemoryCatalog(BUILTIN_CATALOG_NAME);
		catalogs.put(BUILTIN_CATALOG_NAME, inMemoryCatalog);
		currentCatalogName = BUILTIN_CATALOG_NAME;
		CatalogCalciteSchema.registerCatalog(rootSchema, BUILTIN_CATALOG_NAME, inMemoryCatalog);
	}

	/**
	 * Opens the catalog and registers it both in this manager and in the Calcite root schema.
	 *
	 * @param catalogName catalog name to register, must not be null or whitespace-only
	 * @param catalog catalog to register, must not be null
	 * @throws CatalogAlreadyExistsException thrown if the name is already taken
	 */
	@Override
	public void registerCatalog(String catalogName, Catalog catalog) throws CatalogAlreadyExistsException {
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
		checkNotNull(catalog, "catalog cannot be null");

		if (catalogs.containsKey(catalogName)) {
			throw new CatalogAlreadyExistsException(catalogName);
		}

		// Open before recording the catalog anywhere: if open() fails, the manager must not be
		// left with a catalog that is listed by name but missing from the Calcite schema.
		catalog.open();
		catalogs.put(catalogName, catalog);
		CatalogCalciteSchema.registerCatalog(rootSchema, catalogName, catalog);
	}

	/**
	 * Gets a registered catalog by name.
	 *
	 * @param catalogName catalog name
	 * @return the requested catalog
	 * @throws CatalogNotExistException thrown if no catalog is registered under that name
	 */
	@Override
	public Catalog getCatalog(String catalogName) throws CatalogNotExistException {
		// Registered catalogs are never null, so a null lookup result means "not registered".
		Catalog catalog = catalogs.get(catalogName);
		if (catalog == null) {
			throw new CatalogNotExistException(catalogName);
		}
		return catalog;
	}

	/**
	 * Gets the names of all registered catalogs.
	 *
	 * @return a read-only view of the registered catalog names
	 */
	@Override
	public Set<String> getCatalogNames() {
		// Read-only view: callers must not be able to unregister catalogs by mutating the key set.
		return Collections.unmodifiableSet(catalogs.keySet());
	}

	/**
	 * Gets the current catalog.
	 *
	 * @return the catalog registered under the current catalog name
	 */
	@Override
	public Catalog getCurrentCatalog() {
		return catalogs.get(currentCatalogName);
	}

	/**
	 * Makes the named catalog the current one. Nothing changes if it already is.
	 *
	 * @param catalogName catalog name to set as current catalog, must not be null or whitespace-only
	 * @throws CatalogNotExistException thrown if no catalog is registered under that name
	 */
	@Override
	public void setCurrentCatalog(String catalogName) throws CatalogNotExistException {
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");

		if (!catalogs.containsKey(catalogName)) {
			throw new CatalogNotExistException(catalogName);
		}

		if (!currentCatalogName.equals(catalogName)) {
			currentCatalogName = catalogName;
			LOG.info("Set default catalog as '{}' and default database as '{}'",
				currentCatalogName, catalogs.get(currentCatalogName).getCurrentDatabase());
		}
	}

	/**
	 * Gets the Calcite root schema into which the catalogs are registered.
	 *
	 * @return the root schema
	 */
	public SchemaPlus getRootSchema() {
		return rootSchema;
	}
}
......@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnvironment, StreamTableEnvironment => JavaStreamTableEnv}
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnvironment, StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.table.calcite.{FlinkContextImpl, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.catalog.{CatalogManager, FlinkCatalogManager, Catalog}
import org.apache.flink.table.codegen.ExpressionReducer
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
......@@ -51,7 +52,6 @@ import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.typeutils.BaseRowTypeInfo
import org.apache.flink.table.validate.FunctionCatalog
import org.apache.flink.types.Row
import org.apache.calcite.config.Lex
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef}
......@@ -63,11 +63,12 @@ import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable}
import org.apache.calcite.sql2rel.SqlToRelConverter
import org.apache.calcite.tools._
import _root_.java.lang.reflect.Modifier
import _root_.java.util.concurrent.atomic.AtomicInteger
import _root_.java.util.{Arrays => JArrays}
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException
import _root_.scala.annotation.varargs
import _root_.scala.collection.JavaConversions._
import _root_.scala.collection.JavaConverters._
......@@ -80,6 +81,9 @@ import _root_.scala.collection.mutable
*/
abstract class TableEnvironment(val config: TableConfig) {
// Note: The CatalogManager isn't hooked up to planner yet.
private val catalogManager: CatalogManager = new FlinkCatalogManager()
protected val DEFAULT_JOB_NAME = "Flink Exec Table Job"
// the catalog to hold all registered and translated tables
......@@ -317,6 +321,74 @@ abstract class TableEnvironment(val config: TableConfig) {
*/
private[flink] def optimize(root: RelNode): RelNode = optimize(Seq(root)).head
/**
  * Registers a [[Catalog]] under a unique name by handing it to the catalog manager.
  *
  * @param name    the name under which the catalog will be registered
  * @param catalog the catalog to register
  * @throws CatalogAlreadyExistsException thrown if the catalog already exists
  */
@throws[CatalogAlreadyExistsException]
def registerCatalog(name: String, catalog: Catalog): Unit =
  catalogManager.registerCatalog(name, catalog)
/**
  * Looks up a registered [[Catalog]] by name in the catalog manager.
  *
  * @param catalogName name of the catalog to get
  * @return the requested catalog
  * @throws CatalogNotExistException thrown if the catalog doesn't exist
  */
@throws[CatalogNotExistException]
def getCatalog(catalogName: String): Catalog =
  catalogManager.getCatalog(catalogName)
/**
  * Returns the catalog that the catalog manager currently treats as current.
  *
  * @return the current catalog in CatalogManager
  */
def getCurrentCatalog(): Catalog =
  catalogManager.getCurrentCatalog
/**
  * Returns the name of the current database, as reported by the current catalog.
  *
  * @return the current database of the current catalog
  */
def getCurrentDatabaseName(): String = {
  val currentCatalog = catalogManager.getCurrentCatalog
  currentCatalog.getCurrentDatabase
}
/**
  * Makes the named catalog the current catalog of the catalog manager.
  *
  * @param name name of the catalog to set as current catalog
  * @throws CatalogNotExistException thrown if the catalog doesn't exist
  */
@throws[CatalogNotExistException]
def setCurrentCatalog(name: String): Unit =
  catalogManager.setCurrentCatalog(name)
/**
  * Set the current catalog and current database.
  *
  * The database is switched on the target catalog before that catalog is made current,
  * so a missing database leaves the current catalog unchanged.
  *
  * @param catalogName  name of the catalog to set as current catalog
  * @param databaseName name of the database to set as current database
  * @throws CatalogNotExistException  thrown if the catalog doesn't exist
  * @throws DatabaseNotExistException thrown if the database doesn't exist
  */
@throws[CatalogNotExistException]
@throws[DatabaseNotExistException]
def setCurrentDatabase(catalogName: String, databaseName: String): Unit = {
  // Resolve the catalog first; an unknown name fails here before anything is modified.
  val targetCatalog = catalogManager.getCatalog(catalogName)
  // Validate and switch the database while the previous catalog is still the current one.
  targetCatalog.setCurrentDatabase(databaseName)
  catalogManager.setCurrentCatalog(catalogName)
}
/**
* Registers a [[Table]] under a unique name in the TableEnvironment's catalog.
* Registered tables can be referenced in SQL queries.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.calcite.schema.SchemaPlus;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
 * Unit tests for catalog registration and current-catalog switching in {@link FlinkCatalogManager}.
 */
public class FlinkCatalogManagerTest {

	private static final String TEST_CATALOG_NAME = "test";
	private static final Catalog TEST_CATALOG = new GenericInMemoryCatalog(TEST_CATALOG_NAME);

	private FlinkCatalogManager manager;
	private SchemaPlus rootSchema;

	/** Gives every test a fresh manager together with its Calcite root schema. */
	@Before
	public void init() {
		manager = new FlinkCatalogManager();
		rootSchema = manager.getRootSchema();
	}

	/** A registered catalog must show up both in the manager and in the Calcite root schema. */
	@Test
	public void testRegisterCatalog() throws Exception {
		// Before registration: one catalog is present and the test catalog is unknown everywhere.
		assertEquals(1, manager.getCatalogNames().size());
		assertFalse(manager.getCatalogNames().contains(TEST_CATALOG_NAME));
		assertFalse(rootSchema.getSubSchemaNames().contains(TEST_CATALOG_NAME));

		manager.registerCatalog(TEST_CATALOG_NAME, TEST_CATALOG);

		// After registration: the test catalog is visible in both places.
		assertEquals(2, manager.getCatalogNames().size());
		assertTrue(manager.getCatalogNames().contains(TEST_CATALOG_NAME));
		assertTrue(rootSchema.getSubSchemaNames().contains(TEST_CATALOG_NAME));
	}

	/** Switching the current catalog must be reflected by {@code getCurrentCatalog()}. */
	@Test
	public void testSetCurrentCatalog() throws Exception {
		manager.registerCatalog(TEST_CATALOG_NAME, TEST_CATALOG);

		Catalog builtinCatalog = manager.getCatalog(FlinkCatalogManager.BUILTIN_CATALOG_NAME);
		assertEquals(builtinCatalog, manager.getCurrentCatalog());

		manager.setCurrentCatalog(TEST_CATALOG_NAME);

		Catalog registeredCatalog = manager.getCatalog(TEST_CATALOG_NAME);
		assertEquals(registeredCatalog, manager.getCurrentCatalog());
	}

	/** Selecting a name that was never registered must fail. */
	@Test(expected = CatalogNotExistException.class)
	public void testSetNonExistCurrentCatalog() throws Exception {
		manager.setCurrentCatalog("nonexist");
	}
}
......@@ -18,15 +18,18 @@
package org.apache.flink.table.api
import _root_.java.util.HashMap
import org.apache.calcite.plan.RelOptUtil
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.junit.{Rule, Test}
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException
import org.apache.flink.table.catalog.{FlinkCatalogManager, GenericCatalogDatabase, GenericInMemoryCatalog}
import org.junit.Assert.assertEquals
import org.junit.rules.ExpectedException
import org.junit.{Rule, Test}
class TableEnvironmentTest {
......@@ -71,4 +74,21 @@ class TableEnvironmentTest {
" LogicalTableScan(table=[[MyTable]])\n"
assertEquals(expected, actual)
}
@Test
def testSetCurrentDatabase(): Unit = {
  // Before any switch the current database is expected to be the in-memory default.
  assertEquals(GenericInMemoryCatalog.DEFAULT_DB, tableEnv.getCurrentDatabaseName)

  // Create a second database in the current catalog so that there is something to switch to.
  val testDb = "test"
  val newDatabase = new GenericCatalogDatabase(new HashMap[String, String])
  tableEnv.getCurrentCatalog.createDatabase(testDb, newDatabase, false)

  tableEnv.setCurrentDatabase(FlinkCatalogManager.BUILTIN_CATALOG_NAME, testDb)

  assertEquals(testDb, tableEnv.getCurrentDatabaseName)
}
@Test
def testSetNonExistentCurrentDatabase(): Unit = {
  val missingDb = "nonexist"

  // Expectations must be declared before the failing call.
  thrown.expect(classOf[DatabaseNotExistException])
  thrown.expectMessage("Database nonexist does not exist in Catalog builtin")

  tableEnv.setCurrentDatabase(FlinkCatalogManager.BUILTIN_CATALOG_NAME, missingDb)
}
}
......@@ -69,13 +69,13 @@ trait CrudExternalCatalog extends ExternalCatalog {
* @param name The name of the sub catalog to add.
* @param catalog Description of the catalog to add.
* @param ignoreIfExists Flag to specify behavior if a sub catalog with the given name already
* exists: if set to false, it throws a CatalogAlreadyExistException,
* exists: if set to false, it throws a CatalogAlreadyExistsException,
* if set to true, nothing happens.
* @throws CatalogAlreadyExistException
* thrown if the sub catalog does already exist in the catalog
* and ignoreIfExists is false
* @throws CatalogAlreadyExistsException
* thrown if the sub catalog does already exist in the catalog
* and ignoreIfExists is false
*/
@throws[CatalogAlreadyExistException]
@throws[CatalogAlreadyExistsException]
def createSubCatalog(name: String, catalog: ExternalCatalog, ignoreIfExists: Boolean): Unit
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册