未验证 提交 293fd5a9 编写于 作者: J Jingsong Lee 提交者: GitHub

[FLINK-16265][table][csv] CsvTableSinkFactoryBase should compare LogicalTypes...

[FLINK-16265][table][csv] CsvTableSinkFactoryBase should compare LogicalTypes instead of TableSchema (#11229)
上级 966dd2c1
......@@ -380,7 +380,7 @@ public class LocalExecutorITCase extends TestLogger {
final TableSchema actualTableSchema = executor.getTableSchema(sessionId, "TableNumber2");
final TableSchema expectedTableSchema = new TableSchema(
new String[]{"IntegerField2", "StringField2", "TimestampField3"},
new String[]{"IntegerField2", "StringField2", "TimestampField2"},
new TypeInformation[]{Types.INT, Types.STRING, Types.SQL_TIMESTAMP});
assertEquals(expectedTableSchema, actualTableSchema);
......@@ -782,11 +782,12 @@ public class LocalExecutorITCase extends TestLogger {
try {
// Case 1: Registered sink
// Case 1.1: Registered sink with uppercase insert into keyword.
final String statement1 = "INSERT INTO TableSourceSink SELECT IntegerField1 = 42, StringField1 FROM TableNumber1";
final String statement1 = "INSERT INTO TableSourceSink SELECT IntegerField1 = 42," +
" StringField1, TimestampField1 FROM TableNumber1";
executeAndVerifySinkResult(executor, sessionId, statement1, csvOutputPath);
// Case 1.2: Registered sink with lowercase insert into keyword.
final String statement2 = "insert Into TableSourceSink \n "
+ "SELECT IntegerField1 = 42, StringField1 "
+ "SELECT IntegerField1 = 42, StringField1, TimestampField1 "
+ "FROM TableNumber1";
executeAndVerifySinkResult(executor, sessionId, statement2, csvOutputPath);
// Case 1.3: Execute the same statement again, the results should expect to be the same.
......@@ -1153,12 +1154,12 @@ public class LocalExecutorITCase extends TestLogger {
final List<String> actualResults = new ArrayList<>();
TestBaseUtils.readAllResultLines(actualResults, path);
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("true,Hello World");
expectedResults.add("false,Hello World");
expectedResults.add("false,Hello World");
expectedResults.add("false,Hello World");
expectedResults.add("true,Hello World");
expectedResults.add("false,Hello World!!!!");
expectedResults.add("true,Hello World,2020-01-01 00:00:01.0");
expectedResults.add("false,Hello World,2020-01-01 00:00:02.0");
expectedResults.add("false,Hello World,2020-01-01 00:00:03.0");
expectedResults.add("false,Hello World,2020-01-01 00:00:04.0");
expectedResults.add("true,Hello World,2020-01-01 00:00:05.0");
expectedResults.add("false,Hello World!!!!,2020-01-01 00:00:06.0");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
}
......
......@@ -15,9 +15,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
42,Hello World
22,Hello World
32,Hello World
32,Hello World
42,Hello World
52,Hello World!!!!
42,Hello World,2020-01-01 00:00:01
22,Hello World,2020-01-01 00:00:02
32,Hello World,2020-01-01 00:00:03
32,Hello World,2020-01-01 00:00:04
42,Hello World,2020-01-01 00:00:05
52,Hello World!!!!,2020-01-01 00:00:06
......@@ -32,6 +32,8 @@ tables:
type: INT
- name: StringField1
type: VARCHAR
- name: TimestampField1
type: TIMESTAMP
connector:
type: filesystem
path: "$VAR_SOURCE_PATH1"
......@@ -42,6 +44,8 @@ tables:
type: INT
- name: StringField1
type: VARCHAR
- name: TimestampField1
type: TIMESTAMP
line-delimiter: "\n"
comment-prefix: "#"
- name: TestView1
......@@ -56,7 +60,7 @@ tables:
type: INT
- name: StringField2
type: VARCHAR
- name: TimestampField3
- name: TimestampField2
type: TIMESTAMP
connector:
type: filesystem
......@@ -68,7 +72,7 @@ tables:
type: INT
- name: StringField2
type: VARCHAR
- name: TimestampField3
- name: TimestampField2
type: TIMESTAMP
line-delimiter: "\n"
comment-prefix: "#"
......@@ -80,6 +84,8 @@ tables:
type: BOOLEAN
- name: StringField
type: VARCHAR
- name: TimestampField
type: TIMESTAMP
connector:
type: filesystem
path: "$VAR_SOURCE_SINK_PATH"
......@@ -90,6 +96,8 @@ tables:
type: BOOLEAN
- name: StringField
type: VARCHAR
- name: TimestampField
type: TIMESTAMP
- name: TestView2
type: view
query: SELECT * FROM TestView1
......
......@@ -49,6 +49,7 @@ import static org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELDS;
import static org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELD_DELIMITER;
import static org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_TYPE_VALUE;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.sources.CsvTableSourceFactoryBase.getFieldLogicalTypes;
/**
* Factory base for creating configured instances of {@link CsvTableSink}.
......@@ -105,9 +106,12 @@ public abstract class CsvTableSinkFactoryBase implements TableFactory {
final boolean hasSchema = params.hasPrefix(FORMAT_FIELDS);
if (hasSchema) {
TableSchema formatSchema = params.getTableSchema(FORMAT_FIELDS);
if (!formatSchema.equals(tableSchema)) {
throw new TableException(
"Encodings that differ from the schema are not supported yet for CsvTableSink.");
if (!getFieldLogicalTypes(formatSchema).equals(getFieldLogicalTypes(tableSchema))) {
throw new TableException(String.format(
"Encodings that differ from the schema are not supported yet for" +
" CsvTableSink, format schema is '%s', but table schema is '%s'.",
formatSchema,
tableSchema));
}
}
......
......@@ -128,10 +128,12 @@ public abstract class CsvTableSourceFactoryBase implements TableFactory {
// the CsvTableSource needs some rework first
// for now the schema must be equal to the encoding
// Ignore conversion classes in DataType
if (!getFieldLogicalTypes(formatSchema)
.equals(getFieldLogicalTypes(tableSchema))) {
throw new TableException(
"Encodings that differ from the schema are not supported yet for CsvTableSources.");
if (!getFieldLogicalTypes(formatSchema).equals(getFieldLogicalTypes(tableSchema))) {
throw new TableException(String.format(
"Encodings that differ from the schema are not supported yet for" +
" CsvTableSource, format schema is '%s', but table schema is '%s'.",
formatSchema,
tableSchema));
}
}
......@@ -159,7 +161,7 @@ public abstract class CsvTableSourceFactoryBase implements TableFactory {
return csvTableSourceBuilder.build();
}
private static List<LogicalType> getFieldLogicalTypes(TableSchema schema) {
public static List<LogicalType> getFieldLogicalTypes(TableSchema schema) {
return Arrays
.stream(schema.getFieldDataTypes())
.map(DataType::getLogicalType)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册