From 853fc3c580fa1166a25cec76596d843a9b49c948 Mon Sep 17 00:00:00 2001
From: Jingsong Lee
Date: Wed, 1 Apr 2020 14:46:00 +0800
Subject: [PATCH] [FLINK-16858][table] Expose partitioned by grammar

This closes #11559
---
 .../table/catalog/hive/HiveTestUtils.java     |  2 -
 .../src/main/codegen/includes/parserImpls.ftl |  7 +---
 .../sql/parser/utils/ParserResource.java      |  3 --
 .../parser/validate/FlinkSqlConformance.java  |  7 ----
 .../ParserResource.properties                 |  1 -
 .../sql/parser/FlinkSqlParserImplTest.java    | 40 +------------------
 .../SqlToOperationConverterTest.java          |  8 ++--
 .../sqlexec/SqlToOperationConverterTest.java  |  8 ++--
 .../batch/sql/PartitionableSinkITCase.scala   |  1 -
 9 files changed, 10 insertions(+), 67 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 88184fc9d05..b3753351cd4 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.CatalogTest;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -114,7 +113,6 @@ public class HiveTestUtils {
 		EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
 		TableEnvironment tableEnv = TableEnvironment.create(settings);
 		tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
-		tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
 		return tableEnv;
 	}
 
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 96a25a42fe8..3401d95db29 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -601,12 +601,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace) :
     }]
     [
        <PARTITIONED> <BY>
-        partitionColumns = ParenthesizedSimpleIdentifierList() {
-            if (!((FlinkSqlConformance) this.conformance).allowCreatePartitionedTable()) {
-                throw SqlUtil.newContextException(getPos(),
-                    ParserResource.RESOURCE.createPartitionedTableIsOnlyAllowedForHive());
-            }
-        }
+        partitionColumns = ParenthesizedSimpleIdentifierList()
     ]
     [
        <WITH>
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 5989a374fe2..555ee5fa912 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -35,7 +35,4 @@ public interface ParserResource {
 
 	@Resources.BaseMessage("OVERWRITE expression is only used with INSERT statement.")
 	Resources.ExInst<ParseException> overwriteIsOnlyUsedWithInsert();
-
-	@Resources.BaseMessage("Creating partitioned table is only allowed for HIVE dialect.")
-	Resources.ExInst<ParseException> createPartitionedTableIsOnlyAllowedForHive();
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
index cf32026680d..b6b2e83ea9b 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
@@ -155,11 +155,4 @@ public enum FlinkSqlConformance implements SqlConformance {
 	public boolean allowQualifyingCommonColumn() {
 		return true;
 	}
-
-	/**
-	 * Whether to allow "create table T(i int, j int) partitioned by (i)" grammar.
-	 */
-	public boolean allowCreatePartitionedTable() {
-		return this == FlinkSqlConformance.HIVE;
-	}
 }
diff --git a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
index 9993e7c2e99..8a7d5a1c46c 100644
--- a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
+++ b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
@@ -18,4 +18,3 @@
 #
 MultipleWatermarksUnsupported=Multiple WATERMARK statements is not supported yet.
 OverwriteIsOnlyUsedWithInsert=OVERWRITE expression is only used with INSERT statement.
-CreatePartitionedTableIsOnlyAllowedForHive=Creating partitioned table is only allowed for HIVE dialect.
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 0509e915333..337de11b177 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -21,19 +21,14 @@ package org.apache.flink.sql.parser;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.error.SqlValidateException;
 import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
-import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
 
-import org.apache.calcite.avatica.util.Casing;
-import org.apache.calcite.avatica.util.Quoting;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.parser.SqlParserImplFactory;
 import org.apache.calcite.sql.parser.SqlParserTest;
-import org.apache.calcite.sql.validate.SqlConformance;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.Reader;
@@ -45,7 +40,6 @@ import static org.junit.Assert.fail;
 
 /** FlinkSqlParserImpl tests. **/
 public class FlinkSqlParserImplTest extends SqlParserTest {
-	private SqlConformance conformance0;
 
 	@Override
 	protected SqlParserImplFactory parserImplFactory() {
@@ -54,25 +48,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	protected SqlParser getSqlParser(Reader source,
 			UnaryOperator<SqlParser.ConfigBuilder> transform) {
-		if (conformance0 == null) {
-			return super.getSqlParser(source, transform);
-		} else {
-			// overwrite the default sql conformance.
-			return SqlParser.create(source,
-				SqlParser.configBuilder()
-					.setParserFactory(parserImplFactory())
-					.setQuoting(Quoting.DOUBLE_QUOTE)
-					.setUnquotedCasing(Casing.TO_UPPER)
-					.setQuotedCasing(Casing.UNCHANGED)
-					.setConformance(conformance0)
-					.build());
-		}
-	}
-
-	@Before
-	public void before() {
-		// clear the custom sql conformance.
-		conformance0 = null;
+		return super.getSqlParser(source, transform);
 	}
 
 	@Test
@@ -224,7 +200,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testCreateTable() {
-		conformance0 = FlinkSqlConformance.HIVE;
 		final String sql = "CREATE TABLE tbl1 (\n" +
 			"  a bigint,\n" +
 			"  h varchar, \n" +
@@ -258,7 +233,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testCreateTableWithComment() {
-		conformance0 = FlinkSqlConformance.HIVE;
 		final String sql = "CREATE TABLE tbl1 (\n" +
 			"  a bigint comment 'test column comment AAA.',\n" +
 			"  h varchar, \n" +
@@ -294,7 +268,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testCreateTableWithPrimaryKeyAndUniqueKey() {
-		conformance0 = FlinkSqlConformance.HIVE;
 		final String sql = "CREATE TABLE tbl1 (\n" +
 			"  a bigint comment 'test column comment AAA.',\n" +
 			"  h varchar, \n" +
@@ -565,7 +538,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testCreateInvalidPartitionedTable() {
-		conformance0 = FlinkSqlConformance.HIVE;
 		final String sql = "create table sls_stream1(\n" +
 			"  a bigint,\n" +
 			"  b VARCHAR,\n" +
@@ -578,16 +550,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			.fails("Partition column [C] not defined in columns, at line 6, column 3"));
 	}
 
-	@Test
-	public void testNotAllowedCreatePartition() {
-		conformance0 = FlinkSqlConformance.DEFAULT;
-		final String sql = "create table sls_stream1(\n" +
-			"  a bigint,\n" +
-			"  b VARCHAR\n" +
-			") PARTITIONED BY (a^)^ with ( 'x' = 'y', 'asd' = 'dada')";
-		sql(sql).fails("Creating partitioned table is only allowed for HIVE dialect.");
-	}
-
 	@Test
 	public void testCreateTableWithMinusInOptionKey() {
 		final String sql = "create table source_table(\n" +
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 55e00238274..e4d0460a0a4 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -246,8 +246,8 @@ public class SqlToOperationConverterTest {
 			"    'connector' = 'kafka', \n" +
 			"    'kafka.topic' = 'log.test'\n" +
 			")\n";
-		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
-		final CalciteParser parser = getParserBySqlDialect(SqlDialect.HIVE);
+		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
 		Operation operation = parse(sql, planner, parser);
 		assert operation instanceof CreateTableOperation;
 		CreateTableOperation op = (CreateTableOperation) operation;
@@ -265,8 +265,8 @@ public class SqlToOperationConverterTest {
 
 	@Test(expected = SqlConversionException.class)
 	public void testCreateTableWithPkUniqueKeys() {
-		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
-		final CalciteParser parser = getParserBySqlDialect(SqlDialect.HIVE);
+		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
 		final String sql = "CREATE TABLE tbl1 (\n" +
 			"  a bigint,\n" +
 			"  b varchar, \n" +
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index b95ea40b714..cd981cb4997 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -245,8 +245,8 @@ public class SqlToOperationConverterTest {
 			"    'connector' = 'kafka', \n" +
 			"    'kafka.topic' = 'log.test'\n" +
 			")\n";
-		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
-		SqlNode node = getParserBySqlDialect(SqlDialect.HIVE).parse(sql);
+		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
 		assert node instanceof SqlCreateTable;
 		Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
 		assert operation instanceof CreateTableOperation;
@@ -306,8 +306,8 @@ public class SqlToOperationConverterTest {
 			"    'connector' = 'kafka', \n" +
 			"    'kafka.topic' = 'log.test'\n" +
 			")\n";
-		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
-		SqlNode node = getParserBySqlDialect(SqlDialect.HIVE).parse(sql);
+		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
 		assert node instanceof SqlCreateTable;
 		SqlToOperationConverter.convert(planner, catalogManager, node);
 	}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
index 219b2db818e..90a6f247f77 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -62,7 +62,6 @@ class PartitionableSinkITCase extends AbstractTestBase {
   def before(): Unit = {
     batchExec.setParallelism(1)
     tEnv = BatchTableEnvironment.create(batchExec)
-    tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
     registerTableSource("nonSortTable", testData.toList)
     registerTableSource("sortTable", testData1.toList)
     PartitionableSinkITCase.init()
-- 
GitLab
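
Note: after this patch, PARTITIONED BY is accepted by the parser under the default
dialect, not only under HIVE conformance. A minimal sketch of the now-accepted DDL
(the table name, columns, and connector options are illustrative, borrowed from the
test data in the diff above):

    CREATE TABLE tbl1 (
      a BIGINT,
      h VARCHAR,
      b VARCHAR
    ) PARTITIONED BY (a, h) WITH (
      'connector' = 'kafka',
      'kafka.topic' = 'log.test'
    );

Validation of the partition columns is unchanged: each column named in PARTITIONED BY
must still appear in the column list, otherwise the parser fails with
"Partition column [...] not defined in columns", as exercised by
testCreateInvalidPartitionedTable.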