Commit f850ec78 authored by Gao Yun, committed by JingsongLi

[FLINK-17594][fs-connector] Support Hadoop path-based part-file writer

This closes #12134
Parent 339f5d84
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-formats</artifactId>
<version>1.11-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-hadoop-bulk_${scala.binary.version}</artifactId>
<name>flink-hadoop-bulk</name>
<packaging>jar</packaging>
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- hadoop dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- skip dependency convergence due to Hadoop dependency -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<executions>
<execution>
<id>dependency-convergence</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<skip>true</skip>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.hadoop.bulk;
import org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
/**
* The default Hadoop file committer factory, which always uses {@link HadoopRenameFileCommitter}.
*/
public class DefaultHadoopFileCommitterFactory implements HadoopFileCommitterFactory {
private static final long serialVersionUID = 1L;
@Override
public HadoopFileCommitter create(Configuration configuration, Path targetFilePath) {
return new HadoopRenameFileCommitter(configuration, targetFilePath);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.hadoop.bulk;
import org.apache.flink.annotation.Internal;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
/**
* The committer publishes an intermediate Hadoop file to the target path after
* it finishes writing.
*/
@Internal
public interface HadoopFileCommitter {
/**
* Gets the target path to commit to.
*
* @return The target path to commit to.
*/
Path getTargetFilePath();
/**
* Gets the path of the intermediate file to commit.
*
* @return The path of the intermediate file to commit.
*/
Path getInProgressFilePath();
/**
* Prepares the intermediate file for committing.
*/
void preCommit() throws IOException;
/**
* Commits the in-progress file to the target path.
*/
void commit() throws IOException;
/**
* Re-commits the in-progress file to the target path after fail-over.
*/
void commitAfterRecovery() throws IOException;
}
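For orientation, a minimal sketch of the intended lifecycle, assuming a committer factory (defined in the next file), a Hadoop configuration and a target path are available — all placeholders here. The real call sequence is driven by HadoopPathBasedPartFileWriter further down in this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import java.io.IOException;

class CommitterLifecycleSketch {
    static void writeAndPublish(HadoopFileCommitterFactory factory, Path targetFilePath) throws IOException {
        // Create a committer for the target path and obtain the intermediate path to write to.
        HadoopFileCommitter committer = factory.create(new Configuration(), targetFilePath);
        Path inProgressFilePath = committer.getInProgressFilePath();
        // ... a HadoopPathBasedBulkWriter writes all records to inProgressFilePath ...
        committer.preCommit();  // finalize the intermediate file
        committer.commit();     // publish it under targetFilePath
        // After a fail-over, commitAfterRecovery() is used instead to re-publish a pending file.
    }
}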
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.hadoop.bulk;
import org.apache.flink.annotation.Internal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.io.Serializable;
/**
* The factory to create the {@link HadoopFileCommitter}.
*/
@Internal
public interface HadoopFileCommitterFactory extends Serializable {
/**
* Creates the corresponding Hadoop file committer according to the Hadoop
* configuration and the target path.
*
* @param configuration The Hadoop configuration.
* @param targetFilePath The target path to commit.
* @return The corresponding Hadoop file committer.
*/
HadoopFileCommitter create(Configuration configuration, Path targetFilePath) throws IOException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.hadoop.bulk;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.io.Serializable;
/**
* Specialized {@link BulkWriter} which is expected to write to the specified
* {@link Path}.
*/
@Internal
public interface HadoopPathBasedBulkWriter<T> extends BulkWriter<T> {
/**
* Gets the size written by the current writer.
*
* @return The size written by the current writer.
*/
long getSize() throws IOException;
/**
* Disposes the writer on failures. Unlike output-stream-based writers, which
* can be handled uniformly by closing the underlying output stream, the
* path-based writers need to be disposed explicitly.
*/
void dispose();
// ------------------------------------------------------------------------
/**
* A factory that creates a {@link HadoopPathBasedBulkWriter}.
*
* @param <T> The type of record to write.
*/
@FunctionalInterface
interface Factory<T> extends Serializable {
/**
* Creates a path-based writer that first writes to the <tt>inProgressFilePath</tt>
* and finally commits to the <tt>targetFilePath</tt>.
*
* @param targetFilePath The final path to commit to.
* @param inProgressFilePath The intermediate path to write to before committing.
* @return The created writer.
*/
HadoopPathBasedBulkWriter<T> create(Path targetFilePath, Path inProgressFilePath);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.hadoop.bulk;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.AbstractPartFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
/**
* The part-file writer that writes to the specified Hadoop path.
*/
public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFileWriter<IN, BucketID> {
private final HadoopPathBasedBulkWriter<IN> writer;
private final HadoopFileCommitter fileCommitter;
public HadoopPathBasedPartFileWriter(
final BucketID bucketID,
HadoopPathBasedBulkWriter<IN> writer,
HadoopFileCommitter fileCommitter,
long createTime) {
super(bucketID, createTime);
this.writer = writer;
this.fileCommitter = fileCommitter;
}
@Override
public void write(IN element, long currentTime) throws IOException {
writer.addElement(element);
markWrite(currentTime);
}
@Override
public InProgressFileRecoverable persist() {
throw new UnsupportedOperationException("The path based writers do not support persisting");
}
@Override
public PendingFileRecoverable closeForCommit() throws IOException {
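// Finish the in-progress file and let the committer prepare it; the final rename to the
// target path happens later, via the returned pending file's commit().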
writer.flush();
writer.finish();
fileCommitter.preCommit();
return new HadoopPathBasedPendingFile(fileCommitter).getRecoverable();
}
@Override
public void dispose() {
writer.dispose();
}
@Override
public long getSize() throws IOException {
return writer.getSize();
}
static class HadoopPathBasedPendingFile implements BucketWriter.PendingFile {
private final HadoopFileCommitter fileCommitter;
public HadoopPathBasedPendingFile(HadoopFileCommitter fileCommitter) {
this.fileCommitter = fileCommitter;
}
@Override
public void commit() throws IOException {
fileCommitter.commit();
}
@Override
public void commitAfterRecovery() throws IOException {
fileCommitter.commitAfterRecovery();
}
public PendingFileRecoverable getRecoverable() {
return new HadoopPathBasedPendingFileRecoverable(
fileCommitter.getTargetFilePath());
}
}
@VisibleForTesting
static class HadoopPathBasedPendingFileRecoverable implements PendingFileRecoverable {
private final Path path;
public HadoopPathBasedPendingFileRecoverable(Path path) {
this.path = path;
}
public Path getPath() {
return path;
}
}
@VisibleForTesting
static class HadoopPathBasedPendingFileRecoverableSerializer
implements SimpleVersionedSerializer<HadoopPathBasedPendingFileRecoverable> {
static final HadoopPathBasedPendingFileRecoverableSerializer INSTANCE =
new HadoopPathBasedPendingFileRecoverableSerializer();
private static final Charset CHARSET = StandardCharsets.UTF_8;
private static final int MAGIC_NUMBER = 0x2c853c90;
@Override
public int getVersion() {
return 1;
}
@Override
public byte[] serialize(HadoopPathBasedPendingFileRecoverable pendingFileRecoverable) throws IOException {
byte[] pathBytes = pendingFileRecoverable.getPath().toUri().toString().getBytes(CHARSET);
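// Layout: 4-byte magic number + 4-byte path length + the UTF-8 path bytes (hence 8 + pathBytes.length).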
byte[] targetBytes = new byte[8 + pathBytes.length];
ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
bb.putInt(MAGIC_NUMBER);
bb.putInt(pathBytes.length);
bb.put(pathBytes);
return targetBytes;
}
@Override
public HadoopPathBasedPendingFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
return deserializeV1(serialized);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}
private HadoopPathBasedPendingFileRecoverable deserializeV1(byte[] serialized) throws IOException {
final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
if (bb.getInt() != MAGIC_NUMBER) {
throw new IOException("Corrupt data: Unexpected magic number.");
}
byte[] pathBytes = new byte[bb.getInt()];
bb.get(pathBytes);
String targetPath = new String(pathBytes, CHARSET);
return new HadoopPathBasedPendingFileRecoverable(new Path(targetPath));
}
}
private static class UnsupportedInProgressFileRecoverableSerializable
implements SimpleVersionedSerializer<InProgressFileRecoverable> {
static final UnsupportedInProgressFileRecoverableSerializable INSTANCE =
new UnsupportedInProgressFileRecoverableSerializable();
@Override
public int getVersion() {
throw new UnsupportedOperationException("Persists the path-based part file write is not supported");
}
@Override
public byte[] serialize(InProgressFileRecoverable obj) {
throw new UnsupportedOperationException("Persists the path-based part file write is not supported");
}
@Override
public InProgressFileRecoverable deserialize(int version, byte[] serialized) {
throw new UnsupportedOperationException("Persists the path-based part file write is not supported");
}
}
/**
* Factory to create {@link HadoopPathBasedPartFileWriter}.
*/
public static class Factory<IN, BucketID> implements BucketWriter<IN, BucketID> {
private final Configuration configuration;
private final HadoopPathBasedBulkWriter.Factory<IN> bulkWriterFactory;
private final HadoopFileCommitterFactory fileCommitterFactory;
public Factory(
Configuration configuration,
HadoopPathBasedBulkWriter.Factory<IN> bulkWriterFactory,
HadoopFileCommitterFactory fileCommitterFactory) {
this.configuration = configuration;
this.bulkWriterFactory = bulkWriterFactory;
this.fileCommitterFactory = fileCommitterFactory;
}
@Override
public HadoopPathBasedPartFileWriter<IN, BucketID> openNewInProgressFile(
BucketID bucketID,
org.apache.flink.core.fs.Path flinkPath,
long creationTime) throws IOException {
Path path = new Path(flinkPath.toUri());
HadoopFileCommitter fileCommitter = fileCommitterFactory.create(configuration, path);
Path inProgressFilePath = fileCommitter.getInProgressFilePath();
HadoopPathBasedBulkWriter<IN> writer = bulkWriterFactory.create(path, inProgressFilePath);
return new HadoopPathBasedPartFileWriter<IN, BucketID>(bucketID, writer, fileCommitter, creationTime);
}
@Override
public PendingFile recoverPendingFile(PendingFileRecoverable pendingFileRecoverable) throws IOException {
if (!(pendingFileRecoverable instanceof HadoopPathBasedPartFileWriter.HadoopPathBasedPendingFileRecoverable)) {
throw new UnsupportedOperationException("Only HadoopPathBasedPendingFileRecoverable is supported.");
}
Path path = new Path(((HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable).getPath().toString());
return new HadoopPathBasedPendingFile(fileCommitterFactory.create(configuration, path));
}
@Override
public WriterProperties getProperties() {
return new WriterProperties(
UnsupportedInProgressFileRecoverableSerializable.INSTANCE,
HadoopPathBasedPendingFileRecoverableSerializer.INSTANCE,
false);
}
@Override
public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
BucketID bucketID,
InProgressFileRecoverable inProgressFileSnapshot,
long creationTime) {
throw new UnsupportedOperationException("Resume is not supported");
}
@Override
public boolean cleanupInProgressFileRecoverable(InProgressFileRecoverable inProgressFileRecoverable) {
return false;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.hadoop.bulk.committer;
import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* The Hadoop file committer that directly renames the in-progress file
* to the target file. For file systems like S3, renaming may lead to
* additional copies.
*/
public class HadoopRenameFileCommitter implements HadoopFileCommitter {
private final Configuration configuration;
private final Path targetFilePath;
private final Path inProgressFilePath;
public HadoopRenameFileCommitter(Configuration configuration, Path targetFilePath) {
this.configuration = configuration;
this.targetFilePath = targetFilePath;
this.inProgressFilePath = generateInProgressFilePath();
}
@Override
public Path getTargetFilePath() {
return targetFilePath;
}
@Override
public Path getInProgressFilePath() {
return inProgressFilePath;
}
@Override
public void preCommit() {
// Do nothing.
}
@Override
public void commit() throws IOException {
rename(false);
}
@Override
public void commitAfterRecovery() throws IOException {
rename(true);
}
private void rename(boolean checkFileExists) throws IOException {
FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration);
if (checkFileExists && !fileSystem.exists(inProgressFilePath)) {
throw new IOException(String.format("In-progress file (%s) does not exist.", inProgressFilePath));
}
try {
// If file exists, it will be overwritten.
fileSystem.rename(inProgressFilePath, targetFilePath);
} catch (IOException e) {
throw new IOException(
String.format("Could not commit file from %s to %s", inProgressFilePath, targetFilePath),
e);
}
}
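/**
 * Derives the intermediate path by prefixing the target file name with a dot and
 * appending the ".inprogress" suffix, e.g. "/data/part-0" becomes "/data/.part-0.inprogress".
 */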
private Path generateInProgressFilePath() {
checkArgument(targetFilePath.isAbsolute(), "Target file must be absolute");
Path parent = targetFilePath.getParent();
String name = targetFilePath.getName();
return new Path(parent, "." + name + ".inprogress");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.hadoop.bulk.DefaultHadoopFileCommitterFactory;
import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitterFactory;
import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import javax.annotation.Nullable;
import java.io.IOException;
/**
* Buckets builder to create buckets that use {@link HadoopPathBasedPartFileWriter}.
*/
public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPathBasedBulkFormatBuilder<IN, BucketID, T>>
extends StreamingFileSink.BucketsBuilder<IN, BucketID, T> {
private static final long serialVersionUID = 1L;
private final Path basePath;
private HadoopPathBasedBulkWriter.Factory<IN> writerFactory;
private HadoopFileCommitterFactory fileCommitterFactory;
private SerializableConfiguration serializableConfiguration;
private BucketAssigner<IN, BucketID> bucketAssigner;
private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
@Nullable
private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;
private BucketFactory<IN, BucketID> bucketFactory;
private OutputFileConfig outputFileConfig;
public HadoopPathBasedBulkFormatBuilder(
org.apache.hadoop.fs.Path basePath,
HadoopPathBasedBulkWriter.Factory<IN> writerFactory,
Configuration configuration,
BucketAssigner<IN, BucketID> assigner) {
this(
basePath,
writerFactory,
new DefaultHadoopFileCommitterFactory(),
configuration,
assigner,
OnCheckpointRollingPolicy.build(),
new DefaultBucketFactoryImpl<>(),
OutputFileConfig.builder().build());
}
public HadoopPathBasedBulkFormatBuilder(
org.apache.hadoop.fs.Path basePath,
HadoopPathBasedBulkWriter.Factory<IN> writerFactory,
HadoopFileCommitterFactory fileCommitterFactory,
Configuration configuration,
BucketAssigner<IN, BucketID> assigner,
CheckpointRollingPolicy<IN, BucketID> policy,
BucketFactory<IN, BucketID> bucketFactory,
OutputFileConfig outputFileConfig) {
this.basePath = new Path(Preconditions.checkNotNull(basePath).toString());
this.writerFactory = writerFactory;
this.fileCommitterFactory = fileCommitterFactory;
this.serializableConfiguration = new SerializableConfiguration(configuration);
this.bucketAssigner = Preconditions.checkNotNull(assigner);
this.rollingPolicy = Preconditions.checkNotNull(policy);
this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
}
public T withBucketAssigner(BucketAssigner<IN, BucketID> assigner) {
this.bucketAssigner = Preconditions.checkNotNull(assigner);
return self();
}
public T withRollingPolicy(CheckpointRollingPolicy<IN, BucketID> rollingPolicy) {
this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
return self();
}
@Internal
public T withBucketLifeCycleListener(final BucketLifeCycleListener<IN, BucketID> listener) {
this.bucketLifeCycleListener = Preconditions.checkNotNull(listener);
return self();
}
public T withBucketFactory(BucketFactory<IN, BucketID> factory) {
this.bucketFactory = Preconditions.checkNotNull(factory);
return self();
}
public T withOutputFileConfig(OutputFileConfig outputFileConfig) {
this.outputFileConfig = outputFileConfig;
return self();
}
public T withConfiguration(Configuration configuration) {
this.serializableConfiguration = new SerializableConfiguration(configuration);
return self();
}
@Override
public Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
return new Buckets<>(
basePath,
bucketAssigner,
bucketFactory,
new HadoopPathBasedPartFileWriter.Factory<>(
serializableConfiguration.getConfiguration(),
writerFactory,
fileCommitterFactory),
rollingPolicy,
bucketLifeCycleListener,
subtaskIndex,
outputFileConfig);
}
}
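A minimal wiring sketch, modeled on the test at the end of this commit. Here `stream` is assumed to be an existing DataStream<String> and `myWriterFactory` a user-provided HadoopPathBasedBulkWriter.Factory<String>; because the StreamingFileSink constructor is protected, the test-side TestStreamingFileSinkFactory (also part of this commit) is assumed for constructing the sink:

// Sketch only: `stream` and `myWriterFactory` are placeholders supplied by the caller.
Configuration hadoopConf = new Configuration();
HadoopPathBasedBulkFormatBuilder<String, String, ?> builder =
    new HadoopPathBasedBulkFormatBuilder<>(
        new org.apache.hadoop.fs.Path("hdfs:///tmp/bulk-output"),
        myWriterFactory,
        hadoopConf,
        new DateTimeBucketAssigner<>());
// The StreamingFileSink constructor is protected, so a same-package helper such as
// TestStreamingFileSinkFactory creates the sink with a 1000 ms bucket check interval.
stream.addSink(new TestStreamingFileSinkFactory<String>().createSink(builder, 1000));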
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
* Wrapper of Hadoop's Configuration to make it serializable.
*/
public class SerializableConfiguration implements Serializable {
private static final long serialVersionUID = 1L;
private transient Configuration configuration;
public SerializableConfiguration(Configuration configuration) {
this.configuration = configuration;
}
public Configuration getConfiguration() {
return configuration;
}
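// Hadoop's Configuration is not java.io.Serializable, so serialization is delegated
// to its own Writable-style write()/readFields() methods.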
private void writeObject(ObjectOutputStream out) throws IOException {
configuration.write(out);
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
if (configuration == null) {
configuration = new Configuration();
}
configuration.readFields(in);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.hadoop.bulk;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFileSinkFactory;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.HadoopPathBasedPendingFileRecoverable;
import static org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.HadoopPathBasedPendingFileRecoverableSerializer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Tests for writing data to the Hadoop file system with different configurations.
*/
public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
@Rule
public final Timeout timeoutPerTest = Timeout.seconds(2000);
@Test
public void testPendingFileRecoverableSerializer() throws IOException {
HadoopPathBasedPendingFileRecoverable recoverable = new HadoopPathBasedPendingFileRecoverable(
new Path("hdfs://fake/path"));
HadoopPathBasedPendingFileRecoverableSerializer serializer =
new HadoopPathBasedPendingFileRecoverableSerializer();
byte[] serializedBytes = serializer.serialize(recoverable);
HadoopPathBasedPendingFileRecoverable deSerialized = serializer.deserialize(
serializer.getVersion(),
serializedBytes);
assertEquals(recoverable.getPath(), deSerialized.getPath());
}
@Test
public void testWriteFile() throws Exception {
File file = TEMPORARY_FOLDER.newFolder();
Path basePath = new Path(file.toURI());
List<String> data = Arrays.asList(
"first line",
"second line",
"third line");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);
DataStream<String> stream = env.addSource(
new FiniteTestSource<>(data), TypeInformation.of(String.class));
Configuration configuration = new Configuration();
HadoopPathBasedBulkFormatBuilder<String, String, ?> builder =
new HadoopPathBasedBulkFormatBuilder<>(
basePath,
new TestHadoopPathBasedBulkWriterFactory(),
configuration,
new DateTimeBucketAssigner<>());
TestStreamingFileSinkFactory<String> streamingFileSinkFactory = new TestStreamingFileSinkFactory<>();
stream.addSink(streamingFileSinkFactory.createSink(builder, 1000));
env.execute();
validateResult(data, configuration, basePath);
}
// ------------------------------------------------------------------------
private void validateResult(List<String> expected, Configuration config, Path basePath) throws IOException {
FileSystem fileSystem = FileSystem.get(basePath.toUri(), config);
FileStatus[] buckets = fileSystem.listStatus(basePath);
assertNotNull(buckets);
assertEquals(1, buckets.length);
FileStatus[] partFiles = fileSystem.listStatus(buckets[0].getPath());
assertNotNull(partFiles);
assertEquals(2, partFiles.length);
for (FileStatus partFile : partFiles) {
assertTrue(partFile.getLen() > 0);
List<String> fileContent = readHadoopPath(fileSystem, partFile.getPath());
assertEquals(expected, fileContent);
}
}
private List<String> readHadoopPath(FileSystem fileSystem, Path partFile) throws IOException {
try (FSDataInputStream dataInputStream = fileSystem.open(partFile)) {
List<String> lines = new ArrayList<>();
BufferedReader reader = new BufferedReader(new InputStreamReader(dataInputStream));
String line = null;
while ((line = reader.readLine()) != null) {
lines.add(line);
}
return lines;
}
}
private static class TestHadoopPathBasedBulkWriterFactory implements HadoopPathBasedBulkWriter.Factory<String> {
@Override
public HadoopPathBasedBulkWriter<String> create(Path targetFilePath, Path inProgressFilePath) {
try {
FileSystem fileSystem = FileSystem.get(inProgressFilePath.toUri(), new Configuration());
FSDataOutputStream output = fileSystem.create(inProgressFilePath);
return new FSDataOutputStreamBulkWriterHadoop(output);
} catch (IOException e) {
ExceptionUtils.rethrow(e);
}
return null;
}
}
private static class FSDataOutputStreamBulkWriterHadoop implements HadoopPathBasedBulkWriter<String> {
private final FSDataOutputStream outputStream;
public FSDataOutputStreamBulkWriterHadoop(FSDataOutputStream outputStream) {
this.outputStream = outputStream;
}
@Override
public long getSize() throws IOException {
return outputStream.getPos();
}
@Override
public void dispose() {
IOUtils.closeQuietly(outputStream);
}
@Override
public void addElement(String element) throws IOException {
outputStream.writeBytes(element + "\n");
}
@Override
public void flush() throws IOException {
outputStream.flush();
}
@Override
public void finish() throws IOException {
outputStream.flush();
outputStream.close();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
/**
* Test factory that creates the streaming file sink; it lives in this package because the {@link StreamingFileSink} constructor is protected.
*/
public class TestStreamingFileSinkFactory<IN> {
public StreamingFileSink<IN> createSink(
StreamingFileSink.BucketsBuilder<IN, ?, ? extends StreamingFileSink.BucketsBuilder<IN, ?, ?>> bucketsBuilder,
long bucketCheckInterval) {
return new StreamingFileSink<>(bucketsBuilder, bucketCheckInterval);
}
}
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
@@ -45,6 +45,7 @@ under the License.
 <module>flink-csv</module>
 <module>flink-orc</module>
 <module>flink-orc-nohive</module>
+<module>flink-hadoop-bulk</module>
 </modules>
 <!-- override these root dependencies as 'provided', so they don't end up
......
@@ -52,7 +52,7 @@ public abstract class AbstractPartFileWriter<IN, BucketID> implements InProgress
 return lastUpdateTime;
 }
-void markWrite(long now) {
+protected void markWrite(long now) {
 this.lastUpdateTime = now;
 }
 }
@@ -27,7 +27,7 @@ import java.io.IOException;
 * An interface for factories that create the different {@link InProgressFileWriter writers}.
 */
 @Internal
-interface BucketWriter<IN, BucketID> {
+public interface BucketWriter<IN, BucketID> {
 /**
 * Used to create a new {@link InProgressFileWriter}.
......
@@ -26,7 +26,7 @@ import java.io.IOException;
 * The {@link Bucket} uses the {@link InProgressFileWriter} to write element to a part file.
 */
 @Internal
-interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
+public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
 /**
 * Write a element to the part file.
......
@@ -98,31 +98,19 @@ public class StreamingFileSink<IN>
 private final long bucketCheckInterval;
-private final StreamingFileSink.BucketsBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bucketsBuilder;
+private final BucketsBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bucketsBuilder;
 // --------------------------- runtime fields -----------------------------
 private transient StreamingFileSinkHelper<IN> helper;
 /**
-* Creates a new {@code StreamingFileSink} that writes files in row-based format to the given base directory.
+* Creates a new {@code StreamingFileSink} that writes files to the given base directory
+* with the give buckets properties.
 */
 protected StreamingFileSink(
-final RowFormatBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bucketsBuilder,
-final long bucketCheckInterval) {
-Preconditions.checkArgument(bucketCheckInterval > 0L);
-this.bucketsBuilder = Preconditions.checkNotNull(bucketsBuilder);
-this.bucketCheckInterval = bucketCheckInterval;
-}
-/**
-* Creates a new {@code StreamingFileSink} that writes files in bulk-encoded format to the given base directory.
-*/
-protected StreamingFileSink(
-final BulkFormatBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bucketsBuilder,
-final long bucketCheckInterval) {
+BucketsBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bucketsBuilder,
+long bucketCheckInterval) {
 Preconditions.checkArgument(bucketCheckInterval > 0L);
......