Unverified commit 1f668dd3, authored by Jingsong Lee, committed by GitHub

[FLINK-14255][hive] Integrate hive with parquet and orc format to streaming file sink


This closes #12206
Parent 1892bede
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.hive;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.filesystem.RowDataPartitionComputer;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import java.util.Arrays;
import java.util.LinkedHashMap;
/**
* A {@link RowDataPartitionComputer} that converts Flink objects to Hive objects before computing the partition value strings.
*/
public class HiveRowDataPartitionComputer extends RowDataPartitionComputer {
private final DataFormatConverters.DataFormatConverter[] partitionConverters;
private final HiveObjectConversion[] hiveObjectConversions;
public HiveRowDataPartitionComputer(
HiveShim hiveShim,
String defaultPartValue,
String[] columnNames,
DataType[] columnTypes,
String[] partitionColumns) {
super(defaultPartValue, columnNames, columnTypes, partitionColumns);
this.partitionConverters = Arrays.stream(partitionTypes)
.map(TypeConversions::fromLogicalToDataType)
.map(DataFormatConverters::getConverterForDataType)
.toArray(DataFormatConverters.DataFormatConverter[]::new);
this.hiveObjectConversions = new HiveObjectConversion[partitionIndexes.length];
for (int i = 0; i < hiveObjectConversions.length; i++) {
DataType partColType = columnTypes[partitionIndexes[i]];
ObjectInspector objectInspector = HiveInspectors.getObjectInspector(partColType);
hiveObjectConversions[i] = HiveInspectors.getConversion(objectInspector, partColType.getLogicalType(), hiveShim);
}
}
@Override
public LinkedHashMap<String, String> generatePartValues(RowData in) {
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
for (int i = 0; i < partitionIndexes.length; i++) {
Object field = partitionConverters[i].toExternal(in, partitionIndexes[i]);
String partitionValue = field != null ? hiveObjectConversions[i].toHiveObject(field).toString() : null;
if (StringUtils.isEmpty(partitionValue)) {
partitionValue = defaultPartValue;
}
partSpec.put(partitionColumns[i], partitionValue);
}
return partSpec;
}
}
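For illustration only (outside the diff), a minimal sketch of how this computer turns a row into a partition spec. It assumes a hypothetical table with columns (a INT, d STRING) partitioned by d, uses Hive's usual default partition name as an example value, and assumes the usual imports (GenericRowData, StringData, DataTypes, HiveShimLoader); exception handling is omitted.

HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
HiveRowDataPartitionComputer computer = new HiveRowDataPartitionComputer(
        hiveShim,
        "__HIVE_DEFAULT_PARTITION__",                            // default partition value
        new String[] {"a", "d"},                                 // all column names
        new DataType[] {DataTypes.INT(), DataTypes.STRING()},    // all column types
        new String[] {"d"});                                     // partition columns
RowData row = GenericRowData.of(1, StringData.fromString("2020-05-03"));
LinkedHashMap<String, String> partSpec = computer.generatePartValues(row);
// Expected: {d=2020-05-03}; a null or empty partition value falls back to the default name.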
......@@ -33,13 +33,13 @@ import java.util.LinkedHashMap;
/**
* A RowPartitionComputer that converts Flink objects to Hive objects before computing the partition value strings.
*/
public class HivePartitionComputer extends RowPartitionComputer {
public class HiveRowPartitionComputer extends RowPartitionComputer {
private static final long serialVersionUID = 1L;
private final HiveObjectConversion[] partColConversions;
HivePartitionComputer(HiveShim hiveShim, String defaultPartValue, String[] columnNames,
HiveRowPartitionComputer(HiveShim hiveShim, String defaultPartValue, String[] columnNames,
DataType[] columnTypes, String[] partitionColumns) {
super(defaultPartValue, columnNames, partitionColumns);
partColConversions = new HiveObjectConversion[partitionIndexes.length];
......
......@@ -20,13 +20,11 @@ package org.apache.flink.connectors.hive;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sinks.OutputFormatTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
......@@ -88,16 +86,13 @@ public class HiveTableFactory
boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
if (!isGeneric) {
return createOutputFormatTableSink(context.getObjectIdentifier().toObjectPath(), table);
return new HiveTableSink(
context.isBounded(),
new JobConf(hiveConf),
context.getObjectIdentifier(),
table);
} else {
return TableFactoryUtil.findAndCreateTableSink(context);
}
}
/**
* Creates and configures a {@link org.apache.flink.table.sinks.OutputFormatTableSink} using the given {@link CatalogTable}.
*/
private OutputFormatTableSink<Row> createOutputFormatTableSink(ObjectPath tablePath, CatalogTable table) {
return new HiveTableSink(new JobConf(hiveConf), tablePath, table);
}
}
......@@ -18,12 +18,20 @@
package org.apache.flink.connectors.hive;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
import org.apache.flink.connectors.hive.write.HiveWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.orc.OrcSplitReaderUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BulkFormatBuilder;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
......@@ -31,12 +39,19 @@ import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.FileSystemOutputFormat;
import org.apache.flink.table.sinks.OutputFormatTableSink;
import org.apache.flink.table.filesystem.FileSystemTableSink;
import org.apache.flink.table.filesystem.FileSystemTableSink.TableBucketAssigner;
import org.apache.flink.table.filesystem.FileSystemTableSink.TableRollingPolicy;
import org.apache.flink.table.filesystem.stream.InactiveBucketListener;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.PartitionableTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
......@@ -51,6 +66,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.orc.TypeDescription;
import org.apache.thrift.TException;
import java.io.IOException;
......@@ -58,14 +74,18 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.table.filesystem.FileSystemTableFactory.SINK_ROLLING_POLICY_FILE_SIZE;
import static org.apache.flink.table.filesystem.FileSystemTableFactory.SINK_ROLLING_POLICY_TIME_INTERVAL;
/**
* Table sink to write to Hive tables.
*/
public class HiveTableSink extends OutputFormatTableSink<Row> implements PartitionableTableSink, OverwritableTableSink {
public class HiveTableSink implements AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink {
private final boolean isBounded;
private final JobConf jobConf;
private final CatalogTable catalogTable;
private final ObjectPath tablePath;
private final ObjectIdentifier identifier;
private final TableSchema tableSchema;
private final String hiveVersion;
private final HiveShim hiveShim;
......@@ -75,9 +95,10 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
private boolean overwrite = false;
private boolean dynamicGrouping = false;
public HiveTableSink(JobConf jobConf, ObjectPath tablePath, CatalogTable table) {
public HiveTableSink(boolean isBounded, JobConf jobConf, ObjectIdentifier identifier, CatalogTable table) {
this.isBounded = isBounded;
this.jobConf = jobConf;
this.tablePath = tablePath;
this.identifier = identifier;
this.catalogTable = table;
hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
"Hive version is not defined");
......@@ -86,31 +107,21 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
}
@Override
public OutputFormat<Row> getOutputFormat() {
String[] partitionColumns = getPartitionFieldNames().toArray(new String[0]);
String dbName = tablePath.getDatabaseName();
String tableName = tablePath.getObjectName();
public final DataStreamSink consumeDataStream(DataStream dataStream) {
String[] partitionColumns = getPartitionKeys().toArray(new String[0]);
String dbName = identifier.getDatabaseName();
String tableName = identifier.getObjectName();
try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(
new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
Table table = client.getTable(dbName, tableName);
StorageDescriptor sd = table.getSd();
HiveTableMetaStoreFactory msFactory = new HiveTableMetaStoreFactory(
jobConf, hiveVersion, dbName, tableName);
FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(new HivePartitionComputer(
hiveShim,
jobConf.get(
HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
partitionColumns));
builder.setDynamicGrouped(dynamicGrouping);
builder.setPartitionColumns(partitionColumns);
builder.setFileSystemFactory(new HadoopFileSystemFactory(jobConf));
Class hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(
Class.forName(sd.getOutputFormat()));
boolean isCompressed = jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
Class hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
builder.setFormatFactory(new HiveOutputFormatFactory(
HiveWriterFactory recordWriterFactory = new HiveWriterFactory(
jobConf,
hiveOutputFormatClz,
sd.getSerdeInfo(),
......@@ -118,20 +129,72 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
partitionColumns,
HiveReflectionUtils.getTableMetadata(hiveShim, table),
hiveShim,
isCompressed));
builder.setMetaStoreFactory(
new HiveTableMetaStoreFactory(jobConf, hiveVersion, dbName, tableName));
builder.setOverwrite(overwrite);
builder.setStaticPartitions(staticPartitionSpec);
builder.setTempPath(new org.apache.flink.core.fs.Path(
toStagingDir(sd.getLocation(), jobConf)));
String extension = Utilities.getFileExtension(jobConf, isCompressed,
(HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
extension = extension == null ? "" : extension;
OutputFileConfig outputFileConfig = new OutputFileConfig("", extension);
builder.setOutputFileConfig(outputFileConfig);
return builder.build();
isCompressed);
if (isBounded) {
FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(new HiveRowPartitionComputer(
hiveShim,
jobConf.get(
HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
partitionColumns));
builder.setDynamicGrouped(dynamicGrouping);
builder.setPartitionColumns(partitionColumns);
builder.setFileSystemFactory(new HadoopFileSystemFactory(jobConf));
builder.setFormatFactory(new HiveOutputFormatFactory(recordWriterFactory));
builder.setMetaStoreFactory(
msFactory);
builder.setOverwrite(overwrite);
builder.setStaticPartitions(staticPartitionSpec);
builder.setTempPath(new org.apache.flink.core.fs.Path(
toStagingDir(sd.getLocation(), jobConf)));
String extension = Utilities.getFileExtension(jobConf, isCompressed,
(HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
extension = extension == null ? "" : extension;
OutputFileConfig outputFileConfig = new OutputFileConfig("", extension);
builder.setOutputFileConfig(outputFileConfig);
return dataStream
.writeUsingOutputFormat(builder.build())
.setParallelism(dataStream.getParallelism());
} else {
BulkWriter.Factory<RowData> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
catalogTable.getOptions().forEach(conf::setString);
HiveRowDataPartitionComputer partComputer = new HiveRowDataPartitionComputer(
hiveShim,
jobConf.get(
HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
partitionColumns);
TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
TableRollingPolicy rollingPolicy = new TableRollingPolicy(
true,
conf.get(SINK_ROLLING_POLICY_FILE_SIZE),
conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL));
InactiveBucketListener listener = new InactiveBucketListener();
BulkFormatBuilder<RowData, String, ?> builder = StreamingFileSink.forBulkFormat(
new org.apache.flink.core.fs.Path(sd.getLocation()),
new FileSystemTableSink.ProjectionBulkFactory(bulkFactory, partComputer))
.withBucketAssigner(assigner)
.withBucketLifeCycleListener(listener)
.withRollingPolicy(rollingPolicy);
return FileSystemTableSink.createStreamingSink(
conf,
new org.apache.flink.core.fs.Path(sd.getLocation()),
getPartitionKeys(),
identifier,
overwrite,
dataStream,
builder,
listener,
msFactory);
}
} catch (TException e) {
throw new CatalogException("Failed to query Hive metaStore", e);
} catch (IOException e) {
......@@ -143,14 +206,39 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
}
}
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return new HiveTableSink(jobConf, tablePath, catalogTable);
private BulkWriter.Factory<RowData> createBulkWriterFactory(String[] partitionColumns,
StorageDescriptor sd) {
String serLib = sd.getSerdeInfo().getSerializationLib().toLowerCase();
int formatFieldCount = tableSchema.getFieldCount() - partitionColumns.length;
String[] formatNames = new String[formatFieldCount];
LogicalType[] formatTypes = new LogicalType[formatFieldCount];
for (int i = 0; i < formatFieldCount; i++) {
formatNames[i] = tableSchema.getFieldName(i).get();
formatTypes[i] = tableSchema.getFieldDataType(i).get().getLogicalType();
}
RowType formatType = RowType.of(formatTypes, formatNames);
Configuration formatConf = new Configuration(jobConf);
sd.getSerdeInfo().getParameters().forEach(formatConf::set);
BulkWriter.Factory<RowData> bulkFactory;
if (serLib.contains("parquet")) {
bulkFactory = ParquetRowDataBuilder.createWriterFactory(
formatType, formatConf, hiveVersion.startsWith("3."));
} else if (serLib.contains("orc")) {
TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(formatType);
bulkFactory = hiveShim.createOrcBulkWriterFactory(
formatConf, typeDescription.toString(), formatTypes);
} else {
throw new UnsupportedOperationException(String.format(
"Only parquet or orc can support streaming writing, but now serialization lib is %s.",
serLib));
}
return bulkFactory;
}
@Override
public DataType getConsumedDataType() {
return getTableSchema().toRowDataType();
DataType dataType = getTableSchema().toRowDataType();
return isBounded ? dataType : dataType.bridgedTo(RowData.class);
}
@Override
......@@ -158,6 +246,11 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
return tableSchema;
}
@Override
public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) {
return this;
}
@Override
public boolean configurePartitionGrouping(boolean supportsGrouping) {
this.dynamicGrouping = supportsGrouping;
......@@ -179,7 +272,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
return res;
}
private List<String> getPartitionFieldNames() {
private List<String> getPartitionKeys() {
return catalogTable.getPartitionKeys();
}
......@@ -187,7 +280,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
public void setStaticPartition(Map<String, String> partitionSpec) {
// make it a LinkedHashMap to maintain partition column order
staticPartitionSpec = new LinkedHashMap<>();
for (String partitionCol : getPartitionFieldNames()) {
for (String partitionCol : getPartitionKeys()) {
if (partitionSpec.containsKey(partitionCol)) {
staticPartitionSpec.put(partitionCol, partitionSpec.get(partitionCol));
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.hive.write;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.table.filesystem.OutputFormatFactory;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.Writable;
import java.io.IOException;
import java.util.function.Function;
/**
* Hive {@link OutputFormatFactory} that uses a Hive {@link RecordWriter} to write records.
*/
public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
private static final long serialVersionUID = 2L;
private final HiveWriterFactory factory;
public HiveOutputFormatFactory(HiveWriterFactory factory) {
this.factory = factory;
}
@Override
public HiveOutputFormat createOutputFormat(Path path) {
return new HiveOutputFormat(
factory.createRecordWriter(HadoopFileSystem.toHadoopPath(path)),
factory.createRowConverter());
}
private class HiveOutputFormat implements org.apache.flink.api.common.io.OutputFormat<Row> {
private final RecordWriter recordWriter;
private final Function<Row, Writable> rowConverter;
private HiveOutputFormat(RecordWriter recordWriter, Function<Row, Writable> rowConverter) {
this.recordWriter = recordWriter;
this.rowConverter = rowConverter;
}
@Override
public void configure(Configuration parameters) {
}
@Override
public void open(int taskNumber, int numTasks) {
}
@Override
public void writeRecord(Row record) throws IOException {
recordWriter.write(rowConverter.apply(record));
}
@Override
public void close() throws IOException {
recordWriter.close(false);
}
}
}
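A hedged usage sketch (the writerFactory and the target path are hypothetical, built as in HiveTableSink above; exception handling omitted): the factory turns a path into a plain OutputFormat<Row> whose records are converted to Hive Writables and handed to the Hive RecordWriter.

HiveOutputFormatFactory factory = new HiveOutputFormatFactory(writerFactory);
org.apache.flink.api.common.io.OutputFormat<Row> format =
        factory.createOutputFormat(new Path("/tmp/staging/part-0"));
format.open(0, 1);                  // no-op: the RecordWriter was created eagerly
format.writeRecord(Row.of(1, "a")); // converted via createRowConverter(), then written
format.close();                     // closes the underlying Hive RecordWriter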
......@@ -16,14 +16,15 @@
* limitations under the License.
*/
package org.apache.flink.connectors.hive;
package org.apache.flink.connectors.hive.write;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.filesystem.OutputFormatFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
import org.apache.flink.table.types.DataType;
......@@ -31,9 +32,11 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
......@@ -42,22 +45,24 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;
/**
* Hive {@link OutputFormatFactory}, use {@link RecordWriter} to write record.
* Factory for creating {@link RecordWriter} and converters for writing.
*/
public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
public class HiveWriterFactory implements Serializable {
private static final long serialVersionUID = 1L;
......@@ -79,21 +84,25 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
private final boolean isCompressed;
// number of non-partitioning columns
private transient int numNonPartitionColumns;
// SerDe in Hive-1.2.1 and Hive-2.3.4 can be of different classes, make sure to use a common base class
private transient Serializer recordSerDe;
/**
* Field number excluding partition fields.
*/
private transient int formatFields;
// to convert Flink object to Hive object
private transient HiveObjectConversion[] hiveConversions;
private transient DataFormatConverter[] converters;
//StructObjectInspector represents the hive row structure.
private transient StructObjectInspector rowObjectInspector;
private transient StructObjectInspector formatInspector;
private transient boolean inited;
private transient boolean initialized;
public HiveOutputFormatFactory(
public HiveWriterFactory(
JobConf jobConf,
Class hiveOutputFormatClz,
SerDeInfo serDeInfo,
......@@ -102,7 +111,7 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
Properties tableProperties,
HiveShim hiveShim,
boolean isCompressed) {
Preconditions.checkArgument(org.apache.hadoop.hive.ql.io.HiveOutputFormat.class.isAssignableFrom(hiveOutputFormatClz),
Preconditions.checkArgument(HiveOutputFormat.class.isAssignableFrom(hiveOutputFormatClz),
"The output format should be an instance of HiveOutputFormat");
this.confWrapper = new JobConfWrapper(jobConf);
this.hiveOutputFormatClz = hiveOutputFormatClz;
......@@ -115,40 +124,12 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
this.isCompressed = isCompressed;
}
private void init() throws Exception {
JobConf jobConf = confWrapper.conf();
Object serdeLib = Class.forName(serDeInfo.getSerializationLib()).newInstance();
Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
"Expect a SerDe lib implementing both Serializer and Deserializer, but actually got "
+ serdeLib.getClass().getName());
this.recordSerDe = (Serializer) serdeLib;
ReflectionUtils.setConf(recordSerDe, jobConf);
// TODO: support partition properties, for now assume they're same as table properties
SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
this.numNonPartitionColumns = allColumns.length - partitionColumns.length;
this.hiveConversions = new HiveObjectConversion[numNonPartitionColumns];
List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
for (int i = 0; i < numNonPartitionColumns; i++) {
ObjectInspector objectInspector = HiveInspectors.getObjectInspector(allTypes[i]);
objectInspectors.add(objectInspector);
hiveConversions[i] = HiveInspectors.getConversion(objectInspector, allTypes[i].getLogicalType(), hiveShim);
}
this.rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
Arrays.asList(allColumns).subList(0, numNonPartitionColumns),
objectInspectors);
}
@Override
public HiveOutputFormat createOutputFormat(Path outPath) {
/**
* Create a {@link RecordWriter} from path.
*/
public RecordWriter createRecordWriter(Path path) {
try {
if (!inited) {
init();
inited = true;
}
checkInitialize();
JobConf conf = new JobConf(confWrapper.conf());
if (isCompressed) {
......@@ -167,56 +148,82 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
}
}
RecordWriter recordWriter = hiveShim.getHiveRecordWriter(
return hiveShim.getHiveRecordWriter(
conf,
hiveOutputFormatClz,
recordSerDe.getSerializedClass(),
isCompressed,
tableProperties,
HadoopFileSystem.toHadoopPath(outPath));
return new HiveOutputFormat(recordWriter);
path);
} catch (Exception e) {
throw new FlinkHiveException(e);
}
}
private class HiveOutputFormat implements org.apache.flink.api.common.io.OutputFormat<Row> {
private final RecordWriter recordWriter;
public JobConf getJobConf() {
return confWrapper.conf();
}
private HiveOutputFormat(RecordWriter recordWriter) {
this.recordWriter = recordWriter;
private void checkInitialize() throws Exception {
if (initialized) {
return;
}
// converts a Row to a list of Hive objects so that Hive can serialize it
private Object getConvertedRow(Row record) {
List<Object> res = new ArrayList<>(numNonPartitionColumns);
for (int i = 0; i < numNonPartitionColumns; i++) {
res.add(hiveConversions[i].toHiveObject(record.getField(i)));
}
return res;
}
JobConf jobConf = confWrapper.conf();
Object serdeLib = Class.forName(serDeInfo.getSerializationLib()).newInstance();
Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
"Expect a SerDe lib implementing both Serializer and Deserializer, but actually got "
+ serdeLib.getClass().getName());
this.recordSerDe = (Serializer) serdeLib;
ReflectionUtils.setConf(recordSerDe, jobConf);
@Override
public void configure(Configuration parameters) {
}
// TODO: support partition properties, for now assume they're same as table properties
SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
@Override
public void open(int taskNumber, int numTasks) throws IOException {
this.formatFields = allColumns.length - partitionColumns.length;
this.hiveConversions = new HiveObjectConversion[formatFields];
this.converters = new DataFormatConverter[formatFields];
List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
for (int i = 0; i < formatFields; i++) {
DataType type = allTypes[i];
ObjectInspector objectInspector = HiveInspectors.getObjectInspector(type);
objectInspectors.add(objectInspector);
hiveConversions[i] = HiveInspectors.getConversion(
objectInspector, type.getLogicalType(), hiveShim);
converters[i] = DataFormatConverters.getConverterForDataType(type);
}
@Override
public void writeRecord(Row record) throws IOException {
try {
recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
} catch (SerDeException e) {
throw new IOException(e);
this.formatInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
Arrays.asList(allColumns).subList(0, formatFields),
objectInspectors);
this.initialized = true;
}
public Function<Row, Writable> createRowConverter() {
return row -> {
List<Object> fields = new ArrayList<>(formatFields);
for (int i = 0; i < formatFields; i++) {
fields.add(hiveConversions[i].toHiveObject(row.getField(i)));
}
}
return serialize(fields);
};
}
public Function<RowData, Writable> createRowDataConverter() {
return row -> {
List<Object> fields = new ArrayList<>(formatFields);
for (int i = 0; i < formatFields; i++) {
fields.add(hiveConversions[i].toHiveObject(converters[i].toExternal(row, i)));
}
return serialize(fields);
};
}
@Override
public void close() throws IOException {
recordWriter.close(false);
private Writable serialize(List<Object> fields) {
try {
return recordSerDe.serialize(fields, formatInspector);
} catch (SerDeException e) {
throw new FlinkHiveException(e);
}
}
}
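To make the split of responsibilities concrete, a hedged sketch of driving the factory directly (hypothetical path and data, exception handling omitted): createRecordWriter opens the Hive writer for one target file, and createRowDataConverter turns Flink internal rows into Hive Writables.

// Assumes writerFactory was built as in HiveTableSink#consumeDataStream above.
RecordWriter recordWriter = writerFactory.createRecordWriter(
        new org.apache.hadoop.fs.Path("/tmp/staging/part-0"));
Function<RowData, Writable> converter = writerFactory.createRowDataConverter();
recordWriter.write(converter.apply(GenericRowData.of(1, StringData.fromString("a"))));
recordWriter.close(false);          // false: normal close, not an abort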
......@@ -18,8 +18,11 @@
package org.apache.flink.table.catalog.hive.client;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
......@@ -205,4 +208,10 @@ public interface HiveShim extends Serializable {
*/
void createTableWithConstraints(IMetaStoreClient client, Table table, Configuration conf,
UniqueConstraint pk, List<Byte> pkTraits, List<String> notNullCols, List<Byte> nnTraits);
/**
* Create orc {@link BulkWriter.Factory} for different hive versions.
*/
BulkWriter.Factory<RowData> createOrcBulkWriterFactory(
Configuration conf, String schema, LogicalType[] fieldTypes);
}
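The shim indirection exists because the ORC writer classes differ between older and newer Hive versions (see HiveShimV100 and HiveShimV200 below). A hedged sketch of how a caller obtains a version-appropriate factory; the schema string and field types here are hypothetical.

HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
LogicalType[] fieldTypes = {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)};
BulkWriter.Factory<RowData> orcFactory = hiveShim.createOrcBulkWriterFactory(
        new org.apache.hadoop.conf.Configuration(),
        "struct<a:int,b:string>",    // ORC TypeDescription string for the data columns
        fieldTypes);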
......@@ -18,10 +18,14 @@
package org.apache.flink.table.catalog.hive.client;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.orc.nohive.OrcNoHiveBulkWriterFactory;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
......@@ -308,6 +312,12 @@ public class HiveShimV100 implements HiveShim {
throw new UnsupportedOperationException("Table constraints not supported until 2.1.0");
}
@Override
public BulkWriter.Factory<RowData> createOrcBulkWriterFactory(
Configuration conf, String schema, LogicalType[] fieldTypes) {
return new OrcNoHiveBulkWriterFactory(conf, schema, fieldTypes);
}
Optional<Writable> javaToWritable(@Nonnull Object value) {
Writable writable = null;
// in case value is already a Writable
......
......@@ -18,14 +18,21 @@
package org.apache.flink.table.catalog.hive.client;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import java.lang.reflect.Method;
import java.util.Properties;
/**
* Shim for Hive version 2.0.0.
......@@ -47,4 +54,13 @@ public class HiveShimV200 extends HiveShimV122 {
}
}
@Override
public BulkWriter.Factory<RowData> createOrcBulkWriterFactory(
Configuration conf, String schema, LogicalType[] fieldTypes) {
return new OrcBulkWriterFactory<>(
new RowDataVectorizer(schema, fieldTypes),
new Properties(),
conf);
}
}
......@@ -18,6 +18,8 @@
package org.apache.flink.connectors.hive;
import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
import org.apache.flink.connectors.hive.write.HiveWriterFactory;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
......@@ -52,7 +54,7 @@ public class HiveOutputFormatFactoryTest {
public void testCreateOutputFormat() {
TableSchema schema = TableSchema.builder().field("x", DataTypes.INT()).build();
SerDeInfo serDeInfo = new SerDeInfo("name", LazySimpleSerDe.class.getName(), Collections.emptyMap());
HiveOutputFormatFactory factory = new HiveOutputFormatFactory(
HiveWriterFactory writerFactory = new HiveWriterFactory(
new JobConf(),
VerifyURIOutputFormat.class,
serDeInfo, schema,
......@@ -60,6 +62,7 @@ public class HiveOutputFormatFactoryTest {
new Properties(),
HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()),
false);
HiveOutputFormatFactory factory = new HiveOutputFormatFactory(writerFactory);
org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(TEST_URI_SCHEME, TEST_URI_AUTHORITY, "/foo/path");
factory.createOutputFormat(path);
}
......
......@@ -21,12 +21,18 @@ package org.apache.flink.connectors.hive;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
......@@ -43,11 +49,14 @@ import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
......@@ -56,7 +65,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_DELAY;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
import static org.junit.Assert.assertEquals;
/**
......@@ -213,6 +228,95 @@ public class HiveTableSinkTest {
}
}
@Test(timeout = 120000)
public void testPartStreamingWrite() throws Exception {
testStreamingWrite(true, (path) -> {
File basePath = new File(path, "d=2020-05-03");
Assert.assertEquals(5, basePath.list().length);
Assert.assertTrue(new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists());
Assert.assertTrue(new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists());
Assert.assertTrue(new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists());
Assert.assertTrue(new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists());
Assert.assertTrue(new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists());
});
}
@Test(timeout = 120000)
public void testNonPartStreamingWrite() throws Exception {
testStreamingWrite(false, (p) -> {});
}
private void testStreamingWrite(boolean part, Consumer<String> pathConsumer) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);
StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tEnv.useCatalog(hiveCatalog.getName());
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
try {
tEnv.executeSql("create database db1");
tEnv.useDatabase("db1");
// prepare source
List<Row> data = Arrays.asList(
Row.of(1, "a", "b", "2020-05-03", "7"),
Row.of(2, "p", "q", "2020-05-03", "8"),
Row.of(3, "x", "y", "2020-05-03", "9"),
Row.of(4, "x", "y", "2020-05-03", "10"),
Row.of(5, "x", "y", "2020-05-03", "11"));
DataStream<Row> stream = env.addSource(
new FiniteTestSource<>(data),
new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING));
tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"));
// DDL
tEnv.executeSql("create external table sink_table (a int,b string,c string" +
(part ? "" : ",d string,e string") +
") " +
(part ? "partitioned by (d string,e string) " : "") +
" stored as parquet TBLPROPERTIES (" +
"'" + PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key() + "'='$d $e:00:00'," +
"'" + SINK_PARTITION_COMMIT_DELAY.key() + "'='1h'," +
"'" + SINK_PARTITION_COMMIT_POLICY_KIND.key() + "'='metastore,success-file'," +
"'" + SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key() + "'='_MY_SUCCESS'" +
")");
TableEnvUtil.execInsertTableAndWaitResult(
tEnv.sqlQuery("select * from my_table"),
"sink_table");
// using batch table env to query.
List<String> results = new ArrayList<>();
TableEnvironment batchTEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
batchTEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
batchTEnv.useCatalog(hiveCatalog.getName());
batchTEnv.executeSql("select * from db1.sink_table").collect()
.forEachRemaining(r -> results.add(r.toString()));
results.sort(String::compareTo);
Assert.assertEquals(
Arrays.asList(
"1,a,b,2020-05-03,7",
"1,a,b,2020-05-03,7",
"2,p,q,2020-05-03,8",
"2,p,q,2020-05-03,8",
"3,x,y,2020-05-03,9",
"3,x,y,2020-05-03,9",
"4,x,y,2020-05-03,10",
"4,x,y,2020-05-03,10",
"5,x,y,2020-05-03,11",
"5,x,y,2020-05-03,11"),
results);
pathConsumer.accept(URI.create(hiveCatalog.getHiveTable(
ObjectPath.fromString("db1.sink_table")).getSd().getLocation()).getPath());
} finally {
tEnv.executeSql("drop database db1 cascade");
}
}
private RowTypeInfo createHiveDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception {
CatalogTable catalogTable = createHiveCatalogTable(tableSchema, numPartCols);
hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.orc.nohive;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.orc.writer.PhysicalWriterImpl;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.WriterImpl;
import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Properties;
/**
* A {@link BulkWriter.Factory} based on the no-hive variant of ORC.
*/
public class OrcNoHiveBulkWriterFactory implements BulkWriter.Factory<RowData> {
private final Configuration conf;
private final String schema;
private final LogicalType[] fieldTypes;
public OrcNoHiveBulkWriterFactory(Configuration conf, String schema, LogicalType[] fieldTypes) {
this.conf = conf;
this.schema = schema;
this.fieldTypes = fieldTypes;
}
@Override
public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException {
OrcFile.WriterOptions opts = OrcFile.writerOptions(new Properties(), conf);
TypeDescription description = TypeDescription.fromString(schema);
opts.setSchema(description);
opts.physicalWriter(new PhysicalWriterImpl(out, opts));
WriterImpl writer = new WriterImpl(null, new Path("."), opts);
VectorizedRowBatch rowBatch = description.createRowBatch();
return new BulkWriter<RowData>() {
@Override
public void addElement(RowData row) throws IOException {
int rowId = rowBatch.size++;
for (int i = 0; i < row.getArity(); ++i) {
setColumn(rowId, rowBatch.cols[i], fieldTypes[i], row, i);
}
if (rowBatch.size == rowBatch.getMaxSize()) {
writer.addRowBatch(rowBatch);
rowBatch.reset();
}
}
@Override
public void flush() throws IOException {
if (rowBatch.size != 0) {
writer.addRowBatch(rowBatch);
rowBatch.reset();
}
}
@Override
public void finish() throws IOException {
flush();
writer.close();
}
};
}
private static void setColumn(
int rowId, ColumnVector column, LogicalType type, RowData row, int columnId) {
if (row.isNullAt(columnId)) {
column.noNulls = false;
column.isNull[rowId] = true;
return;
}
switch (type.getTypeRoot()) {
case CHAR:
case VARCHAR: {
BytesColumnVector vector = (BytesColumnVector) column;
byte[] bytes = row.getString(columnId).toBytes();
vector.setVal(rowId, bytes, 0, bytes.length);
break;
}
case BOOLEAN: {
LongColumnVector vector = (LongColumnVector) column;
vector.vector[rowId] =
row.getBoolean(columnId) ? 1 : 0;
break;
}
case BINARY:
case VARBINARY: {
BytesColumnVector vector = (BytesColumnVector) column;
byte[] bytes = row.getBinary(columnId);
vector.setVal(rowId, bytes, 0, bytes.length);
break;
}
case DECIMAL: {
DecimalType dt = (DecimalType) type;
DecimalColumnVector vector = (DecimalColumnVector) column;
vector.set(rowId, HiveDecimal.create(
row.getDecimal(columnId, dt.getPrecision(), dt.getScale()).toBigDecimal()));
break;
}
case TINYINT: {
LongColumnVector vector = (LongColumnVector) column;
vector.vector[rowId] = row.getByte(columnId);
break;
}
case SMALLINT: {
LongColumnVector vector = (LongColumnVector) column;
vector.vector[rowId] = row.getShort(columnId);
break;
}
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case INTEGER: {
LongColumnVector vector = (LongColumnVector) column;
vector.vector[rowId] = row.getInt(columnId);
break;
}
case BIGINT: {
LongColumnVector vector = (LongColumnVector) column;
vector.vector[rowId] = row.getLong(columnId);
break;
}
case FLOAT: {
DoubleColumnVector vector = (DoubleColumnVector) column;
vector.vector[rowId] = row.getFloat(columnId);
break;
}
case DOUBLE: {
DoubleColumnVector vector = (DoubleColumnVector) column;
vector.vector[rowId] = row.getDouble(columnId);
break;
}
case TIMESTAMP_WITHOUT_TIME_ZONE: {
TimestampType tt = (TimestampType) type;
Timestamp timestamp = row.getTimestamp(columnId, tt.getPrecision()).toTimestamp();
TimestampColumnVector vector = (TimestampColumnVector) column;
vector.set(rowId, timestamp);
break;
}
case TIMESTAMP_WITH_LOCAL_TIME_ZONE: {
LocalZonedTimestampType lt = (LocalZonedTimestampType) type;
Timestamp timestamp = row.getTimestamp(columnId, lt.getPrecision()).toTimestamp();
TimestampColumnVector vector = (TimestampColumnVector) column;
vector.set(rowId, timestamp);
break;
}
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
}
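A hedged, self-contained sketch of using this factory outside the StreamingFileSink (local path and schema are hypothetical, exception handling omitted); GenericRowData, StringData and org.apache.flink.core.fs.FileSystem are assumed imported.

LogicalType[] fieldTypes = {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)};
OrcNoHiveBulkWriterFactory factory = new OrcNoHiveBulkWriterFactory(
        new Configuration(), "struct<a:int,b:string>", fieldTypes);
FSDataOutputStream out = FileSystem.getLocalFileSystem().create(
        new org.apache.flink.core.fs.Path("/tmp/test.orc"), FileSystem.WriteMode.OVERWRITE);
BulkWriter<RowData> writer = factory.create(out);
writer.addElement(GenericRowData.of(1, StringData.fromString("a")));
writer.finish();                    // flushes the last batch and closes the ORC writer
out.close();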
......@@ -134,7 +134,7 @@ public class OrcSplitReaderUtil {
/**
* See {@code org.apache.flink.table.catalog.hive.util.HiveTypeUtil}.
*/
static TypeDescription logicalTypeToOrcType(LogicalType type) {
public static TypeDescription logicalTypeToOrcType(LogicalType type) {
type = type.copy(true);
switch (type.getTypeRoot()) {
case CHAR:
......
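Making logicalTypeToOrcType public lets HiveTableSink derive the ORC schema string from the Flink row type. A small hedged example of the mapping, with hypothetical field names:

RowType formatType = RowType.of(
        new LogicalType[] {new IntType(), new VarCharType(10)},
        new String[] {"a", "b"});
TypeDescription orcType = OrcSplitReaderUtil.logicalTypeToOrcType(formatType);
// orcType.toString() is expected to be an ORC struct schema along the lines of
// "struct<a:int,b:varchar(10)>".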
......@@ -80,7 +80,7 @@ public class PhysicalWriterImpl implements PhysicalWriter {
private int metadataLength;
private int footerLength;
PhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts) throws IOException {
public PhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts) throws IOException {
if (opts.isEnforceBufferSize()) {
this.bufferSize = opts.getBufferSize();
} else {
......
......@@ -399,11 +399,11 @@ public class FileSystemTableSink implements
/**
* Table bucket assigner, wrap {@link PartitionComputer}.
*/
private static class TableBucketAssigner implements BucketAssigner<RowData, String> {
public static class TableBucketAssigner implements BucketAssigner<RowData, String> {
private final PartitionComputer<RowData> computer;
private TableBucketAssigner(PartitionComputer<RowData> computer) {
public TableBucketAssigner(PartitionComputer<RowData> computer) {
this.computer = computer;
}
......@@ -426,13 +426,13 @@ public class FileSystemTableSink implements
/**
* Table {@link RollingPolicy}, it extends {@link CheckpointRollingPolicy} for bulk writers.
*/
private static class TableRollingPolicy extends CheckpointRollingPolicy<RowData, String> {
public static class TableRollingPolicy extends CheckpointRollingPolicy<RowData, String> {
private final boolean rollOnCheckpoint;
private final long rollingFileSize;
private final long rollingTimeInterval;
private TableRollingPolicy(
public TableRollingPolicy(
boolean rollOnCheckpoint,
long rollingFileSize,
long rollingTimeInterval) {
......@@ -483,12 +483,15 @@ public class FileSystemTableSink implements
}
}
private static class ProjectionBulkFactory implements BulkWriter.Factory<RowData> {
/**
* Project row to non-partition fields.
*/
public static class ProjectionBulkFactory implements BulkWriter.Factory<RowData> {
private final BulkWriter.Factory<RowData> factory;
private final RowDataPartitionComputer computer;
private ProjectionBulkFactory(BulkWriter.Factory<RowData> factory, RowDataPartitionComputer computer) {
public ProjectionBulkFactory(BulkWriter.Factory<RowData> factory, RowDataPartitionComputer computer) {
this.factory = factory;
this.computer = computer;
}
......