Commit c16121f9 authored by Timo Walther

[FLINK-15612][table] Integrate data type factory into catalog manager

Parent 4df48404
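For readers skimming the diff: the commit replaces the public two-argument CatalogManager constructor with a private constructor behind a fluent builder, so that a DataTypeFactory (derived from the user class loader, the configuration, and an optional ExecutionConfig) can be wired into the manager at construction time. A minimal before/after sketch assembled from the hunks below; the Configuration passed to .config() is only a placeholder, any ReadableConfig works:

    // Before this commit: direct construction, no data type factory available.
    CatalogManager catalogManager = new CatalogManager(
        "default_catalog",
        new GenericInMemoryCatalog("default_catalog", "default_database"));

    // After this commit: fluent builder; classLoader and config are mandatory,
    // executionConfig is optional (see Builder#build() further down).
    CatalogManager catalogManager = CatalogManager.newBuilder()
        .classLoader(Thread.currentThread().getContextClassLoader())
        .config(new Configuration())
        .defaultCatalog(
            "default_catalog",
            new GenericInMemoryCatalog("default_catalog", "default_database"))
        .build();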
@@ -451,16 +451,24 @@ public class ExecutionContext<ClusterID> {
 		final TableConfig config = new TableConfig();
 		environment.getConfiguration().asMap().forEach((k, v) ->
 			config.getConfiguration().setString(k, v));
-		// Step 1.1 Initialize the CatalogManager if required.
-		final CatalogManager catalogManager = new CatalogManager(
-			settings.getBuiltInCatalogName(),
-			new GenericInMemoryCatalog(
-				settings.getBuiltInCatalogName(),
-				settings.getBuiltInDatabaseName()));
-		// Step 1.2 Initialize the ModuleManager if required.
-		final ModuleManager moduleManager = new ModuleManager();
+		// Step 1.1 Initialize the ModuleManager if required.
+		final ModuleManager moduleManager = new ModuleManager();
+		// Step 1.2 Initialize the CatalogManager if required.
+		final CatalogManager catalogManager = CatalogManager.newBuilder()
+			.classLoader(classLoader)
+			.config(config.getConfiguration())
+			.defaultCatalog(
+				settings.getBuiltInCatalogName(),
+				new GenericInMemoryCatalog(
+					settings.getBuiltInCatalogName(),
+					settings.getBuiltInDatabaseName()))
+			.build();
 		// Step 1.3 Initialize the FunctionCatalog if required.
 		final FunctionCatalog functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager);
 		// Step 1.4 Set up session state.
 		this.sessionState = SessionState.of(config, catalogManager, moduleManager, functionCatalog);
......
@@ -389,14 +389,27 @@ public interface BatchTableEnvironment extends TableEnvironment {
 	 */
 	static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
 		try {
-			Class<?> clazz = Class.forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl");
-			Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class, ModuleManager.class);
-			String defaultCatalog = "default_catalog";
-			CatalogManager catalogManager = new CatalogManager(
-				defaultCatalog,
-				new GenericInMemoryCatalog(defaultCatalog, "default_database")
-			);
+			// temporary solution until FLINK-15635 is fixed
+			ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
 			ModuleManager moduleManager = new ModuleManager();
+			String defaultCatalog = "default_catalog";
+			CatalogManager catalogManager = CatalogManager.newBuilder()
+				.classLoader(classLoader)
+				.config(tableConfig.getConfiguration())
+				.defaultCatalog(
+					defaultCatalog,
+					new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+				.executionConfig(executionEnvironment.getConfig())
+				.build();
+			Class<?> clazz = Class.forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl");
+			Constructor<?> con = clazz.getConstructor(
+				ExecutionEnvironment.class,
+				TableConfig.class,
+				CatalogManager.class,
+				ModuleManager.class);
 			return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager, moduleManager);
 		} catch (Throwable t) {
 			throw new TableException("Create BatchTableEnvironment failed.", t);
......
@@ -104,12 +104,22 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 				"StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
 		}
-		CatalogManager catalogManager = new CatalogManager(
-			settings.getBuiltInCatalogName(),
-			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
+		// temporary solution until FLINK-15635 is fixed
+		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
 		ModuleManager moduleManager = new ModuleManager();
+		CatalogManager catalogManager = CatalogManager.newBuilder()
+			.classLoader(classLoader)
+			.config(tableConfig.getConfiguration())
+			.defaultCatalog(
+				settings.getBuiltInCatalogName(),
+				new GenericInMemoryCatalog(
+					settings.getBuiltInCatalogName(),
+					settings.getBuiltInDatabaseName()))
+			.executionConfig(executionEnvironment.getConfig())
+			.build();
 		FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
 		Map<String, String> executorProperties = settings.toExecutorProperties();
......
@@ -26,9 +26,9 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.table.utils.ExecutorMock;
 import org.apache.flink.table.utils.PlannerMock;
 import org.apache.flink.types.Row;
@@ -91,7 +91,7 @@ public class StreamTableEnvironmentImplTest {
 			StreamExecutionEnvironment env,
 			DataStreamSource<Integer> elements) {
 		TableConfig config = new TableConfig();
-		CatalogManager catalogManager = new CatalogManager("cat", new GenericInMemoryCatalog("cat", "db"));
+		CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
 		ModuleManager moduleManager = new ModuleManager();
 		return new StreamTableEnvironmentImpl(
 			catalogManager,
......
@@ -190,13 +190,23 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	public static TableEnvironmentImpl create(EnvironmentSettings settings) {
-		CatalogManager catalogManager = new CatalogManager(
-			settings.getBuiltInCatalogName(),
-			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
-		TableConfig tableConfig = new TableConfig();
+		TableConfig tableConfig = new TableConfig();
+		// temporary solution until FLINK-15635 is fixed
+		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
 		ModuleManager moduleManager = new ModuleManager();
+		CatalogManager catalogManager = CatalogManager.newBuilder()
+			.classLoader(classLoader)
+			.config(tableConfig.getConfiguration())
+			.defaultCatalog(
+				settings.getBuiltInCatalogName(),
+				new GenericInMemoryCatalog(
+					settings.getBuiltInCatalogName(),
+					settings.getBuiltInDatabaseName()))
+			.build();
 		FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
 		Map<String, String> executorProperties = settings.toExecutorProperties();
@@ -205,7 +215,12 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		Map<String, String> plannerProperties = settings.toPlannerProperties();
 		Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-			.create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
+			.create(
+				plannerProperties,
+				executor,
+				tableConfig,
+				functionCatalog,
+				catalogManager);
 		return new TableEnvironmentImpl(
 			catalogManager,
......
@@ -19,6 +19,8 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.CatalogNotExistException;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
@@ -31,6 +33,8 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -46,11 +50,11 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A CatalogManager that encapsulates all available catalogs. It also implements the logic of
- * table path resolution.
+ * A manager for dealing with catalog objects such as tables, views, functions, and types. It encapsulates
+ * all available catalogs and stores temporary objects.
  */
 @Internal
-public class CatalogManager {
+public final class CatalogManager {
 	private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);
 
 	// A map between names and catalogs.
@@ -68,19 +72,84 @@ public class CatalogManager {
 	// The name of the built-in catalog
 	private final String builtInCatalogName;
 
-	public CatalogManager(String defaultCatalogName, Catalog defaultCatalog) {
+	private final DataTypeFactory typeFactory;
+
+	private CatalogManager(
+			String defaultCatalogName,
+			Catalog defaultCatalog,
+			DataTypeFactory typeFactory) {
 		checkArgument(
 			!StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
 			"Default catalog name cannot be null or empty");
 		checkNotNull(defaultCatalog, "Default catalog cannot be null");
 		catalogs = new LinkedHashMap<>();
 		catalogs.put(defaultCatalogName, defaultCatalog);
-		this.currentCatalogName = defaultCatalogName;
-		this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
+		currentCatalogName = defaultCatalogName;
+		currentDatabaseName = defaultCatalog.getDefaultDatabase();
 
-		this.temporaryTables = new HashMap<>();
+		temporaryTables = new HashMap<>();
 		// right now the default catalog is always the built-in one
-		this.builtInCatalogName = defaultCatalogName;
+		builtInCatalogName = defaultCatalogName;
+
+		this.typeFactory = typeFactory;
 	}
 
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	/**
+	 * Builder for a fluent definition of a {@link CatalogManager}.
+	 */
+	public static final class Builder {
+
+		private @Nullable ClassLoader classLoader;
+
+		private @Nullable ReadableConfig config;
+
+		private @Nullable String defaultCatalogName;
+
+		private @Nullable Catalog defaultCatalog;
+
+		private @Nullable ExecutionConfig executionConfig;
+
+		public Builder classLoader(ClassLoader classLoader) {
+			this.classLoader = classLoader;
+			return this;
+		}
+
+		public Builder config(ReadableConfig config) {
+			this.config = config;
+			return this;
+		}
+
+		public Builder defaultCatalog(String defaultCatalogName, Catalog defaultCatalog) {
+			this.defaultCatalogName = defaultCatalogName;
+			this.defaultCatalog = defaultCatalog;
+			return this;
+		}
+
+		public Builder executionConfig(ExecutionConfig executionConfig) {
+			this.executionConfig = executionConfig;
+			return this;
+		}
+
+		public CatalogManager build() {
+			checkNotNull(classLoader, "Class loader cannot be null");
+			checkNotNull(config, "Config cannot be null");
+			return new CatalogManager(
+				defaultCatalogName,
+				defaultCatalog,
+				new DataTypeFactoryImpl(classLoader, config, executionConfig));
+		}
+	}
+
+	/**
+	 * Returns a factory for creating fully resolved data types that can be used for planning.
+	 */
+	public DataTypeFactory getDataTypeFactory() {
+		return typeFactory;
+	}
+
 	/**
......
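The getDataTypeFactory() accessor is the heart of FLINK-15612: type resolution now runs against the session's class loader and configuration instead of thread-context defaults. A hedged usage sketch; the exact method set of DataTypeFactory is not shown in this diff, so createDataType(String) is an assumption based on the interface the hunk references:

    // Sketch: resolve a data type through the factory owned by the manager.
    DataTypeFactory typeFactory = catalogManager.getDataTypeFactory();
    // Assumed API: parse a SQL type string into a fully resolved DataType.
    DataType rowType = typeFactory.createDataType("ROW<i INT, s STRING>");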
@@ -27,6 +27,7 @@ import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.utils.CatalogManagerMocks;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -63,7 +64,10 @@ public class FunctionCatalogTest {
 		moduleManager = new ModuleManager();
 		functionCatalog = new FunctionCatalog(
 			TableConfig.getDefault(),
-			new CatalogManager(testCatalogName, catalog), moduleManager);
+			CatalogManagerMocks.preparedCatalogManager()
+				.defaultCatalog(testCatalogName, catalog)
+				.build(),
+			moduleManager);
 	}
 
 	@Test
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.utils;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

/**
 * Mock implementations of {@link CatalogManager} for testing purposes.
 */
public final class CatalogManagerMocks {

	public static final String DEFAULT_CATALOG = EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;

	public static final String DEFAULT_DATABASE = EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;

	public static CatalogManager createEmptyCatalogManager() {
		return preparedCatalogManager().build();
	}

	public static CatalogManager.Builder preparedCatalogManager() {
		return CatalogManager.newBuilder()
			.classLoader(CatalogManagerMocks.class.getClassLoader())
			.config(new Configuration())
			.defaultCatalog(
				DEFAULT_CATALOG,
				new GenericInMemoryCatalog(DEFAULT_CATALOG, DEFAULT_DATABASE))
			.executionConfig(new ExecutionConfig());
	}

	private CatalogManagerMocks() {
		// no instantiation
	}
}
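The new utility gives tests two entry points, which the remaining hunks in this commit switch over to: createEmptyCatalogManager() for a ready-made manager, and preparedCatalogManager() when a test needs to swap in its own default catalog before building. Typical usage, mirroring the updated call sites:

    // A fully built manager with the built-in default catalog and database.
    CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();

    // A preconfigured builder for tests that bring their own catalog.
    CatalogManager custom = CatalogManagerMocks.preparedCatalogManager()
        .defaultCatalog("builtin", new GenericInMemoryCatalog("builtin", "default"))
        .build();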
@@ -18,13 +18,11 @@
 package org.apache.flink.table.utils;
 
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.module.ModuleManager;
 
 /**
@@ -66,7 +64,7 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
 	private static TableEnvironmentMock getInstance(boolean isStreamingMode) {
 		final TableConfig config = createTableConfig();
-		final CatalogManager catalogManager = createCatalogManager();
+		final CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
 		final ModuleManager moduleManager = new ModuleManager();
 		return new TableEnvironmentMock(
 			catalogManager,
@@ -78,14 +76,6 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
 			isStreamingMode);
 	}
 
-	private static CatalogManager createCatalogManager() {
-		return new CatalogManager(
-			EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
-			new GenericInMemoryCatalog(
-				EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
-				EnvironmentSettings.DEFAULT_BUILTIN_DATABASE));
-	}
-
 	private static TableConfig createTableConfig() {
 		return TableConfig.getDefault();
 	}
......
@@ -349,23 +349,30 @@ object BatchTableEnvironment {
   def create(executionEnvironment: ExecutionEnvironment, tableConfig: TableConfig)
     : BatchTableEnvironment = {
     try {
+      // temporary solution until FLINK-15635 is fixed
+      val classLoader = Thread.currentThread.getContextClassLoader
+      val moduleManager = new ModuleManager
+      val defaultCatalog = "default_catalog"
+      val catalogManager = CatalogManager.newBuilder
+        .classLoader(classLoader)
+        .config(tableConfig.getConfiguration)
+        .defaultCatalog(
+          defaultCatalog,
+          new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+        .executionConfig(executionEnvironment.getConfig)
+        .build
       val clazz = Class
         .forName("org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl")
-      val const = clazz
+      val con = clazz
         .getConstructor(
          classOf[ExecutionEnvironment],
          classOf[TableConfig],
          classOf[CatalogManager],
          classOf[ModuleManager])
-      val builtInCatalog = "default_catalog"
-      val catalogManager = new CatalogManager(
-        "default_catalog",
-        new GenericInMemoryCatalog(
-          builtInCatalog,
-          "default_database")
-      )
-      val moduleManager = new ModuleManager
-      const.newInstance(executionEnvironment, tableConfig, catalogManager, moduleManager)
+      con.newInstance(executionEnvironment, tableConfig, catalogManager, moduleManager)
         .asInstanceOf[BatchTableEnvironment]
     } catch {
       case t: Throwable => throw new TableException("Create BatchTableEnvironment failed.", t)
......
@@ -40,7 +40,6 @@ import org.apache.flink.table.operations.{OutputConversionModifyOperation, Query
 import org.apache.flink.table.sources.{TableSource, TableSourceValidation}
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.typeutils.FieldInfoUtils
-import java.util
 import java.util.{Collections, List => JList, Map => JMap}
@@ -70,11 +69,6 @@ class StreamTableEnvironmentImpl (
     isStreaming)
   with org.apache.flink.table.api.scala.StreamTableEnvironment {
 
-  if (!isStreaming) {
-    throw new TableException(
-      "StreamTableEnvironment is not supported on batch mode now, please use TableEnvironment.")
-  }
-
   override def fromDataStream[T](dataStream: DataStream[T]): Table = {
     val queryOperation = asQueryOperation(dataStream, None)
     createTable(queryOperation)
@@ -310,11 +304,27 @@ object StreamTableEnvironmentImpl {
       tableConfig: TableConfig)
     : StreamTableEnvironmentImpl = {
-    val catalogManager = new CatalogManager(
-      settings.getBuiltInCatalogName,
-      new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
+    if (!settings.isStreamingMode) {
+      throw new TableException(
+        "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.")
+    }
+
+    // temporary solution until FLINK-15635 is fixed
+    val classLoader = Thread.currentThread.getContextClassLoader
+
     val moduleManager = new ModuleManager
+
+    val catalogManager = CatalogManager.newBuilder
+      .classLoader(classLoader)
+      .config(tableConfig.getConfiguration)
+      .defaultCatalog(
+        settings.getBuiltInCatalogName,
+        new GenericInMemoryCatalog(
+          settings.getBuiltInCatalogName,
+          settings.getBuiltInDatabaseName))
+      .executionConfig(executionEnvironment.getConfig)
+      .build
+
     val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager)
     val executorProperties = settings.toExecutorProperties
......
@@ -18,22 +18,21 @@
 package org.apache.flink.table.api.scala.internal
 
-import java.util.{Collections, List => JList}
-
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.catalog.FunctionCatalog
 import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.operations.ModifyOperation
-import org.apache.flink.table.utils.{ExecutorMock, PlannerMock}
+import org.apache.flink.table.utils.{CatalogManagerMocks, ExecutorMock, PlannerMock}
 import org.apache.flink.types.Row
 import org.hamcrest.CoreMatchers.equalTo
 import org.junit.Assert.assertThat
 import org.junit.Test
 
+import java.util.{Collections, List => JList}
+
 /**
  * Tests for [[StreamTableEnvironmentImpl]].
  */
@@ -82,9 +81,7 @@ class StreamTableEnvironmentImplTest {
       env: StreamExecutionEnvironment,
       elements: DataStream[Int]) = {
     val config = new TableConfig
-    val catalogManager = new CatalogManager(
-      "cat",
-      new GenericInMemoryCatalog("cat", "db"))
+    val catalogManager = CatalogManagerMocks.createEmptyCatalogManager()
     val moduleManager = new ModuleManager
     new StreamTableEnvironmentImpl(
       catalogManager,
......
@@ -210,6 +210,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
......
@@ -22,12 +22,12 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.planner.delegation.PlannerContext;
 import org.apache.flink.table.planner.plan.metadata.MetadataTestUtil;
 import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef;
+import org.apache.flink.table.utils.CatalogManagerMocks;
 
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.ConventionTraitDef;
@@ -46,9 +46,7 @@ import java.util.Arrays;
 public class ExpressionConverterTest {
 
 	private final TableConfig tableConfig = new TableConfig();
-	private final CatalogManager catalogManager = new CatalogManager(
-		"default_catalog",
-		new GenericInMemoryCatalog("default_catalog", "default_database"));
+	private final CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
 	private final PlannerContext plannerContext = new PlannerContext(
 		tableConfig,
 		new FunctionCatalog(tableConfig, catalogManager, new ModuleManager()),
@@ -61,7 +59,9 @@
 		)
 	);
 	private final ExpressionConverter converter = new ExpressionConverter(
-		plannerContext.createRelBuilder("default_catalog", "default_database"));
+		plannerContext.createRelBuilder(
+			CatalogManagerMocks.DEFAULT_CATALOG,
+			CatalogManagerMocks.DEFAULT_DATABASE));
 
 	@Test
 	public void testLiteral() {
......
@@ -59,6 +59,7 @@ import org.apache.flink.table.planner.expressions.utils.Func1$;
 import org.apache.flink.table.planner.expressions.utils.Func8$;
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.CatalogManagerMocks;
 
 import org.apache.calcite.sql.SqlNode;
 import org.junit.After;
@@ -86,8 +87,9 @@ public class SqlToOperationConverterTest {
 	private final TableConfig tableConfig = new TableConfig();
 	private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog",
 		"default");
-	private final CatalogManager catalogManager =
-		new CatalogManager("builtin", catalog);
+	private final CatalogManager catalogManager = CatalogManagerMocks.preparedCatalogManager()
+		.defaultCatalog("builtin", catalog)
+		.build();
 	private final ModuleManager moduleManager = new ModuleManager();
 	private final FunctionCatalog functionCatalog = new FunctionCatalog(
 		tableConfig,
......
@@ -18,10 +18,15 @@
 package org.apache.flink.table.planner.codegen
 
-import java.lang.{Integer => JInt, Long => JLong}
-import java.util.Collections
+import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
+import org.apache.calcite.plan.ConventionTraitDef
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.util.MockStreamingRuntimeContext
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog, ObjectIdentifier}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.dataformat.{GenericRow, SqlTimestamp}
 import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.planner.calcite.{FlinkPlannerImpl, FlinkTypeFactory}
@@ -30,15 +35,10 @@ import org.apache.flink.table.planner.delegation.PlannerContext
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5
 import org.apache.flink.table.runtime.generated.WatermarkGenerator
 import org.apache.flink.table.types.logical.{IntType, TimestampType}
-import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
-import org.apache.calcite.plan.ConventionTraitDef
+import org.apache.flink.table.utils.CatalogManagerMocks
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Test
 
+import java.lang.{Integer => JInt, Long => JLong}
+import java.util.Collections
+
 /**
  * Tests the generated [[WatermarkGenerator]] from [[WatermarkGeneratorCodeGenerator]].
  */
@@ -46,8 +46,7 @@ class WatermarkGeneratorCodeGenTest {
 
   // mock FlinkPlannerImpl to avoid discovering TableEnvironment and Executor.
   val config = new TableConfig
-  val catalog = new GenericInMemoryCatalog("MockCatalog", "default")
-  val catalogManager = new CatalogManager("builtin", catalog)
+  val catalogManager: CatalogManager = CatalogManagerMocks.createEmptyCatalogManager()
   val functionCatalog = new FunctionCatalog(config, catalogManager, new ModuleManager)
   val plannerContext = new PlannerContext(
     config,
@@ -101,7 +100,10 @@ class WatermarkGeneratorCodeGenTest {
     JavaFunc5.openCalled = false
     JavaFunc5.closeCalled = false
     functionCatalog.registerTempCatalogScalarFunction(
-      ObjectIdentifier.of("builtin", "default", "myFunc"),
+      ObjectIdentifier.of(
+        CatalogManagerMocks.DEFAULT_CATALOG,
+        CatalogManagerMocks.DEFAULT_DATABASE,
+        "myFunc"),
       new JavaFunc5
     )
     val generator = generateWatermarkGenerator("myFunc(ts, `offset`)")
......
@@ -48,6 +48,7 @@ import org.powermock.modules.junit4.PowerMockRunner
 
 import java.math.BigDecimal
 
 import org.apache.flink.table.module.ModuleManager
+import org.apache.flink.table.utils.CatalogManagerMocks
 
 import scala.collection.JavaConversions._
@@ -82,7 +83,7 @@ class AggCallSelectivityEstimatorTest {
     val tableScan = mock(classOf[TableScan])
     val cluster = mock(classOf[RelOptCluster])
     val planner = mock(classOf[AbstractRelOptPlanner])
-    val catalogManager = mock(classOf[CatalogManager])
+    val catalogManager = CatalogManagerMocks.createEmptyCatalogManager()
     val moduleManager = mock(classOf[ModuleManager])
     val config = new TableConfig
     val functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager)
......
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis
 import org.apache.flink.table.functions.{FunctionIdentifier, UserDefinedFunctionHelper}
@@ -49,7 +49,6 @@ import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankTyp
 import org.apache.flink.table.types.AtomicDataType
 import org.apache.flink.table.types.logical.{BigIntType, DoubleType, IntType, LogicalType, TimestampKind, TimestampType, VarCharType}
 import org.apache.flink.table.types.utils.TypeConversions
-
 import com.google.common.collect.{ImmutableList, Lists}
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan._
@@ -69,10 +68,11 @@ import org.apache.calcite.sql.fun.{SqlCountAggFunction, SqlStdOperatorTable}
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.util.{DateString, ImmutableBitSet, ImmutableIntList, TimeString, TimestampString}
 import org.junit.{Before, BeforeClass}
 
 import java.math.BigDecimal
 import java.util
 
+import org.apache.flink.table.utils.CatalogManagerMocks
 
 import scala.collection.JavaConversions._
 
 class FlinkRelMdHandlerTestBase {
@@ -80,10 +80,7 @@ class FlinkRelMdHandlerTestBase {
   val tableConfig = new TableConfig()
   val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema()
 
-  val builtinCatalog = "default_catalog"
-  val builtinDatabase = "default_database"
-  val catalogManager = new CatalogManager(
-    builtinCatalog, new GenericInMemoryCatalog(builtinCatalog, builtinDatabase))
+  val catalogManager: CatalogManager = CatalogManagerMocks.createEmptyCatalogManager()
   val moduleManager = new ModuleManager
 
   // TODO batch RelNode and stream RelNode should have different PlannerContext
......
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.FunctionCatalog
 import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
 import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem}
@@ -27,7 +27,6 @@ import org.apache.flink.table.planner.plan.schema._
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.{JDouble, JLong}
 import org.apache.flink.util.Preconditions
-
 import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
@@ -44,10 +43,11 @@ import org.junit.{Before, BeforeClass, Test}
 import org.powermock.api.mockito.PowerMockito._
 import org.powermock.core.classloader.annotations.PrepareForTest
 import org.powermock.modules.junit4.PowerMockRunner
 
 import java.math.BigDecimal
 import java.sql.{Date, Time, Timestamp}
 
+import org.apache.flink.table.utils.CatalogManagerMocks
 
 import scala.collection.JavaConverters._
 
 /**
@@ -86,7 +86,7 @@ class SelectivityEstimatorTest {
     val tableScan = mock(classOf[TableScan])
     val cluster = mock(classOf[RelOptCluster])
     val planner = mock(classOf[AbstractRelOptPlanner])
-    val catalogManager = mock(classOf[CatalogManager])
+    val catalogManager = CatalogManagerMocks.createEmptyCatalogManager()
     val moduleManager = mock(classOf[ModuleManager])
     val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager)
     val context: FlinkContext = new FlinkContextImpl(tableConfig, functionCatalog, catalogManager)
......
@@ -18,33 +18,35 @@
 package org.apache.flink.table.planner.plan.utils
 
-import java.math.BigDecimal
-import java.time.ZoneId
-import java.util.{TimeZone, List => JList}
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexBuilder, RexNode}
-import org.apache.calcite.sql.SqlPostfixOperator
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR}
-import org.apache.calcite.sql.fun.{SqlStdOperatorTable, SqlTrimFunction}
-import org.apache.calcite.util.{DateString, TimeString, TimestampString}
 import org.apache.flink.api.common.typeinfo.Types
 import org.apache.flink.table.api.{DataTypes, TableConfig}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral}
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
-import org.apache.flink.table.functions.{AggregateFunctionDefinition, FunctionIdentifier}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{EQUALS, GREATER_THAN, LESS_THAN, LESS_THAN_OR_EQUAL}
+import org.apache.flink.table.functions.{AggregateFunctionDefinition, FunctionIdentifier}
 import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.planner.expressions.utils.Func1
-import org.apache.flink.table.planner.expressions.{EqualTo, ExpressionBridge, GreaterThan, Literal, PlannerExpression, PlannerExpressionConverter, Sum, UnresolvedFieldReference}
+import org.apache.flink.table.planner.expressions._
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
 import org.apache.flink.table.planner.plan.utils.InputTypeBuilder.inputOf
 import org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.sql.SqlPostfixOperator
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR}
+import org.apache.calcite.sql.fun.{SqlStdOperatorTable, SqlTrimFunction}
+import org.apache.calcite.util.{DateString, TimeString, TimestampString}
+import org.apache.flink.table.utils.CatalogManagerMocks
 import org.hamcrest.CoreMatchers.is
 import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat, assertTrue}
 import org.junit.Test
 
+import java.math.BigDecimal
+import java.time.ZoneId
+import java.util.{TimeZone, List => JList}
+
 import scala.collection.JavaConverters._
@@ -52,9 +54,7 @@ import scala.collection.JavaConverters._
  * Test for [[RexNodeExtractor]].
  */
 class RexNodeExtractorTest extends RexNodeTestBase {
-  val defaultCatalog = "default_catalog"
-  val catalogManager = new CatalogManager(
-    defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+  val catalogManager: CatalogManager = CatalogManagerMocks.createEmptyCatalogManager()
   val moduleManager = new ModuleManager
   private val functionCatalog = new FunctionCatalog(
     TableConfig.getDefault,
......
@@ -1079,22 +1079,37 @@ object TestingTableEnvironment {
       settings: EnvironmentSettings,
       catalogManager: Option[CatalogManager] = None,
       tableConfig: TableConfig): TestingTableEnvironment = {
+    // temporary solution until FLINK-15635 is fixed
+    val classLoader = Thread.currentThread.getContextClassLoader
+
+    val moduleManager = new ModuleManager
+
     val catalogMgr = catalogManager match {
       case Some(c) => c
       case _ =>
-        new CatalogManager(settings.getBuiltInCatalogName,
-          new GenericInMemoryCatalog(
-            settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
+        CatalogManager.newBuilder
+          .classLoader(classLoader)
+          .config(tableConfig.getConfiguration)
+          .defaultCatalog(
+            settings.getBuiltInCatalogName,
+            new GenericInMemoryCatalog(
+              settings.getBuiltInCatalogName,
+              settings.getBuiltInDatabaseName))
+          .build
     }
-    val moduleManager = new ModuleManager
+
     val functionCatalog = new FunctionCatalog(tableConfig, catalogMgr, moduleManager)
-    val plannerProperties = settings.toPlannerProperties
     val executorProperties = settings.toExecutorProperties
     val executor = ComponentFactoryService.find(classOf[ExecutorFactory],
       executorProperties).create(executorProperties)
+
+    val plannerProperties = settings.toPlannerProperties
     val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
       .create(plannerProperties, executor, tableConfig, functionCatalog, catalogMgr)
       .asInstanceOf[PlannerBase]
+
     new TestingTableEnvironment(
       catalogMgr,
       moduleManager,
......
@@ -208,6 +208,13 @@ under the License.
 		<!-- test dependencies -->
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table-common</artifactId>
@@ -218,8 +225,9 @@ under the License.
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-api-java</artifactId>
 			<version>${project.version}</version>
+			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
......
@@ -39,9 +39,14 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
     operand(classOf[FlinkLogicalTableSourceScan], none)),
   "PushFilterIntoTableSourceScanRule") {

+  // we don't offer a context for the legacy planner
+  private val tableConfig = TableConfig.getDefault
+
   private val defaultCatalog = "default_catalog"
-  private val catalogManager = new CatalogManager(
-    defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+
+  private val catalogManager = CatalogManager.newBuilder()
+    .classLoader(Thread.currentThread().getContextClassLoader)
+    .config(tableConfig.getConfiguration)
+    .defaultCatalog(defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+    .build()
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
@@ -74,7 +79,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
     RexProgramExtractor.extractConjunctiveConditions(
       program,
       call.builder().getRexBuilder,
-      new FunctionCatalog(TableConfig.getDefault, catalogManager, new ModuleManager))
+      new FunctionCatalog(tableConfig, catalogManager, new ModuleManager))
     if (predicates.isEmpty) {
       // no condition can be translated to expression
       return
......
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.types.Row;
 
 import java.util.Collections;
@@ -60,9 +61,9 @@ import java.util.Optional;
 public class CatalogStructureBuilder {
 	public static final String BUILTIN_CATALOG_NAME = "builtin";
 
-	private CatalogManager catalogManager = new CatalogManager(
-		BUILTIN_CATALOG_NAME,
-		new GenericInMemoryCatalog(BUILTIN_CATALOG_NAME));
+	private CatalogManager catalogManager = CatalogManagerMocks.preparedCatalogManager()
+		.defaultCatalog(BUILTIN_CATALOG_NAME, new GenericInMemoryCatalog(BUILTIN_CATALOG_NAME))
+		.build();
 
 	public static CatalogStructureBuilder root() {
 		return new CatalogStructureBuilder();
@@ -82,7 +83,9 @@ public class CatalogStructureBuilder {
 	public CatalogStructureBuilder builtin(DatabaseBuilder defaultDb, DatabaseBuilder... databases) throws Exception {
 		GenericInMemoryCatalog catalog = buildCatalog(BUILTIN_CATALOG_NAME, defaultDb, databases);
-		this.catalogManager = new CatalogManager(BUILTIN_CATALOG_NAME, catalog);
+		this.catalogManager = CatalogManagerMocks.preparedCatalogManager()
+			.defaultCatalog(BUILTIN_CATALOG_NAME, catalog)
+			.build();
 
 		return this;
 	}
......
@@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.TestExternalTableSourceFactory.TestExterna
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.plan.schema.TableSourceTable;
+import org.apache.flink.table.utils.CatalogManagerMocks;
 
 import org.apache.calcite.schema.Table;
 import org.junit.Test;
@@ -48,7 +49,9 @@ public class DatabaseCalciteSchemaTest {
 	@Test
 	public void testCatalogTable() throws TableAlreadyExistException, DatabaseNotExistException {
 		GenericInMemoryCatalog catalog = new GenericInMemoryCatalog(catalogName, databaseName);
-		CatalogManager catalogManager = new CatalogManager(catalogName, catalog);
+		CatalogManager catalogManager = CatalogManagerMocks.preparedCatalogManager()
+			.defaultCatalog(catalogName, catalog)
+			.build();
 		DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(true,
 			databaseName,
 			catalogName,
......
@@ -57,6 +57,7 @@ import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
 import org.apache.flink.table.planner.PlanningConfigurationBuilder;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.utils.CatalogManagerMocks;
 
 import org.apache.calcite.sql.SqlNode;
 import org.junit.After;
@@ -83,8 +84,9 @@ public class SqlToOperationConverterTest {
 	private final TableConfig tableConfig = new TableConfig();
 	private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog",
 		"default");
-	private final CatalogManager catalogManager =
-		new CatalogManager("builtin", catalog);
+	private final CatalogManager catalogManager = CatalogManagerMocks.preparedCatalogManager()
+		.defaultCatalog("builtin", catalog)
+		.build();
 	private final ModuleManager moduleManager = new ModuleManager();
 	private final FunctionCatalog functionCatalog = new FunctionCatalog(
 		tableConfig,
......
@@ -33,7 +33,7 @@ import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericI
 import org.apache.flink.table.executor.StreamExecutor
 import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.runtime.utils.StreamTestData
-import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.{CatalogManagerMocks, TableTestBase}
 import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
 import org.apache.flink.types.Row
 import org.junit.Test
@@ -203,15 +203,13 @@ class StreamTableEnvironmentTest extends TableTestBase {
     val jStreamExecEnv = mock(classOf[JStreamExecEnv])
     when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
     val config = new TableConfig
-    val manager: CatalogManager = new CatalogManager(
-      "default_catalog",
-      new GenericInMemoryCatalog("default_catalog", "default_database"))
+    val catalogManager = CatalogManagerMocks.createEmptyCatalogManager()
     val moduleManager: ModuleManager = new ModuleManager
     val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv)
-    val functionCatalog = new FunctionCatalog(config, manager, moduleManager)
-    val streamPlanner = new StreamPlanner(executor, config, functionCatalog, manager)
+    val functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager)
+    val streamPlanner = new StreamPlanner(executor, config, functionCatalog, catalogManager)
     val jTEnv = new JStreamTableEnvironmentImpl(
-      manager,
+      catalogManager,
       moduleManager,
       functionCatalog,
       config,
......
@@ -26,12 +26,12 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
 import org.apache.flink.table.api.{TableConfig, Types}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog, UnresolvedIdentifier}
+import org.apache.flink.table.catalog.{FunctionCatalog, UnresolvedIdentifier}
 import org.apache.flink.table.delegation.{Executor, Planner}
 import org.apache.flink.table.functions.{AggregateFunction, AggregateFunctionDefinition}
 import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.table.utils.{CatalogManagerMocks, StreamTableTestUtil, TableTestBase}
 import org.apache.flink.types.Row
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Test
@@ -67,10 +67,8 @@ class AggregateTest extends TableTestBase {
 
   @Test
   def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = {
-    val defaultCatalog = "default_catalog"
     val config = new TableConfig
-    val catalogManager = new CatalogManager(
-      defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+    val catalogManager = CatalogManagerMocks.createEmptyCatalogManager()
     val moduleManager = new ModuleManager
     val functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager)
     val tablEnv = new StreamTableEnvironmentImpl(
......
@@ -29,10 +29,11 @@ import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.util.{DateString, TimeString, TimestampString}
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.catalog.FunctionCatalog
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.plan.util.{RexNodeToExpressionConverter, RexProgramExtractor}
+import org.apache.flink.table.utils.CatalogManagerMocks
 import org.apache.flink.table.utils.InputTypeBuilder.inputOf
 import org.hamcrest.CoreMatchers.is
 import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat}
@@ -45,7 +46,7 @@ class RexProgramExtractorTest extends RexProgramTestBase {
 
   private val functionCatalog: FunctionCatalog = new FunctionCatalog(
     TableConfig.getDefault,
-    new CatalogManager("default_catalog", new GenericInMemoryCatalog("default_catalog")),
+    CatalogManagerMocks.createEmptyCatalogManager(),
     new ModuleManager
   )
   private val expressionBridge: ExpressionBridge[PlannerExpression] =
......
@@ -18,6 +18,8 @@
 package org.apache.flink.table.utils
 
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{LocalEnvironment, DataSet => JDataSet}
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
@@ -30,16 +32,13 @@ import org.apache.flink.table.api.java.internal.{BatchTableEnvironmentImpl => Ja
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.internal.{BatchTableEnvironmentImpl => ScalaBatchTableEnvironmentImpl, StreamTableEnvironmentImpl => ScalaStreamTableEnvironmentImpl}
 import org.apache.flink.table.api.{Table, TableConfig, TableSchema}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.executor.StreamExecutor
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
+import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.operations.{DataSetQueryOperation, JavaDataStreamQueryOperation, ScalaDataStreamQueryOperation}
 import org.apache.flink.table.planner.StreamPlanner
-import org.apache.flink.table.utils.TableTestUtil.createCatalogManager
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-import org.apache.flink.table.module.ModuleManager
 import org.junit.Assert.assertEquals
 import org.junit.rules.ExpectedException
 import org.junit.{ComparisonFailure, Rule}
@@ -140,13 +139,6 @@ object TableTestUtil {
 
   val ANY_SUBTREE = "%ANY_SUBTREE%"
 
-  def createCatalogManager(): CatalogManager = {
-    val defaultCatalog = "default_catalog"
-    new CatalogManager(
-      defaultCatalog,
-      new GenericInMemoryCatalog(defaultCatalog, "default_database"))
-  }
-
   private[utils] def toRelNode(expected: Table) = {
     expected.asInstanceOf[TableImpl].getTableEnvironment match {
       case t: TableEnvImpl => t.getRelBuilder.tableOperation(expected.getQueryOperation).build()
@@ -231,13 +223,15 @@ case class BatchTableTestUtil(
   val javaTableEnv = new JavaBatchTableEnvironmentImpl(
     javaEnv,
     new TableConfig,
-    catalogManager.getOrElse(createCatalogManager()),
+    catalogManager
+      .getOrElse(CatalogManagerMocks.createEmptyCatalogManager()),
     new ModuleManager)
   val env = new ExecutionEnvironment(javaEnv)
   val tableEnv = new ScalaBatchTableEnvironmentImpl(
     env,
     new TableConfig,
-    catalogManager.getOrElse(createCatalogManager()),
+    catalogManager
+      .getOrElse(CatalogManagerMocks.createEmptyCatalogManager()),
     new ModuleManager)
 
   def addTable[T: TypeInformation](
@@ -329,7 +323,8 @@ case class StreamTableTestUtil(
   javaEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
   private val tableConfig = new TableConfig
-  private val manager: CatalogManager = catalogManager.getOrElse(createCatalogManager())
+  private val manager: CatalogManager = catalogManager
+    .getOrElse(CatalogManagerMocks.createEmptyCatalogManager())
   private val moduleManager: ModuleManager = new ModuleManager
   private val executor: StreamExecutor = new StreamExecutor(javaEnv)
   private val functionCatalog = new FunctionCatalog(tableConfig, manager, moduleManager)
......