Unverified commit 853fc3c5, authored by Jingsong Lee, committed by GitHub

[FLINK-16858][table] Expose partitioned by grammar


This closes #11559
Parent 15d42c87
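For context: before this commit, CREATE TABLE ... PARTITIONED BY (...) was rejected by the Flink SQL parser unless the session used the HIVE dialect; this change removes that gate so the default dialect parses it too. A minimal sketch of the now-exposed DDL from the Table API (schema and connector options are illustrative, modeled on the tests in this diff; sqlUpdate is assumed to be the DDL entry point of this Flink version):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionedTableSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        // No tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE) needed any more.
        tableEnv.sqlUpdate(
            "CREATE TABLE tbl1 (\n" +
            "  a BIGINT,\n" +
            "  h VARCHAR\n" +
            ") PARTITIONED BY (h) WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'kafka.topic' = 'log.test'\n" +
            ")");
    }
}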
@@ -19,7 +19,6 @@
 package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.CatalogTest;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -114,7 +113,6 @@ public class HiveTestUtils {
 
     EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironment.create(settings);
     tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
-    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
     return tableEnv;
 }
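With the dialect switch removed from the test utility above, a test that still needs Hive-specific syntax opts in on its own environment instead. A hedged sketch of that per-test opt-in (the wrapper class and method are illustrative; setSqlDialect is the same call deleted above):

import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

class HiveDialectOptIn {
    // `tableEnv` is whatever the test utility returns; only Hive-syntax
    // tests need to flip the dialect now.
    static void useHiveDialect(TableEnvironment tableEnv) {
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    }
}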
@@ -601,12 +601,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace) :
     }]
     [
         <PARTITIONED> <BY>
-        partitionColumns = ParenthesizedSimpleIdentifierList() {
-            if (!((FlinkSqlConformance) this.conformance).allowCreatePartitionedTable()) {
-                throw SqlUtil.newContextException(getPos(),
-                    ParserResource.RESOURCE.createPartitionedTableIsOnlyAllowedForHive());
-            }
-        }
+        partitionColumns = ParenthesizedSimpleIdentifierList()
     ]
     [
         <WITH>
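The net effect of the grammar change: PARTITIONED BY now parses under any conformance, and the FlinkSqlConformance check it used to trigger (deleted below) is gone entirely. A hedged sketch of exercising the relaxed production directly with the generated parser (not code from this commit; the FACTORY field and builder calls follow standard Calcite usage):

import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;

public class ParsePartitionedBySketch {
    public static void main(String[] args) throws Exception {
        SqlParser parser = SqlParser.create(
            "CREATE TABLE tbl1 (a BIGINT, h VARCHAR) PARTITIONED BY (h)",
            SqlParser.configBuilder()
                .setParserFactory(FlinkSqlParserImpl.FACTORY)
                .build());
        // With the conformance gate removed, this no longer throws
        // "Creating partitioned table is only allowed for HIVE dialect."
        SqlNode node = parser.parseStmt();
        System.out.println(node.getClass().getSimpleName());
    }
}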
@@ -35,7 +35,4 @@ public interface ParserResource {
 
     @Resources.BaseMessage("OVERWRITE expression is only used with INSERT statement.")
     Resources.ExInst<ParseException> overwriteIsOnlyUsedWithInsert();
-
-    @Resources.BaseMessage("Creating partitioned table is only allowed for HIVE dialect.")
-    Resources.ExInst<ParseException> createPartitionedTableIsOnlyAllowedForHive();
 }
@@ -155,11 +155,4 @@ public enum FlinkSqlConformance implements SqlConformance {
     public boolean allowQualifyingCommonColumn() {
         return true;
     }
-
-    /**
-     * Whether to allow "create table T(i int, j int) partitioned by (i)" grammar.
-     */
-    public boolean allowCreatePartitionedTable() {
-        return this == FlinkSqlConformance.HIVE;
-    }
 }
@@ -18,4 +18,3 @@
 #
 MultipleWatermarksUnsupported=Multiple WATERMARK statements is not supported yet.
 OverwriteIsOnlyUsedWithInsert=OVERWRITE expression is only used with INSERT statement.
-CreatePartitionedTableIsOnlyAllowedForHive=Creating partitioned table is only allowed for HIVE dialect.
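For reference, the three deletions above dismantle one error path end to end: a ParserResource method annotated with @Resources.BaseMessage, a matching key in the .properties bundle, and the SqlUtil.newContextException(...) call in the grammar. A hedged sketch of how such a Calcite resource interface is wired (names are illustrative; overwriteIsOnlyUsedWithInsert above is the surviving real example):

import org.apache.calcite.runtime.Resources;
import org.apache.calcite.sql.parser.ParseException;

// The proxy created by Resources.create(...) binds each annotated method to
// its message; parser actions raise it via
// SqlUtil.newContextException(getPos(), RESOURCE.someParseTimeError()).
public interface DemoParserResource {
    DemoParserResource RESOURCE = Resources.create(DemoParserResource.class);

    @Resources.BaseMessage("Some parse-time error message.")
    Resources.ExInst<ParseException> someParseTimeError();
}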
@@ -21,19 +21,14 @@ package org.apache.flink.sql.parser;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.error.SqlValidateException;
 import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
-import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
 
-import org.apache.calcite.avatica.util.Casing;
-import org.apache.calcite.avatica.util.Quoting;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.parser.SqlParserImplFactory;
 import org.apache.calcite.sql.parser.SqlParserTest;
-import org.apache.calcite.sql.validate.SqlConformance;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.Reader;
@@ -45,7 +40,6 @@ import static org.junit.Assert.fail;
 /** FlinkSqlParserImpl tests. **/
 public class FlinkSqlParserImplTest extends SqlParserTest {
 
-    private SqlConformance conformance0;
 
     @Override
     protected SqlParserImplFactory parserImplFactory() {
@@ -54,25 +48,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
     @Override
     protected SqlParser getSqlParser(Reader source,
             UnaryOperator<SqlParser.ConfigBuilder> transform) {
-        if (conformance0 == null) {
-            return super.getSqlParser(source, transform);
-        } else {
-            // overwrite the default sql conformance.
-            return SqlParser.create(source,
-                SqlParser.configBuilder()
-                    .setParserFactory(parserImplFactory())
-                    .setQuoting(Quoting.DOUBLE_QUOTE)
-                    .setUnquotedCasing(Casing.TO_UPPER)
-                    .setQuotedCasing(Casing.UNCHANGED)
-                    .setConformance(conformance0)
-                    .build());
-        }
-    }
-
-    @Before
-    public void before() {
-        // clear the custom sql conformance.
-        conformance0 = null;
+        return super.getSqlParser(source, transform);
     }
 
     @Test
@@ -224,7 +200,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     public void testCreateTable() {
-        conformance0 = FlinkSqlConformance.HIVE;
         final String sql = "CREATE TABLE tbl1 (\n" +
             "  a bigint,\n" +
             "  h varchar, \n" +
@@ -258,7 +233,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     public void testCreateTableWithComment() {
-        conformance0 = FlinkSqlConformance.HIVE;
         final String sql = "CREATE TABLE tbl1 (\n" +
             "  a bigint comment 'test column comment AAA.',\n" +
             "  h varchar, \n" +
@@ -294,7 +268,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     public void testCreateTableWithPrimaryKeyAndUniqueKey() {
-        conformance0 = FlinkSqlConformance.HIVE;
         final String sql = "CREATE TABLE tbl1 (\n" +
             "  a bigint comment 'test column comment AAA.',\n" +
             "  h varchar, \n" +
@@ -565,7 +538,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     public void testCreateInvalidPartitionedTable() {
-        conformance0 = FlinkSqlConformance.HIVE;
         final String sql = "create table sls_stream1(\n" +
             "  a bigint,\n" +
             "  b VARCHAR,\n" +
@@ -578,16 +550,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
             .fails("Partition column [C] not defined in columns, at line 6, column 3"));
     }
 
-    @Test
-    public void testNotAllowedCreatePartition() {
-        conformance0 = FlinkSqlConformance.DEFAULT;
-        final String sql = "create table sls_stream1(\n" +
-            "  a bigint,\n" +
-            "  b VARCHAR\n" +
-            ") PARTITIONED BY (a^)^ with ( 'x' = 'y', 'asd' = 'dada')";
-        sql(sql).fails("Creating partitioned table is only allowed for HIVE dialect.");
-    }
-
     @Test
     public void testCreateTableWithMinusInOptionKey() {
         final String sql = "create table source_table(\n" +
@@ -246,8 +246,8 @@ public class SqlToOperationConverterTest {
             "  'connector' = 'kafka', \n" +
             "  'kafka.topic' = 'log.test'\n" +
             ")\n";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.HIVE);
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
         Operation operation = parse(sql, planner, parser);
         assert operation instanceof CreateTableOperation;
         CreateTableOperation op = (CreateTableOperation) operation;
@@ -265,8 +265,8 @@ public class SqlToOperationConverterTest {
 
     @Test(expected = SqlConversionException.class)
     public void testCreateTableWithPkUniqueKeys() {
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.HIVE);
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
         final String sql = "CREATE TABLE tbl1 (\n" +
             "  a bigint,\n" +
             "  b varchar, \n" +
@@ -245,8 +245,8 @@ public class SqlToOperationConverterTest {
             "  'connector' = 'kafka', \n" +
             "  'kafka.topic' = 'log.test'\n" +
             ")\n";
-        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
-        SqlNode node = getParserBySqlDialect(SqlDialect.HIVE).parse(sql);
+        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
         assert node instanceof SqlCreateTable;
         Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
         assert operation instanceof CreateTableOperation;
@@ -306,8 +306,8 @@ public class SqlToOperationConverterTest {
             "  'connector' = 'kafka', \n" +
             "  'kafka.topic' = 'log.test'\n" +
             ")\n";
-        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
-        SqlNode node = getParserBySqlDialect(SqlDialect.HIVE).parse(sql);
+        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
         assert node instanceof SqlCreateTable;
         SqlToOperationConverter.convert(planner, catalogManager, node);
     }
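The dialect flips in the converter tests above all hinge on one knob. A minimal sketch of where that knob lives (setSqlDialect is the same public API used elsewhere in this diff; the standalone class and the getter call are assumptions for illustration):

import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableConfig;

public class DialectKnobSketch {
    public static void main(String[] args) {
        TableConfig config = new TableConfig();
        // After this commit, DEFAULT is enough for CREATE TABLE ... PARTITIONED BY;
        // HIVE remains available for Hive-specific syntax.
        config.setSqlDialect(SqlDialect.DEFAULT);
        System.out.println(config.getSqlDialect());
    }
}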
@@ -62,7 +62,6 @@ class PartitionableSinkITCase extends AbstractTestBase {
   def before(): Unit = {
     batchExec.setParallelism(1)
     tEnv = BatchTableEnvironment.create(batchExec)
-    tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
     registerTableSource("nonSortTable", testData.toList)
     registerTableSource("sortTable", testData1.toList)
     PartitionableSinkITCase.init()