提交 dba7d7da 编写于 作者: M mtunique 提交者: twalthr

[FLINK-5268] Split TableProgramsTestBase into TableProgramsCollectionTestBase...

[FLINK-5268] Split TableProgramsTestBase into TableProgramsCollectionTestBase and TableProgramsClusterTestBase

This closes #3099.
上级 b1913a4a
......@@ -33,8 +33,9 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.table.calcite.CalciteConfig;
import org.apache.flink.table.calcite.CalciteConfigBuilder;
import org.apache.flink.table.api.Table;
......@@ -46,17 +47,17 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TableEnvironmentITCase extends TableProgramsTestBase {
public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
public TableEnvironmentITCase(TestExecutionMode mode, TableConfigMode configMode) {
super(mode, configMode);
public TableEnvironmentITCase(TableConfigMode configMode) {
super(configMode);
}
@Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
@Parameterized.Parameters(name = "Table config = {0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{ TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT() },
{ TestExecutionMode.COLLECTION, TableProgramsTestBase.EFFICIENT() }
{ TableProgramsTestBase.DEFAULT() },
{ TableProgramsTestBase.EFFICIENT() }
});
}
......
......@@ -23,7 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.utils.CommonTestData;
import org.apache.flink.types.Row;
......@@ -34,10 +34,10 @@ import org.junit.runners.Parameterized;
import java.util.List;
@RunWith(Parameterized.class)
public class TableSourceITCase extends TableProgramsTestBase {
public class TableSourceITCase extends TableProgramsCollectionTestBase {
public TableSourceITCase(TestExecutionMode mode, TableConfigMode configMode) {
super(mode, configMode);
public TableSourceITCase(TableConfigMode configMode) {
super(configMode);
}
@Test
......
......@@ -18,6 +18,8 @@
package org.apache.flink.table.api.java.batch.sql;
import java.util.Comparator;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
......@@ -27,7 +29,7 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
......@@ -36,18 +38,20 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Comparator;
import java.util.List;
/**
* This test should be replaced by a DataSetAggregateITCase.
* We should only perform logical unit tests here.
* Until then, we perform a cluster test here.
*/
@RunWith(Parameterized.class)
public class GroupingSetsITCase extends TableProgramsTestBase {
public class GroupingSetsITCase extends TableProgramsClusterTestBase {
private final static String TABLE_NAME = "MyTable";
private final static String TABLE_WITH_NULLS_NAME = "MyTableWithNulls";
private BatchTableEnvironment tableEnv;
public GroupingSetsITCase(TestExecutionMode mode, TableConfigMode tableConfigMode) {
super(mode, tableConfigMode);
public GroupingSetsITCase(TableConfigMode tableConfigMode) {
super(tableConfigMode);
}
@Before
......
......@@ -24,7 +24,7 @@ import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.types.Row;
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
......@@ -35,10 +35,10 @@ import org.junit.runners.Parameterized;
import java.util.List;
@RunWith(Parameterized.class)
public class SqlITCase extends TableProgramsTestBase {
public class SqlITCase extends TableProgramsCollectionTestBase {
public SqlITCase(TestExecutionMode mode, TableConfigMode configMode) {
super(mode, configMode);
public SqlITCase(TableConfigMode configMode) {
super(configMode);
}
@Test
......
......@@ -21,13 +21,12 @@ package org.apache.flink.table.api.scala.batch
import java.util
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.types.Row
import org.apache.flink.table.api.{TableEnvironment, TableException}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
......@@ -37,9 +36,8 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class TableEnvironmentITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testSimpleRegister(): Unit = {
......@@ -260,11 +258,11 @@ class TableEnvironmentITCase(
object TableEnvironmentITCase {
@Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
@Parameterized.Parameters(name = "Table config = {0}")
def parameters(): util.Collection[Array[java.lang.Object]] = {
Seq[Array[AnyRef]](
Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.EFFICIENT)).asJava
Array(TableProgramsTestBase.DEFAULT),
Array(TableProgramsTestBase.EFFICIENT)).asJava
}
}
......
......@@ -21,13 +21,12 @@ package org.apache.flink.table.api.scala.batch
import java.io.File
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit.Test
import org.junit.runner.RunWith
......@@ -36,9 +35,8 @@ import org.junit.runners.Parameterized
@RunWith(classOf[Parameterized])
class TableSinkITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testBatchTableSink(): Unit = {
......
......@@ -19,14 +19,12 @@
package org.apache.flink.table.api.scala.batch
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.utils.CommonTestData
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
......@@ -35,9 +33,8 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class TableSourceITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testCsvTableSource(): Unit = {
......
......@@ -19,13 +19,12 @@
package org.apache.flink.table.api.scala.batch.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.types.Row
import org.apache.flink.table.api.{TableEnvironment, TableException}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
......@@ -35,9 +34,8 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class AggregationsITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testAggregationTypes(): Unit = {
......
......@@ -24,13 +24,12 @@ import java.util
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.batch.sql.FilterITCase.MyHashCode
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.table.api.{TableEnvironment, ValidationException}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit._
......@@ -41,9 +40,8 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class CalcITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testSelectStarFromTable(): Unit = {
......@@ -320,10 +318,10 @@ object FilterITCase {
object CalcITCase {
@Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
@Parameterized.Parameters(name = "Table config = {0}")
def parameters(): util.Collection[Array[java.lang.Object]] = {
Seq[Array[AnyRef]](
Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL)).asJava
Array(TableProgramsTestBase.DEFAULT),
Array(TableProgramsTestBase.NO_NULL)).asJava
}
}
......@@ -19,12 +19,11 @@
package org.apache.flink.table.api.scala.batch.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit._
......@@ -35,9 +34,8 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class JoinITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testJoin(): Unit = {
......
......@@ -19,13 +19,12 @@
package org.apache.flink.table.api.scala.batch.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.types.Row
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
......@@ -37,9 +36,8 @@ import scala.util.Random
@RunWith(classOf[Parameterized])
class SetOperatorsITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testUnionAll(): Unit = {
......
......@@ -19,7 +19,7 @@
package org.apache.flink.table.api.scala.batch.sql
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
import org.apache.flink.api.scala.util.CollectionDataSets
......@@ -27,7 +27,6 @@ import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{TableEnvironment, TableException}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
......@@ -37,9 +36,8 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class SortITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testOrderByMultipleFieldsWithSql(): Unit = {
......
......@@ -19,13 +19,12 @@
package org.apache.flink.table.api.scala.batch.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.types.Row
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
......@@ -35,9 +34,8 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class TableWithSQLITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testSQLTable(): Unit = {
......
......@@ -19,14 +19,13 @@
package org.apache.flink.table.api.scala.batch.table
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.types.Row
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.examples.scala.WordCountTable.{WC => MyWC}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
......@@ -36,9 +35,8 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class AggregationsITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testAggregationTypes(): Unit = {
......
......@@ -22,7 +22,7 @@ import java.sql.{Date, Time, Timestamp}
import java.util
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
......@@ -30,7 +30,6 @@ import org.apache.flink.types.Row
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.expressions.Literal
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
......@@ -40,9 +39,8 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class CalcITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testSimpleSelectAll(): Unit = {
......@@ -366,11 +364,11 @@ class CalcITCase(
object CalcITCase {
@Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
@Parameterized.Parameters(name = "Table config = {0}")
def parameters(): util.Collection[Array[java.lang.Object]] = {
Seq[Array[AnyRef]](
Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL)).asJava
Array(TableProgramsTestBase.DEFAULT),
Array(TableProgramsTestBase.NO_NULL)).asJava
}
}
......
......@@ -19,12 +19,11 @@
package org.apache.flink.table.api.scala.batch.table
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.Types._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
import org.apache.flink.types.Row
import org.junit._
......@@ -34,10 +33,8 @@ import org.junit.runners.Parameterized
import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class CastingITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
class CastingITCase(configMode: TableConfigMode)
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testNumericAutocastInArithmetic() {
......
......@@ -19,7 +19,7 @@
package org.apache.flink.table.api.scala.batch.table
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
......@@ -36,9 +36,8 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class JoinITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testJoin(): Unit = {
......
......@@ -19,13 +19,12 @@
package org.apache.flink.table.api.scala.batch.table
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.types.Row
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
......@@ -37,9 +36,8 @@ import scala.util.Random
@RunWith(classOf[Parameterized])
class SetOperatorsITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
@Test
def testUnionAll(): Unit = {
......
......@@ -19,14 +19,13 @@
package org.apache.flink.table.api.scala.batch.table
import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.types.Row
import org.apache.flink.table.api.{TableEnvironment, ValidationException}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
......@@ -36,9 +35,8 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class SortITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsCollectionTestBase(configMode) {
def getExecutionEnvironment = {
val env = ExecutionEnvironment.getExecutionEnvironment
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.api.scala.batch.utils
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
/**
 * Integration test base that executes batch table programs in a full cluster-like
 * environment. Cluster execution is expensive, so only runtime operator tests
 * (e.g. [[org.apache.flink.table.runtime.dataset.DataSetWindowAggregateITCase]])
 * should extend this base.
 */
class TableProgramsClusterTestBase(tableConfigMode: TableConfigMode)
  extends TableProgramsTestBase(TestExecutionMode.CLUSTER, tableConfigMode)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.api.scala.batch.utils
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
/**
 * Lightweight integration test base that executes batch table programs in
 * collection mode. This is fast but does not exercise the full runtime
 * (e.g. combiners are not tested); runtime operator tests should use
 * [[TableProgramsClusterTestBase]] instead.
 */
class TableProgramsCollectionTestBase(tableConfigMode: TableConfigMode)
  extends TableProgramsTestBase(TestExecutionMode.COLLECTION, tableConfigMode)
......@@ -53,8 +53,8 @@ object TableProgramsTestBase {
val NO_NULL = TableConfigMode(nullCheck = false, efficientTypes = false)
val EFFICIENT = TableConfigMode(nullCheck = false, efficientTypes = true)
@Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
@Parameterized.Parameters(name = "Table config = {0}")
def parameters(): util.Collection[Array[java.lang.Object]] = {
Seq[Array[AnyRef]](Array(TestExecutionMode.COLLECTION, DEFAULT))
Seq[Array[AnyRef]](Array(DEFAULT))
}
}
......@@ -19,12 +19,11 @@ package org.apache.flink.table.runtime.dataset
import org.apache.flink.api.scala._
import org.apache.flink.types.Row
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.{TableProgramsClusterTestBase, TableProgramsCollectionTestBase}
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.utils._
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit.Test
import org.junit.runner.RunWith
......@@ -35,9 +34,8 @@ import scala.collection.mutable
@RunWith(classOf[Parameterized])
class DataSetCorrelateITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
extends TableProgramsClusterTestBase(configMode) {
@Test
def testCrossJoin(): Unit = {
......
......@@ -21,9 +21,8 @@ package org.apache.flink.table.runtime.dataset
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit._
......@@ -33,10 +32,8 @@ import org.junit.runners.Parameterized
import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class DataSetWindowAggregateITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {
class DataSetWindowAggregateITCase(configMode: TableConfigMode)
extends TableProgramsClusterTestBase(configMode) {
val data = List(
(1L, 1, "Hi"),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册