Commit 547c168a authored by J JingsongLi

[FLINK-14255][hive] Integrate hive to streaming file sink

This closes #12168
Parent f850ec78
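
In short: HiveTableSink can now consume an unbounded stream by building on the streaming file sink, rolling files on checkpoints and committing finished partitions to the Hive metastore. A minimal usage sketch, modeled on the streaming-write test added in this commit; the catalog name, Hive conf dir, Hive version, and source table are illustrative assumptions, and the table option keys are the string forms of the FileSystemOptions constants the test imports:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveStreamingSinkSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Partition commit is driven by checkpoints, so checkpointing must be enabled.
        env.enableCheckpointing(100);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Catalog name, conf dir and Hive version are assumptions for this sketch.
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/opt/hive-conf", "2.3.4");
        tEnv.registerCatalog("myhive", hiveCatalog);
        tEnv.useCatalog("myhive");
        // Hive dialect for the DDL, mirroring the test in this commit.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql("create table sink_table (a int, b string) " +
                "partitioned by (d string, e string) tblproperties (" +
                "'partition.time-extractor.timestamp-pattern'='$d $e:00:00'," +
                "'sink.partition-commit.delay'='1h'," +
                "'sink.partition-commit.policy.kind'='metastore,success-file')");
        // Back to the default dialect for the DML; "my_source" is an assumed unbounded table.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("insert into sink_table select a, b, d, e from my_source");
    }
}
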
......@@ -164,6 +164,12 @@ under the License.
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-bulk_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<!-- format dependencies -->
<dependency>
......@@ -890,6 +896,7 @@ under the License.
<include>org.apache.flink:flink-orc-nohive_${scala.binary.version}</include>
<include>org.apache.flink:flink-hadoop-compatibility_${scala.binary.version}</include>
<include>org.apache.flink:flink-parquet_${scala.binary.version}</include>
<include>org.apache.flink:flink-hadoop-bulk_${scala.binary.version}</include>
<include>org.apache.parquet:parquet-hadoop</include>
<include>org.apache.parquet:parquet-format</include>
<include>org.apache.parquet:parquet-column</include>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.hive;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.filesystem.RowDataPartitionComputer;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import java.util.Arrays;
import java.util.LinkedHashMap;
/**
* A {@link RowDataPartitionComputer} that converts Flink objects to Hive objects before computing the partition value strings.
*/
public class HiveRowDataPartitionComputer extends RowDataPartitionComputer {
private final DataFormatConverters.DataFormatConverter[] partitionConverters;
private final HiveObjectConversion[] hiveObjectConversions;
public HiveRowDataPartitionComputer(
HiveShim hiveShim,
String defaultPartValue,
String[] columnNames,
DataType[] columnTypes,
String[] partitionColumns) {
super(defaultPartValue, columnNames, columnTypes, partitionColumns);
this.partitionConverters = Arrays.stream(partitionTypes)
.map(TypeConversions::fromLogicalToDataType)
.map(DataFormatConverters::getConverterForDataType)
.toArray(DataFormatConverters.DataFormatConverter[]::new);
this.hiveObjectConversions = new HiveObjectConversion[partitionIndexes.length];
for (int i = 0; i < hiveObjectConversions.length; i++) {
DataType partColType = columnTypes[partitionIndexes[i]];
ObjectInspector objectInspector = HiveInspectors.getObjectInspector(partColType);
hiveObjectConversions[i] = HiveInspectors.getConversion(objectInspector, partColType.getLogicalType(), hiveShim);
}
}
@Override
public LinkedHashMap<String, String> generatePartValues(RowData in) {
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
for (int i = 0; i < partitionIndexes.length; i++) {
Object field = partitionConverters[i].toExternal(in, partitionIndexes[i]);
String partitionValue = field != null ? hiveObjectConversions[i].toHiveObject(field).toString() : null;
if (StringUtils.isEmpty(partitionValue)) {
partitionValue = defaultPartValue;
}
partSpec.put(partitionColumns[i], partitionValue);
}
return partSpec;
}
}
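
A hedged sketch of the computer in action; the schema (a INT, d STRING, e INT, partitioned by d and e), the row values, and the default partition value are illustrative, not taken from this commit:

import org.apache.flink.connectors.hive.HiveRowDataPartitionComputer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;

import java.util.LinkedHashMap;

class PartitionComputerSketch {
    static LinkedHashMap<String, String> example() {
        HiveRowDataPartitionComputer computer = new HiveRowDataPartitionComputer(
                HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()),
                "__HIVE_DEFAULT_PARTITION__",   // assumed default partition value
                new String[] {"a", "d", "e"},
                new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()},
                new String[] {"d", "e"});
        // Returns a LinkedHashMap in partition-column order: {d=2020-05-03, e=7}.
        // A null or empty partition value falls back to the default value above.
        return computer.generatePartValues(
                GenericRowData.of(1, StringData.fromString("2020-05-03"), 7));
    }
}
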
......@@ -20,13 +20,11 @@ package org.apache.flink.connectors.hive;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sinks.OutputFormatTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
......@@ -88,16 +86,13 @@ public class HiveTableFactory
boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
if (!isGeneric) {
return createOutputFormatTableSink(context.getObjectIdentifier().toObjectPath(), table);
return new HiveTableSink(
context.isBounded(),
new JobConf(hiveConf),
context.getObjectIdentifier(),
table);
} else {
return TableFactoryUtil.findAndCreateTableSink(context);
}
}
/**
* Creates and configures a {@link org.apache.flink.table.sinks.OutputFormatTableSink} using the given {@link CatalogTable}.
*/
private OutputFormatTableSink<Row> createOutputFormatTableSink(ObjectPath tablePath, CatalogTable table) {
return new HiveTableSink(new JobConf(hiveConf), tablePath, table);
}
}
......@@ -18,12 +18,17 @@
package org.apache.flink.connectors.hive;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connectors.hive.write.HiveBulkWriterFactory;
import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
import org.apache.flink.connectors.hive.write.HiveWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
......@@ -31,8 +36,13 @@ import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.FileSystemOutputFormat;
import org.apache.flink.table.sinks.OutputFormatTableSink;
import org.apache.flink.table.filesystem.FileSystemTableSink;
import org.apache.flink.table.filesystem.FileSystemTableSink.TableBucketAssigner;
import org.apache.flink.table.filesystem.FileSystemTableSink.TableRollingPolicy;
import org.apache.flink.table.filesystem.stream.InactiveBucketListener;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.PartitionableTableSink;
import org.apache.flink.table.sinks.TableSink;
......@@ -58,14 +68,18 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.table.filesystem.FileSystemTableFactory.SINK_ROLLING_POLICY_FILE_SIZE;
import static org.apache.flink.table.filesystem.FileSystemTableFactory.SINK_ROLLING_POLICY_TIME_INTERVAL;
/**
* Table sink to write to Hive tables.
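* For bounded input it writes with a batch {@link FileSystemOutputFormat}; for unbounded input it builds a streaming file sink that rolls files on checkpoints and commits finished partitions to the Hive metastore.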
*/
public class HiveTableSink extends OutputFormatTableSink<Row> implements PartitionableTableSink, OverwritableTableSink {
public class HiveTableSink implements AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink {
private final boolean isBounded;
private final JobConf jobConf;
private final CatalogTable catalogTable;
private final ObjectPath tablePath;
private final ObjectIdentifier identifier;
private final TableSchema tableSchema;
private final String hiveVersion;
private final HiveShim hiveShim;
......@@ -75,9 +89,10 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
private boolean overwrite = false;
private boolean dynamicGrouping = false;
public HiveTableSink(JobConf jobConf, ObjectPath tablePath, CatalogTable table) {
public HiveTableSink(boolean isBounded, JobConf jobConf, ObjectIdentifier identifier, CatalogTable table) {
this.isBounded = isBounded;
this.jobConf = jobConf;
this.tablePath = tablePath;
this.identifier = identifier;
this.catalogTable = table;
hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
"Hive version is not defined");
......@@ -86,31 +101,21 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
}
@Override
public OutputFormat<Row> getOutputFormat() {
String[] partitionColumns = getPartitionFieldNames().toArray(new String[0]);
String dbName = tablePath.getDatabaseName();
String tableName = tablePath.getObjectName();
public final DataStreamSink consumeDataStream(DataStream dataStream) {
String[] partitionColumns = getPartitionKeys().toArray(new String[0]);
String dbName = identifier.getDatabaseName();
String tableName = identifier.getObjectName();
try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(
new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
Table table = client.getTable(dbName, tableName);
StorageDescriptor sd = table.getSd();
HiveTableMetaStoreFactory msFactory = new HiveTableMetaStoreFactory(
jobConf, hiveVersion, dbName, tableName);
FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(new HivePartitionComputer(
hiveShim,
jobConf.get(
HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
partitionColumns));
builder.setDynamicGrouped(dynamicGrouping);
builder.setPartitionColumns(partitionColumns);
builder.setFileSystemFactory(new HadoopFileSystemFactory(jobConf));
Class hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(
Class.forName(sd.getOutputFormat()));
boolean isCompressed = jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
Class hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
builder.setFormatFactory(new HiveOutputFormatFactory(
HiveWriterFactory recordWriterFactory = new HiveWriterFactory(
jobConf,
hiveOutputFormatClz,
sd.getSerdeInfo(),
......@@ -118,20 +123,70 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
partitionColumns,
HiveReflectionUtils.getTableMetadata(hiveShim, table),
hiveShim,
isCompressed));
builder.setMetaStoreFactory(
new HiveTableMetaStoreFactory(jobConf, hiveVersion, dbName, tableName));
builder.setOverwrite(overwrite);
builder.setStaticPartitions(staticPartitionSpec);
builder.setTempPath(new org.apache.flink.core.fs.Path(
toStagingDir(sd.getLocation(), jobConf)));
String extension = Utilities.getFileExtension(jobConf, isCompressed,
(HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
extension = extension == null ? "" : extension;
OutputFileConfig outputFileConfig = new OutputFileConfig("", extension);
builder.setOutputFileConfig(outputFileConfig);
return builder.build();
isCompressed);
if (isBounded) {
FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(new HivePartitionComputer(
hiveShim,
jobConf.get(
HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
partitionColumns));
builder.setDynamicGrouped(dynamicGrouping);
builder.setPartitionColumns(partitionColumns);
builder.setFileSystemFactory(new HadoopFileSystemFactory(jobConf));
builder.setFormatFactory(new HiveOutputFormatFactory(recordWriterFactory));
builder.setMetaStoreFactory(
msFactory);
builder.setOverwrite(overwrite);
builder.setStaticPartitions(staticPartitionSpec);
builder.setTempPath(new org.apache.flink.core.fs.Path(
toStagingDir(sd.getLocation(), jobConf)));
String extension = Utilities.getFileExtension(jobConf, isCompressed,
(HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
extension = extension == null ? "" : extension;
OutputFileConfig outputFileConfig = new OutputFileConfig("", extension);
builder.setOutputFileConfig(outputFileConfig);
return dataStream
.writeUsingOutputFormat(builder.build())
.setParallelism(dataStream.getParallelism());
} else {
org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
catalogTable.getOptions().forEach(conf::setString);
HiveRowDataPartitionComputer partComputer = new HiveRowDataPartitionComputer(
hiveShim,
jobConf.get(
HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
partitionColumns);
TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
TableRollingPolicy rollingPolicy = new TableRollingPolicy(
true,
conf.get(SINK_ROLLING_POLICY_FILE_SIZE),
conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL));
InactiveBucketListener listener = new InactiveBucketListener();
HiveBulkWriterFactory bulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
HadoopPathBasedBulkFormatBuilder<RowData, String, ?> builder =
new HadoopPathBasedBulkFormatBuilder<>(
new Path(sd.getLocation()), bulkFactory, jobConf, assigner)
.withRollingPolicy(rollingPolicy)
.withBucketLifeCycleListener(listener);
return FileSystemTableSink.createStreamingSink(
conf,
new org.apache.flink.core.fs.Path(sd.getLocation()),
getPartitionKeys(),
identifier,
overwrite,
dataStream,
builder,
listener,
msFactory);
}
} catch (TException e) {
throw new CatalogException("Failed to query Hive metaStore", e);
} catch (IOException e) {
......@@ -143,14 +198,10 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
}
}
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return new HiveTableSink(jobConf, tablePath, catalogTable);
}
@Override
public DataType getConsumedDataType() {
return getTableSchema().toRowDataType();
DataType dataType = getTableSchema().toRowDataType();
return isBounded ? dataType : dataType.bridgedTo(RowData.class);
}
@Override
......@@ -158,6 +209,11 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
return tableSchema;
}
@Override
public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) {
return this;
}
@Override
public boolean configurePartitionGrouping(boolean supportsGrouping) {
this.dynamicGrouping = supportsGrouping;
......@@ -179,7 +235,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
return res;
}
private List<String> getPartitionFieldNames() {
private List<String> getPartitionKeys() {
return catalogTable.getPartitionKeys();
}
......@@ -187,7 +243,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
public void setStaticPartition(Map<String, String> partitionSpec) {
// make it a LinkedHashMap to maintain partition column order
staticPartitionSpec = new LinkedHashMap<>();
for (String partitionCol : getPartitionFieldNames()) {
for (String partitionCol : getPartitionKeys()) {
if (partitionSpec.containsKey(partitionCol)) {
staticPartitionSpec.put(partitionCol, partitionSpec.get(partitionCol));
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.hive.write;
import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.io.Writable;
import java.io.IOException;
import java.util.function.Function;
/**
* Hive bulk writer factory for a path-based bulk file writer that writes to a specific Hadoop path.
*/
public class HiveBulkWriterFactory implements HadoopPathBasedBulkWriter.Factory<RowData> {
private static final long serialVersionUID = 1L;
private final HiveWriterFactory factory;
public HiveBulkWriterFactory(HiveWriterFactory factory) {
this.factory = factory;
}
@Override
public HadoopPathBasedBulkWriter<RowData> create(Path targetPath, Path inProgressPath) throws IOException {
FileSinkOperator.RecordWriter recordWriter = factory.createRecordWriter(inProgressPath);
Function<RowData, Writable> rowConverter = factory.createRowDataConverter();
FileSystem fs = FileSystem.get(inProgressPath.toUri(), factory.getJobConf());
return new HadoopPathBasedBulkWriter<RowData>() {
@Override
public long getSize() throws IOException {
return fs.getFileStatus(inProgressPath).getLen();
}
@Override
public void dispose() {
// close silently.
try {
recordWriter.close(true);
} catch (IOException ignored) {
}
}
@Override
public void addElement(RowData element) throws IOException {
recordWriter.write(rowConverter.apply(element));
}
@Override
public void flush() {
}
@Override
public void finish() throws IOException {
recordWriter.close(false);
}
};
}
}
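
For context, a short sketch of how this factory is wired into the path-based streaming sink, mirroring the unbounded branch of HiveTableSink above; recordWriterFactory, jobConf, assigner, rollingPolicy and tableLocation are assumed to be in scope:

HiveBulkWriterFactory bulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
HadoopPathBasedBulkFormatBuilder<RowData, String, ?> builder =
        new HadoopPathBasedBulkFormatBuilder<>(
                new Path(tableLocation), bulkFactory, jobConf, assigner)
                .withRollingPolicy(rollingPolicy);
// getSize() reports the in-progress file length, so a size-based rolling policy can
// roll the file without the writer having to track written bytes itself.
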
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.hive.write;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.table.filesystem.OutputFormatFactory;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.Writable;
import java.io.IOException;
import java.util.function.Function;
/**
* Hive {@link OutputFormatFactory} that uses a {@link RecordWriter} to write records.
*/
public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
private static final long serialVersionUID = 2L;
private final HiveWriterFactory factory;
public HiveOutputFormatFactory(HiveWriterFactory factory) {
this.factory = factory;
}
@Override
public HiveOutputFormat createOutputFormat(Path path) {
return new HiveOutputFormat(
factory.createRecordWriter(HadoopFileSystem.toHadoopPath(path)),
factory.createRowConverter());
}
private class HiveOutputFormat implements org.apache.flink.api.common.io.OutputFormat<Row> {
private final RecordWriter recordWriter;
private final Function<Row, Writable> rowConverter;
private HiveOutputFormat(RecordWriter recordWriter, Function<Row, Writable> rowConverter) {
this.recordWriter = recordWriter;
this.rowConverter = rowConverter;
}
@Override
public void configure(Configuration parameters) {
}
@Override
public void open(int taskNumber, int numTasks) {
}
@Override
public void writeRecord(Row record) throws IOException {
recordWriter.write(rowConverter.apply(record));
}
@Override
public void close() throws IOException {
recordWriter.close(false);
}
}
}
......@@ -16,14 +16,15 @@
* limitations under the License.
*/
package org.apache.flink.connectors.hive;
package org.apache.flink.connectors.hive.write;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.filesystem.OutputFormatFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
import org.apache.flink.table.types.DataType;
......@@ -31,9 +32,11 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
......@@ -42,22 +45,24 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;
/**
* Hive {@link OutputFormatFactory}, use {@link RecordWriter} to write record.
* Factory for creating {@link RecordWriter} and converters for writing.
*/
public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
public class HiveWriterFactory implements Serializable {
private static final long serialVersionUID = 1L;
......@@ -79,21 +84,25 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
private final boolean isCompressed;
// number of non-partitioning columns
private transient int numNonPartitionColumns;
// SerDe in Hive-1.2.1 and Hive-2.3.4 can be of different classes, make sure to use a common base class
private transient Serializer recordSerDe;
/**
* Field number excluding partition fields.
*/
private transient int formatFields;
// to convert Flink object to Hive object
private transient HiveObjectConversion[] hiveConversions;
private transient DataFormatConverter[] converters;
//StructObjectInspector represents the hive row structure.
private transient StructObjectInspector rowObjectInspector;
private transient StructObjectInspector formatInspector;
private transient boolean inited;
private transient boolean initialized;
public HiveOutputFormatFactory(
public HiveWriterFactory(
JobConf jobConf,
Class hiveOutputFormatClz,
SerDeInfo serDeInfo,
......@@ -102,7 +111,7 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
Properties tableProperties,
HiveShim hiveShim,
boolean isCompressed) {
Preconditions.checkArgument(org.apache.hadoop.hive.ql.io.HiveOutputFormat.class.isAssignableFrom(hiveOutputFormatClz),
Preconditions.checkArgument(HiveOutputFormat.class.isAssignableFrom(hiveOutputFormatClz),
"The output format should be an instance of HiveOutputFormat");
this.confWrapper = new JobConfWrapper(jobConf);
this.hiveOutputFormatClz = hiveOutputFormatClz;
......@@ -115,40 +124,12 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
this.isCompressed = isCompressed;
}
private void init() throws Exception {
JobConf jobConf = confWrapper.conf();
Object serdeLib = Class.forName(serDeInfo.getSerializationLib()).newInstance();
Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
"Expect a SerDe lib implementing both Serializer and Deserializer, but actually got "
+ serdeLib.getClass().getName());
this.recordSerDe = (Serializer) serdeLib;
ReflectionUtils.setConf(recordSerDe, jobConf);
// TODO: support partition properties, for now assume they're same as table properties
SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
this.numNonPartitionColumns = allColumns.length - partitionColumns.length;
this.hiveConversions = new HiveObjectConversion[numNonPartitionColumns];
List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
for (int i = 0; i < numNonPartitionColumns; i++) {
ObjectInspector objectInspector = HiveInspectors.getObjectInspector(allTypes[i]);
objectInspectors.add(objectInspector);
hiveConversions[i] = HiveInspectors.getConversion(objectInspector, allTypes[i].getLogicalType(), hiveShim);
}
this.rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
Arrays.asList(allColumns).subList(0, numNonPartitionColumns),
objectInspectors);
}
@Override
public HiveOutputFormat createOutputFormat(Path outPath) {
/**
* Creates a {@link RecordWriter} for the given path.
*/
public RecordWriter createRecordWriter(Path path) {
try {
if (!inited) {
init();
inited = true;
}
checkInitialize();
JobConf conf = new JobConf(confWrapper.conf());
if (isCompressed) {
......@@ -167,56 +148,82 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
}
}
RecordWriter recordWriter = hiveShim.getHiveRecordWriter(
return hiveShim.getHiveRecordWriter(
conf,
hiveOutputFormatClz,
recordSerDe.getSerializedClass(),
isCompressed,
tableProperties,
HadoopFileSystem.toHadoopPath(outPath));
return new HiveOutputFormat(recordWriter);
path);
} catch (Exception e) {
throw new FlinkHiveException(e);
}
}
private class HiveOutputFormat implements org.apache.flink.api.common.io.OutputFormat<Row> {
private final RecordWriter recordWriter;
public JobConf getJobConf() {
return confWrapper.conf();
}
private HiveOutputFormat(RecordWriter recordWriter) {
this.recordWriter = recordWriter;
private void checkInitialize() throws Exception {
if (initialized) {
return;
}
// converts a Row to a list of Hive objects so that Hive can serialize it
private Object getConvertedRow(Row record) {
List<Object> res = new ArrayList<>(numNonPartitionColumns);
for (int i = 0; i < numNonPartitionColumns; i++) {
res.add(hiveConversions[i].toHiveObject(record.getField(i)));
}
return res;
}
JobConf jobConf = confWrapper.conf();
Object serdeLib = Class.forName(serDeInfo.getSerializationLib()).newInstance();
Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
"Expect a SerDe lib implementing both Serializer and Deserializer, but actually got "
+ serdeLib.getClass().getName());
this.recordSerDe = (Serializer) serdeLib;
ReflectionUtils.setConf(recordSerDe, jobConf);
@Override
public void configure(Configuration parameters) {
}
// TODO: support partition properties, for now assume they're same as table properties
SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
@Override
public void open(int taskNumber, int numTasks) throws IOException {
this.formatFields = allColumns.length - partitionColumns.length;
this.hiveConversions = new HiveObjectConversion[formatFields];
this.converters = new DataFormatConverter[formatFields];
List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
for (int i = 0; i < formatFields; i++) {
DataType type = allTypes[i];
ObjectInspector objectInspector = HiveInspectors.getObjectInspector(type);
objectInspectors.add(objectInspector);
hiveConversions[i] = HiveInspectors.getConversion(
objectInspector, type.getLogicalType(), hiveShim);
converters[i] = DataFormatConverters.getConverterForDataType(type);
}
@Override
public void writeRecord(Row record) throws IOException {
try {
recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
} catch (SerDeException e) {
throw new IOException(e);
this.formatInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
Arrays.asList(allColumns).subList(0, formatFields),
objectInspectors);
this.initialized = true;
}
public Function<Row, Writable> createRowConverter() {
return row -> {
List<Object> fields = new ArrayList<>(formatFields);
for (int i = 0; i < formatFields; i++) {
fields.add(hiveConversions[i].toHiveObject(row.getField(i)));
}
}
return serialize(fields);
};
}
public Function<RowData, Writable> createRowDataConverter() {
return row -> {
List<Object> fields = new ArrayList<>(formatFields);
for (int i = 0; i < formatFields; i++) {
fields.add(hiveConversions[i].toHiveObject(converters[i].toExternal(row, i)));
}
return serialize(fields);
};
}
@Override
public void close() throws IOException {
recordWriter.close(false);
private Writable serialize(List<Object> fields) {
try {
return recordSerDe.serialize(fields, formatInspector);
} catch (SerDeException e) {
throw new FlinkHiveException(e);
}
}
}
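
A hedged sketch of using the factory directly, which is the same pattern HiveBulkWriterFactory follows above; writerFactory is assumed to be built as in HiveTableSink, the path and row values are illustrative, GenericRowData/StringData stand in for whatever RowData the pipeline produces, and the row carries only the non-partition columns in schema order:

RecordWriter writer = writerFactory.createRecordWriter(new Path("/tmp/staging/part-0"));
Function<RowData, Writable> converter = writerFactory.createRowDataConverter();
writer.write(converter.apply(GenericRowData.of(1, StringData.fromString("a"))));
writer.close(false); // false = normal close, not an abort
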
......@@ -18,6 +18,8 @@
package org.apache.flink.connectors.hive;
import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
import org.apache.flink.connectors.hive.write.HiveWriterFactory;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
......@@ -52,7 +54,7 @@ public class HiveOutputFormatFactoryTest {
public void testCreateOutputFormat() {
TableSchema schema = TableSchema.builder().field("x", DataTypes.INT()).build();
SerDeInfo serDeInfo = new SerDeInfo("name", LazySimpleSerDe.class.getName(), Collections.emptyMap());
HiveOutputFormatFactory factory = new HiveOutputFormatFactory(
HiveWriterFactory writerFactory = new HiveWriterFactory(
new JobConf(),
VerifyURIOutputFormat.class,
serDeInfo, schema,
......@@ -60,6 +62,7 @@ public class HiveOutputFormatFactoryTest {
new Properties(),
HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()),
false);
HiveOutputFormatFactory factory = new HiveOutputFormatFactory(writerFactory);
org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(TEST_URI_SCHEME, TEST_URI_AUTHORITY, "/foo/path");
factory.createOutputFormat(path);
}
......
......@@ -21,12 +21,18 @@ package org.apache.flink.connectors.hive;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
......@@ -43,11 +49,14 @@ import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
......@@ -56,7 +65,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_DELAY;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
import static org.junit.Assert.assertEquals;
/**
......@@ -213,6 +228,95 @@ public class HiveTableSinkTest {
}
}
@Test(timeout = 120000)
public void testPartStreamingWrite() throws Exception {
testStreamingWrite(true, (path) -> {
File basePath = new File(path, "d=2020-05-03");
Assert.assertEquals(5, basePath.list().length);
Assert.assertTrue(new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists());
Assert.assertTrue(new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists());
Assert.assertTrue(new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists());
Assert.assertTrue(new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists());
Assert.assertTrue(new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists());
});
}
@Test(timeout = 120000)
public void testNonPartStreamingWrite() throws Exception {
testStreamingWrite(false, (p) -> {});
}
private void testStreamingWrite(boolean part, Consumer<String> pathConsumer) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);
StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tEnv.useCatalog(hiveCatalog.getName());
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
try {
tEnv.executeSql("create database db1");
tEnv.useDatabase("db1");
// prepare source
List<Row> data = Arrays.asList(
Row.of(1, "a", "b", "2020-05-03", "7"),
Row.of(2, "p", "q", "2020-05-03", "8"),
Row.of(3, "x", "y", "2020-05-03", "9"),
Row.of(4, "x", "y", "2020-05-03", "10"),
Row.of(5, "x", "y", "2020-05-03", "11"));
DataStream<Row> stream = env.addSource(
new FiniteTestSource<>(data),
new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING));
tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"));
// DDL
tEnv.executeSql("create external table sink_table (a int,b string,c string" +
(part ? "" : ",d string,e string") +
") " +
(part ? "partitioned by (d string,e string) " : "") +
"TBLPROPERTIES (" +
"'" + PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key() + "'='$d $e:00:00'," +
"'" + SINK_PARTITION_COMMIT_DELAY.key() + "'='1h'," +
"'" + SINK_PARTITION_COMMIT_POLICY_KIND.key() + "'='metastore,success-file'," +
"'" + SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key() + "'='_MY_SUCCESS'" +
")");
TableEnvUtil.execInsertTableAndWaitResult(
tEnv.sqlQuery("select * from my_table"),
"sink_table");
// using batch table env to query.
List<String> results = new ArrayList<>();
TableEnvironment batchTEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
batchTEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
batchTEnv.useCatalog(hiveCatalog.getName());
batchTEnv.executeSql("select * from db1.sink_table").collect()
.forEachRemaining(r -> results.add(r.toString()));
results.sort(String::compareTo);
Assert.assertEquals(
Arrays.asList(
"1,a,b,2020-05-03,7",
"1,a,b,2020-05-03,7",
"2,p,q,2020-05-03,8",
"2,p,q,2020-05-03,8",
"3,x,y,2020-05-03,9",
"3,x,y,2020-05-03,9",
"4,x,y,2020-05-03,10",
"4,x,y,2020-05-03,10",
"5,x,y,2020-05-03,11",
"5,x,y,2020-05-03,11"),
results);
pathConsumer.accept(URI.create(hiveCatalog.getHiveTable(
ObjectPath.fromString("db1.sink_table")).getSd().getLocation()).getPath());
} finally {
tEnv.executeSql("drop database db1 cascade");
}
}
private RowTypeInfo createHiveDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception {
CatalogTable catalogTable = createHiveCatalogTable(tableSchema, numPartCols);
hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false);
......
......@@ -65,7 +65,7 @@ public interface HadoopPathBasedBulkWriter<T> extends BulkWriter<T> {
* @param inProgressFilePath The intermediate path to write to before committing.
* @return The created writer.
*/
HadoopPathBasedBulkWriter<T> create(Path targetFilePath, Path inProgressFilePath);
HadoopPathBasedBulkWriter<T> create(Path targetFilePath, Path inProgressFilePath) throws IOException;
}
}
......@@ -98,19 +98,18 @@ public class StreamingFileSink<IN>
private final long bucketCheckInterval;
private final BucketsBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bucketsBuilder;
private final StreamingFileSink.BucketsBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bucketsBuilder;
// --------------------------- runtime fields -----------------------------
private transient StreamingFileSinkHelper<IN> helper;
/**
* Creates a new {@code StreamingFileSink} that writes files to the given base directory
* with the give buckets properties.
* Creates a new {@code StreamingFileSink} that writes files to the given base directory.
*/
protected StreamingFileSink(
BucketsBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bucketsBuilder,
long bucketCheckInterval) {
final BucketsBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bucketsBuilder,
final long bucketCheckInterval) {
Preconditions.checkArgument(bucketCheckInterval > 0L);
......
......@@ -399,11 +399,11 @@ public class FileSystemTableSink implements
/**
* Table bucket assigner that wraps a {@link PartitionComputer}.
*/
private static class TableBucketAssigner implements BucketAssigner<RowData, String> {
public static class TableBucketAssigner implements BucketAssigner<RowData, String> {
private final PartitionComputer<RowData> computer;
private TableBucketAssigner(PartitionComputer<RowData> computer) {
public TableBucketAssigner(PartitionComputer<RowData> computer) {
this.computer = computer;
}
......@@ -426,13 +426,13 @@ public class FileSystemTableSink implements
/**
* Table {@link RollingPolicy} that extends {@link CheckpointRollingPolicy} for bulk writers.
*/
private static class TableRollingPolicy extends CheckpointRollingPolicy<RowData, String> {
public static class TableRollingPolicy extends CheckpointRollingPolicy<RowData, String> {
private final boolean rollOnCheckpoint;
private final long rollingFileSize;
private final long rollingTimeInterval;
private TableRollingPolicy(
public TableRollingPolicy(
boolean rollOnCheckpoint,
long rollingFileSize,
long rollingTimeInterval) {
......