Unverified commit 186724f1 authored by Leonard Xu, committed by GitHub

[FLINK-17286][table][json] Integrate json to file system connector


This closes #12010
Parent 477f39fd
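As a rough usage sketch of what this change enables (not part of the commit): a table backed by the filesystem connector can declare the json format and be read or written through SQL. The connector/path keys, table name and schema below are assumptions for illustration, the format keys mirror the test properties further down, and the snippet assumes a Flink version where TableEnvironment#executeSql is available.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JsonFileSystemUsageSketch {
    public static void main(String[] args) {
        // Blink planner in batch mode, matching the planner used by the ITCases in this change.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Hypothetical table; schema, path and connector keys are illustrative only.
        tEnv.executeSql(
                "CREATE TABLE json_table (" +
                "  x STRING," +
                "  y INT" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = '/tmp/json_table'," +
                "  'format' = 'json'," +
                "  'format.ignore-parse-errors' = 'true'" +
                ")");

        // Reads go through JsonFileSystemFormatFactory#createReader,
        // writes through the Encoder returned by createEncoder.
        tEnv.executeSql("SELECT * FROM json_table").print();
    }
}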
......@@ -116,7 +116,7 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
import static org.apache.flink.table.catalog.hive.util.HiveStatsUtil.parsePositiveIntStat;
import static org.apache.flink.table.catalog.hive.util.HiveStatsUtil.parsePositiveLongStat;
import static org.apache.flink.table.filesystem.PartitionPathUtils.unescapePathName;
import static org.apache.flink.table.utils.PartitionPathUtils.unescapePathName;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
......
......@@ -144,9 +144,9 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
private transient int limit;
private transient byte[] currBuffer; // buffer in which current record byte sequence is found
private transient int currOffset; // offset in above buffer
private transient int currLen; // length of current byte sequence
protected transient byte[] currBuffer; // buffer in which current record byte sequence is found
protected transient int currOffset; // offset in above buffer
protected transient int currLen; // length of current byte sequence
private transient boolean overLimit;
......
......@@ -77,6 +77,23 @@ under the License.
<scope>test</scope>
</dependency>
<!-- Json filesystem format factory ITCase test dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<!-- test utils dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- JSON RowData schema test dependency -->
<dependency>
<groupId>org.scala-lang</groupId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.json;
import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.FileSystemFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.PartitionPathUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD;
import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_IGNORE_PARSE_ERRORS;
/**
 * Factory to build readers and writers that read and write records in JSON format files.
*/
public class JsonFileSystemFormatFactory implements FileSystemFormatFactory {
@Override
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
context.put(FORMAT, "json");
return context;
}
@Override
public List<String> supportedProperties() {
ArrayList<String> properties = new ArrayList<>();
properties.add(FORMAT_FAIL_ON_MISSING_FIELD);
properties.add(FORMAT_IGNORE_PARSE_ERRORS);
return properties;
}
@Override
public InputFormat<RowData, ?> createReader(ReaderContext context) {
DescriptorProperties properties = getValidatedProperties(context.getFormatProperties());
boolean failOnMissingField = properties.getOptionalBoolean(FORMAT_FAIL_ON_MISSING_FIELD).orElse(false);
boolean ignoreParseErrors = properties.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS).orElse(false);
RowType formatRowType = context.getFormatRowType();
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
formatRowType,
new GenericTypeInfo(GenericRowData.class),
failOnMissingField,
ignoreParseErrors);
String[] fieldNames = context.getSchema().getFieldNames();
List<String> projectFields = Arrays.stream(context.getProjectFields())
.mapToObj(idx -> fieldNames[idx])
.collect(Collectors.toList());
List<String> jsonFields = Arrays.stream(fieldNames)
.filter(field -> !context.getPartitionKeys().contains(field))
.collect(Collectors.toList());
int[] jsonSelectFieldToProjectFieldMapping = context.getFormatProjectFields().stream()
.mapToInt(projectFields::indexOf)
.toArray();
int[] jsonSelectFieldToJsonFieldMapping = context.getFormatProjectFields().stream()
.mapToInt(jsonFields::indexOf)
.toArray();
return new JsonInputFormat(
context.getPaths(),
context.getSchema().getFieldDataTypes(),
context.getSchema().getFieldNames(),
context.getProjectFields(),
context.getPartitionKeys(),
context.getDefaultPartName(),
context.getPushedDownLimit(),
jsonSelectFieldToProjectFieldMapping,
jsonSelectFieldToJsonFieldMapping,
deserializationSchema);
}
@Override
public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
return Optional.of(new JsonRowDataEncoder(new JsonRowDataSerializationSchema(context.getFormatRowType())));
}
@Override
public Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(WriterContext context) {
return Optional.empty();
}
@Override
public boolean supportsSchemaDerivation() {
return true;
}
private static DescriptorProperties getValidatedProperties(Map<String, String> propertiesMap) {
final DescriptorProperties properties = new DescriptorProperties(true);
properties.putProperties(propertiesMap);
properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, true);
properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true);
return properties;
}
/**
 * A {@link JsonInputFormat} is responsible for reading {@link RowData} records
 * from JSON format files.
*/
public static class JsonInputFormat extends DelimitedInputFormat<RowData> {
/**
* Code of \r, used to remove \r from a line when the line ends with \r\n.
*/
private static final byte CARRIAGE_RETURN = (byte) '\r';
/**
* Code of \n, used to identify if \n is used as delimiter.
*/
private static final byte NEW_LINE = (byte) '\n';
private final DataType[] fieldTypes;
private final String[] fieldNames;
private final int[] selectFields;
private final List<String> partitionKeys;
private final String defaultPartValue;
private final long limit;
private final int[] jsonSelectFieldToProjectFieldMapping;
private final int[] jsonSelectFieldToJsonFieldMapping;
private final JsonRowDataDeserializationSchema deserializationSchema;
private transient boolean end;
private transient long emitted;
// reused object for each record
private transient GenericRowData rowData;
public JsonInputFormat(
Path[] filePaths,
DataType[] fieldTypes,
String[] fieldNames,
int[] selectFields,
List<String> partitionKeys,
String defaultPartValue,
long limit,
int[] jsonSelectFieldToProjectFieldMapping,
int[] jsonSelectFieldToJsonFieldMapping,
JsonRowDataDeserializationSchema deserializationSchema) {
super.setFilePaths(filePaths);
this.fieldTypes = fieldTypes;
this.fieldNames = fieldNames;
this.selectFields = selectFields;
this.partitionKeys = partitionKeys;
this.defaultPartValue = defaultPartValue;
this.limit = limit;
this.jsonSelectFieldToProjectFieldMapping = jsonSelectFieldToProjectFieldMapping;
this.jsonSelectFieldToJsonFieldMapping = jsonSelectFieldToJsonFieldMapping;
this.deserializationSchema = deserializationSchema;
}
@Override
public boolean supportsMultiPaths() {
return true;
}
@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);
this.end = false;
this.emitted = 0L;
this.rowData = PartitionPathUtils.fillPartitionValueForRecord(fieldNames, fieldTypes, selectFields,
partitionKeys, currentSplit.getPath(), defaultPartValue);
}
@Override
public boolean reachedEnd() {
return emitted >= limit || end;
}
@Override
public RowData readRecord(RowData reuse, byte[] bytes, int offset, int numBytes) throws IOException {
// remove \r from a line when the line ends with \r\n
if (this.getDelimiter() != null && this.getDelimiter().length == 1
&& this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1
&& bytes[offset + numBytes - 1] == CARRIAGE_RETURN) {
numBytes -= 1;
}
byte[] trimBytes = Arrays.copyOfRange(bytes, offset, offset + numBytes);
GenericRowData jsonRow = (GenericRowData) deserializationSchema.deserialize(trimBytes);
if (jsonRow == null) {
return null;
}
GenericRowData returnRecord = rowData;
for (int i = 0; i < jsonSelectFieldToJsonFieldMapping.length; i++) {
returnRecord.setField(jsonSelectFieldToProjectFieldMapping[i],
jsonRow.getField(jsonSelectFieldToJsonFieldMapping[i]));
}
emitted++;
return returnRecord;
}
@Override
public RowData nextRecord(RowData record) throws IOException {
while (true) {
if (readLine()) {
RowData row = readRecord(record, this.currBuffer, this.currOffset, this.currLen);
if (row == null) {
continue;
} else {
return row;
}
} else {
this.end = true;
return null;
}
}
}
}
/**
 * A {@link JsonRowDataEncoder} is responsible for encoding a {@link RowData} into a {@link java.io.OutputStream}
 * in JSON format.
*/
public static class JsonRowDataEncoder implements Encoder<RowData> {
private static final long serialVersionUID = 1L;
private static final String DEFAULT_LINE_DELIMITER = "\n";
private final JsonRowDataSerializationSchema serializationSchema;
public JsonRowDataEncoder(JsonRowDataSerializationSchema serializationSchema) {
this.serializationSchema = serializationSchema;
}
@Override
public void encode(RowData element, OutputStream stream) throws IOException {
stream.write(serializationSchema.serialize(element));
stream.write(DEFAULT_LINE_DELIMITER.getBytes(StandardCharsets.UTF_8));
}
}
}
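A minimal, self-contained sketch (not part of the commit) of exercising the JsonRowDataEncoder above directly; the row type and field values are made up, and the exact key order of the emitted JSON line is not guaranteed here.

import org.apache.flink.formats.json.JsonFileSystemFormatFactory.JsonRowDataEncoder;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class JsonRowDataEncoderSketch {
    public static void main(String[] args) throws Exception {
        // Row type (x STRING, y INT), mirroring what getFormatRowType() would derive.
        RowType rowType = RowType.of(
                new LogicalType[]{new VarCharType(VarCharType.MAX_LENGTH), new IntType()},
                new String[]{"x", "y"});

        JsonRowDataEncoder encoder =
                new JsonRowDataEncoder(new JsonRowDataSerializationSchema(rowType));

        // One record, filled the same way the factory and tests do via GenericRowData.
        GenericRowData row = new GenericRowData(2);
        row.setField(0, StringData.fromString("x5"));
        row.setField(1, 5);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        encoder.encode(row, out);

        // Expected to print one JSON line such as {"x":"x5","y":5}
        System.out.print(out.toString(StandardCharsets.UTF_8.name()));
    }
}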
......@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.formats.json.JsonFileSystemFormatFactory
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.json;
import org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.FileUtils;
import org.junit.Test;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* ITCase to test json format for {@link JsonFileSystemFormatFactory}.
*/
public class JsonBatchFileSystemITCase extends BatchFileSystemITCaseBase {
@Override
public String[] formatProperties() {
List<String> ret = new ArrayList<>();
ret.add("'format'='json'");
ret.add("'format.ignore-parse-errors'='true'");
return ret.toArray(new String[0]);
}
@Test
public void testParseError() throws Exception {
String path = new URI(resultPath()).getPath();
new File(path).mkdirs();
File file = new File(path, "temp_file");
file.createNewFile();
FileUtils.writeFileUtf8(file,
"{\"x\":\"x5\",\"y\":5,\"a\":1,\"b\":1}\n" +
"{I am a wrong json.}\n" +
"{\"x\":\"x5\",\"y\":5,\"a\":1,\"b\":1}");
check("select * from nonPartitionedTable",
Arrays.asList(
Row.of("x5,5,1,1"),
Row.of("x5,5,1,1")));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.json;
import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase;
import java.util.ArrayList;
import java.util.List;
/**
* Test checkpoint for file system table factory with json format.
*/
public class JsonFsStreamSinkITCase extends FsStreamingSinkITCaseBase {
@Override
public String[] additionalProperties() {
List<String> ret = new ArrayList<>();
ret.add("'format'='json'");
// for test purpose
ret.add("'sink.rolling-policy.file-size'='1'");
return ret.toArray(new String[0]);
}
}
......@@ -29,10 +29,10 @@ import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.FileSystemFormatFactory;
import org.apache.flink.table.filesystem.PartitionPathUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
......@@ -111,12 +111,12 @@ public class OrcFileSystemFormatFactory implements FileSystemFormatFactory {
DescriptorProperties properties = new DescriptorProperties();
properties.putProperties(context.getFormatProperties());
LogicalType[] orcTypes = Arrays.stream(context.getFieldTypesWithoutPartKeys())
LogicalType[] orcTypes = Arrays.stream(context.getFormatFieldTypes())
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new);
TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(
RowType.of(orcTypes, context.getFieldNamesWithoutPartKeys()));
RowType.of(orcTypes, context.getFormatFieldNames()));
OrcBulkWriterFactory<RowData> factory = new OrcBulkWriterFactory<>(
new RowDataVectorizer(typeDescription.toString(), orcTypes),
......
......@@ -32,10 +32,10 @@ import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.FileSystemFormatFactory;
import org.apache.flink.table.filesystem.PartitionPathUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetOutputFormat;
......@@ -135,10 +135,10 @@ public class ParquetFileSystemFormatFactory implements FileSystemFormatFactory {
properties.putProperties(context.getFormatProperties());
return Optional.of(ParquetRowDataBuilder.createWriterFactory(
RowType.of(Arrays.stream(context.getFieldTypesWithoutPartKeys())
RowType.of(Arrays.stream(context.getFormatFieldTypes())
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new),
context.getFieldNamesWithoutPartKeys()),
context.getFormatFieldNames()),
getParquetConfiguration(properties),
isUtcTimestamp(properties)
));
......
......@@ -27,11 +27,14 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* File system format factory for creating configured instances of reader and writer.
......@@ -102,6 +105,48 @@ public interface FileSystemFormatFactory extends TableFormatFactory<RowData> {
* The follow up operator will filter the records again.
*/
List<Expression> getPushedDownFilters();
/**
* Get field names without partition keys.
*/
default String[] getFormatFieldNames() {
return Arrays.stream(getSchema().getFieldNames())
.filter(name -> !getPartitionKeys().contains(name))
.toArray(String[]::new);
}
/**
* Get field types without partition keys.
*/
default DataType[] getFormatFieldTypes() {
return Arrays.stream(getSchema().getFieldNames())
.filter(name -> !getPartitionKeys().contains(name))
.map(name -> getSchema().getFieldDataType(name).get())
.toArray(DataType[]::new);
}
/**
* RowType of table that excludes partition key fields.
*/
default RowType getFormatRowType() {
return RowType.of(
Arrays.stream(getFormatFieldTypes())
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new),
getFormatFieldNames());
}
/**
 * Get the projected field names that are not partition keys; used to build the mapping from format fields to projected fields.
*/
default List<String> getFormatProjectFields() {
final List<String> selectFieldNames = Arrays.stream(getProjectFields())
.mapToObj(i -> getSchema().getFieldNames()[i])
.collect(Collectors.toList());
return selectFieldNames.stream()
.filter(name -> !getPartitionKeys().contains(name))
.collect(Collectors.toList());
}
}
/**
......@@ -127,7 +172,7 @@ public interface FileSystemFormatFactory extends TableFormatFactory<RowData> {
/**
* Get field names without partition keys.
*/
default String[] getFieldNamesWithoutPartKeys() {
default String[] getFormatFieldNames() {
return Arrays.stream(getSchema().getFieldNames())
.filter(name -> !getPartitionKeys().contains(name))
.toArray(String[]::new);
......@@ -136,11 +181,23 @@ public interface FileSystemFormatFactory extends TableFormatFactory<RowData> {
/**
* Get field types without partition keys.
*/
default DataType[] getFieldTypesWithoutPartKeys() {
default DataType[] getFormatFieldTypes() {
return Arrays.stream(getSchema().getFieldNames())
.filter(name -> !getPartitionKeys().contains(name))
.map(name -> getSchema().getFieldDataType(name).get())
.toArray(DataType[]::new);
}
/**
 * Get RowType of the table, excluding partition key fields.
*/
default RowType getFormatRowType() {
return RowType.of(
Arrays.stream(getFormatFieldTypes())
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new),
getFormatFieldNames());
}
}
}
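For intuition, a hedged re-derivation (not part of the commit) of what the new getFormatFieldNames()/getFormatProjectFields() defaults compute, using a made-up schema (x STRING, y INT, p STRING) with partition key p and a pushed-down projection [2, 0]:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FormatFieldsSketch {
    public static void main(String[] args) {
        // Hypothetical table: full schema, partition keys and pushed-down projection.
        String[] fieldNames = {"x", "y", "p"};
        List<String> partitionKeys = Arrays.asList("p");
        int[] projectFields = {2, 0}; // SELECT p, x FROM ...

        // getFormatFieldNames(): schema fields minus partition keys -> [x, y]
        String[] formatFieldNames = Arrays.stream(fieldNames)
                .filter(name -> !partitionKeys.contains(name))
                .toArray(String[]::new);

        // getFormatProjectFields(): projected field names minus partition keys -> [x]
        List<String> formatProjectFields = Arrays.stream(projectFields)
                .mapToObj(i -> fieldNames[i])
                .filter(name -> !partitionKeys.contains(name))
                .collect(Collectors.toList());

        System.out.println(Arrays.toString(formatFieldNames)); // [x, y]
        System.out.println(formatProjectFields);               // [x]
    }
}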
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.table.filesystem;
package org.apache.flink.table.utils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -24,8 +24,15 @@ import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
......@@ -201,6 +208,80 @@ public class PartitionPathUtils {
return ret;
}
/**
 * Extract partition values from the path and fill them into the record.
* @param fieldNames record field names.
* @param fieldTypes record field types.
* @param selectFields the selected fields.
* @param partitionKeys the partition field names.
 * @param path the file path in which the partition is located.
* @param defaultPartValue default value of partition field.
* @return the filled record.
*/
public static GenericRowData fillPartitionValueForRecord(
String[] fieldNames,
DataType[] fieldTypes,
int[] selectFields,
List<String> partitionKeys,
Path path,
String defaultPartValue) {
GenericRowData record = new GenericRowData(selectFields.length);
LinkedHashMap<String, String> partSpec = PartitionPathUtils.extractPartitionSpecFromPath(path);
for (int i = 0; i < selectFields.length; i++) {
int selectField = selectFields[i];
String name = fieldNames[selectField];
if (partitionKeys.contains(name)) {
String value = partSpec.get(name);
value = defaultPartValue.equals(value) ? null : value;
record.setField(i, PartitionPathUtils.convertStringToInternalValue(value, fieldTypes[selectField]));
}
}
return record;
}
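A hedged illustration (not part of the commit) of calling fillPartitionValueForRecord directly; the table layout, the file path and the default-partition name below are made up:

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.PartitionPathUtils;

import java.util.Collections;

public class FillPartitionValueSketch {
    public static void main(String[] args) {
        // Hypothetical table (x STRING, y INT, p STRING) partitioned by p,
        // projecting fields y and p (indexes 1 and 2).
        String[] fieldNames = {"x", "y", "p"};
        DataType[] fieldTypes = {DataTypes.STRING(), DataTypes.INT(), DataTypes.STRING()};
        int[] selectFields = {1, 2};

        GenericRowData reuse = PartitionPathUtils.fillPartitionValueForRecord(
                fieldNames,
                fieldTypes,
                selectFields,
                Collections.singletonList("p"),
                new Path("/tmp/json_table/p=beijing/part-0"),
                "__DEFAULT_PARTITION__"); // assumed default-partition name

        // Field 1 of the reused row is pre-filled with StringData "beijing";
        // field 0 (y) is left null and is populated per record by the format reader.
        System.out.println(reuse.getField(1));
    }
}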
/**
* Restore partition value from string and type.
*
* @param valStr string partition value.
* @param type type of partition field.
* @return partition value.
*/
public static Object convertStringToInternalValue(String valStr, DataType type) {
if (valStr == null) {
return null;
}
LogicalTypeRoot typeRoot = type.getLogicalType().getTypeRoot();
switch (typeRoot) {
case CHAR:
case VARCHAR:
return StringData.fromString(valStr);
case BOOLEAN:
return Boolean.parseBoolean(valStr);
case TINYINT:
return Byte.parseByte(valStr);
case SMALLINT:
return Short.parseShort(valStr);
case INTEGER:
return Integer.parseInt(valStr);
case BIGINT:
return Long.parseLong(valStr);
case FLOAT:
return Float.parseFloat(valStr);
case DOUBLE:
return Double.parseDouble(valStr);
case DATE:
return (int) LocalDate.parse(valStr).toEpochDay();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return TimestampData.fromLocalDateTime(LocalDateTime.parse(valStr));
default:
throw new RuntimeException(String.format(
"Can not convert %s to type %s for partition value",
valStr,
type));
}
}
private static FileStatus[] getFileStatusRecurse(Path path, int expectLevel, FileSystem fs) {
ArrayList<FileStatus> result = new ArrayList<>();
......
......@@ -89,7 +89,7 @@ public class TestCsvFileSystemFormatFactory implements FileSystemFormatFactory {
return Optional.empty();
}
DataType[] types = context.getFieldTypesWithoutPartKeys();
DataType[] types = context.getFormatFieldTypes();
return Optional.of((rowData, stream) -> {
writeCsvToStream(types, rowData, stream);
});
......@@ -127,7 +127,7 @@ public class TestCsvFileSystemFormatFactory implements FileSystemFormatFactory {
return Optional.empty();
}
DataType[] types = context.getFieldTypesWithoutPartKeys();
DataType[] types = context.getFormatFieldTypes();
return Optional.of(out -> new CsvBulkWriter(types, out));
}
......
......@@ -34,8 +34,8 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.filesystem.PartitionPathUtils;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.types.Row;
import java.io.IOException;
......
......@@ -30,7 +30,7 @@ import org.apache.flink.types.Row
import org.junit.rules.TemporaryFolder
import org.junit.{Rule, Test}
import scala.collection.Seq
import scala.collection.{JavaConverters, Seq}
/**
* Test File system table factory.
......@@ -49,6 +49,11 @@ trait FileSystemITCaseBase {
def check(sqlQuery: String, expectedResult: Seq[Row]): Unit
def check(sqlQuery: String, expectedResult: java.util.List[Row]): Unit = {
check(sqlQuery,
JavaConverters.asScalaIteratorConverter(expectedResult.iterator()).asScala.toSeq)
}
def open(): Unit = {
resultPath = fileTmpFolder.newFolder().toURI.toString
BatchTableEnvUtil.registerCollection(
......
......@@ -24,7 +24,7 @@ import org.apache.flink.api.common.io.OutputFormat;
import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
/**
 * Dynamic partition writer for writing multiple partitions at the same time; it may consume more memory.
......
......@@ -44,6 +44,7 @@ import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.PartitionableTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
......
......@@ -33,6 +33,7 @@ import org.apache.flink.table.sources.LimitableTableSource;
import org.apache.flink.table.sources.PartitionableTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.PartitionPathUtils;
import java.util.ArrayList;
import java.util.Arrays;
......
......@@ -21,7 +21,7 @@ package org.apache.flink.table.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.OutputFormat;
import static org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
/**
* {@link PartitionWriter} for grouped dynamic partition inserting. It will create a new format
......
......@@ -31,8 +31,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import static org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
import static org.apache.flink.table.filesystem.PartitionPathUtils.listStatusWithoutHidden;
import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
import static org.apache.flink.table.utils.PartitionPathUtils.listStatusWithoutHidden;
/**
 * Loader that moves temporary files to the final output path and meta store. According to overwrite,
......
......@@ -31,7 +31,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.table.filesystem.PartitionPathUtils.searchPartSpecAndPaths;
import static org.apache.flink.table.utils.PartitionPathUtils.searchPartSpecAndPaths;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
......
......@@ -24,7 +24,7 @@ import org.apache.flink.api.common.io.OutputFormat;
import java.io.IOException;
import java.util.LinkedHashMap;
import static org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
/**
 * {@link PartitionWriter} for single directory writer. It just uses one format to write.
......
......@@ -23,7 +23,7 @@ import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.flink.table.filesystem.PartitionPathUtils.generatePartitionPath;
import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
/**
* Test for {@link RowPartitionComputer}.
......