Commit 135a57c4 authored by jingzhang, committed by Fabian Hueske

[FLINK-5570] [table] Register ExternalCatalogs in TableEnvironment.

This closes #3409.
Parent 976e03c1
......@@ -344,6 +344,43 @@ tableEnvironment.unregisterTable("Customers")
</div>
</div>
Registering External Catalogs
--------------------------------
An external catalog is defined by the `ExternalCatalog` interface and provides information about databases and tables such as their name, schema, statistics, and access information. An `ExternalCatalog` is registered in a `TableEnvironment` as follows:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
// works for StreamExecutionEnvironment identically
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
ExternalCatalog customerCatalog = new InMemoryExternalCatalog();
// register the ExternalCatalog customerCatalog
tableEnv.registerExternalCatalog("Customers", customerCatalog);
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
// works for StreamExecutionEnvironment identically
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val customerCatalog: ExternalCatalog = new InMemoryExternalCatalog
// register the ExternalCatalog customerCatalog
tableEnv.registerExternalCatalog("Customers", customerCatalog)
{% endhighlight %}
</div>
</div>
Once registered in a `TableEnvironment`, all tables defined in an `ExternalCatalog` can be accessed from Table API or SQL queries by specifying their full path (`catalog`.`database`.`table`).
Currently, Flink provides an `InMemoryExternalCatalog` for demo and testing purposes. However, the `ExternalCatalog` interface can also be used to connect catalogs like HCatalog or Metastore to the Table API.
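For example, a table `tb1` in a database `db1` of a catalog registered under the name `test` can be queried via the Table API and SQL as shown below. This is a minimal Scala sketch; the catalog, database, and table names are placeholders taken from the tests of this commit:

{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register an external catalog under the name "test"
tableEnv.registerExternalCatalog("test", new InMemoryExternalCatalog)

// Table API: scan the table by its full path
val tab: Table = tableEnv.scan("test", "db1", "tb1")

// SQL: reference the table by its full path
val result: Table = tableEnv.sql("SELECT * FROM test.db1.tb1")
{% endhighlight %}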
Table API
----------
......
......@@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => Scala
import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
......@@ -60,6 +61,8 @@ import org.apache.flink.table.validate.FunctionCatalog
import org.apache.flink.types.Row
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable.HashMap
import _root_.scala.annotation.varargs
/**
* The abstract base class for batch and stream TableEnvironments.
......@@ -71,7 +74,7 @@ abstract class TableEnvironment(val config: TableConfig) {
// the catalog to hold all registered and translated tables
// we disable caching here to prevent side effects
private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(true, false)
private val tables: SchemaPlus = internalSchema.plus()
private val rootSchema: SchemaPlus = internalSchema.plus()
// Table API/SQL function catalog
private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
......@@ -79,7 +82,7 @@ abstract class TableEnvironment(val config: TableConfig) {
// the configuration to create a Calcite planner
private lazy val frameworkConfig: FrameworkConfig = Frameworks
.newConfigBuilder
.defaultSchema(tables)
.defaultSchema(rootSchema)
.parserConfig(getSqlParserConfig)
.costFactory(new DataSetCostFactory)
.typeSystem(new FlinkTypeSystem)
......@@ -99,6 +102,9 @@ abstract class TableEnvironment(val config: TableConfig) {
// a counter for unique attribute names
private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0)
// registered external catalog names -> catalog
private val externalCatalogs = new HashMap[String, ExternalCatalog]
/** Returns the table config to define the runtime behavior of the Table API. */
def getConfig = config
......@@ -245,6 +251,35 @@ abstract class TableEnvironment(val config: TableConfig) {
output
}
/**
* Registers an [[ExternalCatalog]] under a unique name in the TableEnvironment's schema.
* All tables registered in the [[ExternalCatalog]] can be accessed.
*
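* Example (a minimal sketch; uses the [[InMemoryExternalCatalog]] that is also shown in
* the documentation added by this commit):
* {{{
*   val catalog: ExternalCatalog = new InMemoryExternalCatalog
*   tableEnv.registerExternalCatalog("Customers", catalog)
* }}}
*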
* @param name The name under which the externalCatalog will be registered
* @param externalCatalog The externalCatalog to register
*/
def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = {
if (rootSchema.getSubSchema(name) != null) {
throw new ExternalCatalogAlreadyExistException(name)
}
this.externalCatalogs.put(name, externalCatalog)
// create an external catalog Calcite schema and register it under the root schema
ExternalCatalogSchema.registerCatalog(rootSchema, name, externalCatalog)
}
/**
* Gets a registered [[ExternalCatalog]] by name.
*
* @param name The name to look up the [[ExternalCatalog]]
* @return The [[ExternalCatalog]]
*/
def getRegisteredExternalCatalog(name: String): ExternalCatalog = {
this.externalCatalogs.get(name) match {
case Some(catalog) => catalog
case None => throw new ExternalCatalogNotExistException(name)
}
}
/**
* Registers a [[ScalarFunction]] under a unique name. Replaces already existing
* user-defined functions under this name.
......@@ -254,6 +289,7 @@ abstract class TableEnvironment(val config: TableConfig) {
checkForInstantiation(function.getClass)
// register in Table API
functionCatalog.registerFunction(name, function.getClass)
// register in SQL API
......@@ -341,7 +377,7 @@ abstract class TableEnvironment(val config: TableConfig) {
protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
if (isRegistered(name)) {
tables.add(name, table)
rootSchema.add(name, table)
} else {
throw new TableException(s"Table \'$name\' is not registered.")
}
......@@ -350,19 +386,55 @@ abstract class TableEnvironment(val config: TableConfig) {
/**
* Scans a registered table and returns the resulting [[Table]].
*
* The table to scan must be registered in the [[TableEnvironment]]'s catalog.
* A table to scan must be registered in the TableEnvironment. It can either be registered
* directly as a DataStream, DataSet, or Table, or be a member of an [[ExternalCatalog]].
*
* Examples:
*
* @param tableName The name of the table to scan.
* @throws ValidationException if no table is registered under the given name.
* @return The scanned table.
* - Scanning a directly registered table
* {{{
* val tab: Table = tableEnv.scan("tableName")
* }}}
*
* - Scanning a table from a registered catalog
* {{{
* val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
* }}}
*
* @param tablePath The path of the table to scan.
* @throws TableException if no table is found using the given table path.
* @return The resulting [[Table]].
*/
@throws[ValidationException]
def scan(tableName: String): Table = {
if (isRegistered(tableName)) {
new Table(this, CatalogNode(tableName, getRowType(tableName)))
} else {
throw new TableException(s"Table \'$tableName\' was not found in the registry.")
@throws[TableException]
@varargs
def scan(tablePath: String*): Table = {
scanInternal(tablePath.toArray)
}
@throws[TableException]
private def scanInternal(tablePath: Array[String]): Table = {
require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
val schemaPaths = tablePath.slice(0, tablePath.length - 1)
val schema = getSchema(schemaPaths)
if (schema != null) {
val tableName = tablePath(tablePath.length - 1)
val table = schema.getTable(tableName)
if (table != null) {
return new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory)))
}
}
throw new TableException(s"Table \'${tablePath.mkString(".")}\' was not found.")
}
private def getSchema(schemaPath: Array[String]): SchemaPlus = {
var schema = rootSchema
for (schemaName <- schemaPath) {
schema = schema.getSubSchema(schemaName)
if (schema == null) {
return schema
}
}
schema
}
/**
......@@ -416,7 +488,7 @@ abstract class TableEnvironment(val config: TableConfig) {
throw new TableException(s"Table \'$name\' already exists. " +
s"Please, choose a different name.")
} else {
tables.add(name, table)
rootSchema.add(name, table)
}
}
......@@ -434,11 +506,11 @@ abstract class TableEnvironment(val config: TableConfig) {
* @return true, if a table is registered under the name, false otherwise.
*/
protected def isRegistered(name: String): Boolean = {
tables.getTableNames.contains(name)
rootSchema.getTableNames.contains(name)
}
protected def getRowType(name: String): RelDataType = {
tables.getTable(name).getRowType(typeFactory)
rootSchema.getTable(name).getRowType(typeFactory)
}
/** Returns a unique temporary attribute name. */
......
......@@ -75,34 +75,34 @@ object ValidationException {
case class UnresolvedException(msg: String) extends RuntimeException(msg)
/**
* Exception for operation on a nonexistent table
* Exception for an operation on a nonexistent table
*
* @param db database name
* @param table table name
* @param cause
* @param cause the cause
*/
case class TableNotExistException(
db: String,
table: String,
cause: Throwable)
extends RuntimeException(s"table $db.$table does not exist!", cause) {
extends RuntimeException(s"Table $db.$table does not exist.", cause) {
def this(db: String, table: String) = this(db, table, null)
}
/**
* Exception for adding an already existed table
* Exception for adding an already existing table
*
* @param db database name
* @param table table name
* @param cause
* @param cause the cause
*/
case class TableAlreadyExistException(
db: String,
table: String,
cause: Throwable)
extends RuntimeException(s"table $db.$table already exists!", cause) {
extends RuntimeException(s"Table $db.$table already exists.", cause) {
def this(db: String, table: String) = this(db, table, null)
......@@ -112,56 +112,84 @@ case class TableAlreadyExistException(
* Exception for operation on a nonexistent database
*
* @param db database name
* @param cause
* @param cause the cause
*/
case class DatabaseNotExistException(
db: String,
cause: Throwable)
extends RuntimeException(s"database $db does not exist!", cause) {
extends RuntimeException(s"Database $db does not exist.", cause) {
def this(db: String) = this(db, null)
}
/**
* Exception for adding an already existed database
* Exception for adding an already existing database
*
* @param db database name
* @param cause
* @param cause the cause
*/
case class DatabaseAlreadyExistException(
db: String,
cause: Throwable)
extends RuntimeException(s"database $db already exists!", cause) {
extends RuntimeException(s"Database $db already exists.", cause) {
def this(db: String) = this(db, null)
}
/**
* Exception for does not find any matched [[TableSourceConverter]] for a specified table type
* Exception for not finding a [[TableSourceConverter]] for a given table type.
*
* @param tableType table type
* @param cause
* @param cause the cause
*/
case class NoMatchedTableSourceConverterException(
tableType: String,
cause: Throwable)
extends RuntimeException(s"find no table source converter matched table type $tableType!",
extends RuntimeException(s"Could not find a TableSourceConverter for table type $tableType.",
cause) {
def this(tableType: String) = this(tableType, null)
}
/**
* Exception for find more than one matched [[TableSourceConverter]] for a specified table type
* Exception for finding more than one [[TableSourceConverter]] for a given table type.
*
* @param tableType table type
* @param cause
* @param cause the cause
*/
case class AmbiguousTableSourceConverterException(
tableType: String,
cause: Throwable)
extends RuntimeException(s"more than one table source converter matched table type $tableType!",
extends RuntimeException(s"More than one TableSourceConverter for table type $tableType.",
cause) {
def this(tableType: String) = this(tableType, null)
}
/**
* Exception for an operation on a nonexistent external catalog
*
* @param catalogName external catalog name
* @param cause the cause
*/
case class ExternalCatalogNotExistException(
catalogName: String,
cause: Throwable)
extends RuntimeException(s"External catalog $catalogName does not exist.", cause) {
def this(catalogName: String) = this(catalogName, null)
}
/**
* Exception for adding an already existing external catalog
*
* @param catalogName external catalog name
* @param cause the cause
*/
case class ExternalCatalogAlreadyExistException(
catalogName: String,
cause: Throwable)
extends RuntimeException(s"External catalog $catalogName already exists.", cause) {
def this(catalogName: String) = this(catalogName, null)
}
......@@ -136,20 +136,18 @@ class ExternalCatalogSchema(
object ExternalCatalogSchema {
/**
* Creates a FlinkExternalCatalogSchema.
* Registers an external catalog in a Calcite schema.
*
* @param parentSchema Parent schema
* @param externalCatalogIdentifier External catalog identifier
* @param externalCatalog External catalog object
* @return Created schema
* @param parentSchema Parent schema into which the catalog is registered
* @param externalCatalogIdentifier Identifier of the external catalog
* @param externalCatalog The external catalog to register
*/
def create(
def registerCatalog(
parentSchema: SchemaPlus,
externalCatalogIdentifier: String,
externalCatalog: ExternalCatalog): ExternalCatalogSchema = {
externalCatalog: ExternalCatalog): Unit = {
val newSchema = new ExternalCatalogSchema(externalCatalogIdentifier, externalCatalog)
val schemaPlusOfNewSchema = parentSchema.add(externalCatalogIdentifier, newSchema)
newSchema.registerSubSchemas(schemaPlusOfNewSchema)
newSchema
}
}
......@@ -511,7 +511,7 @@ case class Join(
}
case class CatalogNode(
tableName: String,
tablePath: Array[String],
rowType: RelDataType) extends LeafNode {
val output: Seq[Attribute] = rowType.getFieldList.asScala.map { field =>
......@@ -519,7 +519,7 @@ case class CatalogNode(
}
override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
relBuilder.scan(tableName)
relBuilder.scan(tablePath.toIterable.asJava)
}
override def validate(tableEnv: TableEnvironment): LogicalNode = this
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table
import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Test
/**
* Tests for external catalog query plans.
*/
class ExternalCatalogTest extends TableTestBase {
private val table1Path: Array[String] = Array("test", "db1", "tb1")
private val table1ProjectedFields: Array[String] = Array("a", "b", "c")
private val table2Path: Array[String] = Array("test", "db2", "tb2")
private val table2ProjectedFields: Array[String] = Array("d", "e", "g")
@Test
def testBatchTableApi(): Unit = {
val util = batchTestUtil()
val tEnv = util.tEnv
tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
val table1 = tEnv.scan("test", "db1", "tb1")
val table2 = tEnv.scan("test", "db2", "tb2")
val result = table2
.select('d * 2, 'e, 'g.upperCase())
.unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
val expected = binaryNode(
"DataSetUnion",
unaryNode(
"DataSetCalc",
sourceBatchTableNode(table2Path, table2ProjectedFields),
term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
),
unaryNode(
"DataSetCalc",
sourceBatchTableNode(table1Path, table1ProjectedFields),
term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
),
term("union", "_c0", "e", "_c2")
)
util.verifyTable(result, expected)
}
@Test
def testBatchSQL(): Unit = {
val util = batchTestUtil()
util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
"(SELECT a * 2, b, c FROM test.db1.tb1)"
val expected = binaryNode(
"DataSetUnion",
unaryNode(
"DataSetCalc",
sourceBatchTableNode(table2Path, table2ProjectedFields),
term("select", "*(d, 2) AS EXPR$0", "e", "g"),
term("where", "<(d, 3)")),
unaryNode(
"DataSetCalc",
sourceBatchTableNode(table1Path, table1ProjectedFields),
term("select", "*(a, 2) AS EXPR$0", "b", "c")
),
term("union", "EXPR$0", "e", "g"))
util.verifySql(sqlQuery, expected)
}
@Test
def testStreamTableApi(): Unit = {
val util = streamTestUtil()
val tEnv = util.tEnv
tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
val table1 = tEnv.scan("test", "db1", "tb1")
val table2 = tEnv.scan("test", "db2", "tb2")
val result = table2.where("d < 3")
.select('d * 2, 'e, 'g.upperCase())
.unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
val expected = binaryNode(
"DataStreamUnion",
unaryNode(
"DataStreamCalc",
sourceStreamTableNode(table2Path, table2ProjectedFields),
term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2"),
term("where", "<(d, 3)")
),
unaryNode(
"DataStreamCalc",
sourceStreamTableNode(table1Path, table1ProjectedFields),
term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
),
term("union", "_c0", "e", "_c2")
)
util.verifyTable(result, expected)
}
@Test
def testStreamSQL(): Unit = {
val util = streamTestUtil()
util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
"(SELECT a * 2, b, c FROM test.db1.tb1)"
val expected = binaryNode(
"DataStreamUnion",
unaryNode(
"DataStreamCalc",
sourceStreamTableNode(table2Path, table2ProjectedFields),
term("select", "*(d, 2) AS EXPR$0", "e", "g"),
term("where", "<(d, 3)")),
unaryNode(
"DataStreamCalc",
sourceStreamTableNode(table1Path, table1ProjectedFields),
term("select", "*(a, 2) AS EXPR$0", "b", "c")
),
term("union", "EXPR$0", "e", "g"))
util.verifySql(sqlQuery, expected)
}
def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
s"fields=[${fields.mkString(", ")}])"
}
def sourceStreamTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
s"StreamTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
s"fields=[${fields.mkString(", ")}])"
}
}
......@@ -37,7 +37,7 @@ import scala.collection.JavaConverters._
class ExternalCatalogSchemaTest {
private val schemaName: String = "test"
private var externalCatalogSchema: ExternalCatalogSchema = _
private var externalCatalogSchema: SchemaPlus = _
private var calciteCatalogReader: CalciteCatalogReader = _
private val db = "db1"
private val tb = "tb1"
......@@ -46,7 +46,8 @@ class ExternalCatalogSchemaTest {
def setUp(): Unit = {
val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
val catalog = CommonTestData.getInMemoryTestCatalog
externalCatalogSchema = ExternalCatalogSchema.create(rootSchemaPlus, schemaName, catalog)
ExternalCatalogSchema.registerCatalog(rootSchemaPlus, schemaName, catalog)
externalCatalogSchema = rootSchemaPlus.getSubSchema(schemaName)
val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
calciteCatalogReader = new CalciteCatalogReader(
CalciteSchema.from(rootSchemaPlus),
......