Commit 339f5d84 authored by GuoWei Ma, committed by JingsongLi

[FLINK-17593][fs-connector] Support arbitrary recovery mechanism for PartFileWriter

This change includes two things:
1. Make the PartFileWriter generic and decouple it from the RecoverableStream, so that
different types of PartFileWriter can be implemented for different pre-commit / commit mechanisms.
2. Make Bucket/Buckets depend on the PartFileFactory instead of the RecoverableWriter.

This closes #12132
Parent cc8bf342
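For orientation before the diff: Bucket no longer opens recoverable streams itself; it goes through the new BucketWriter / InProgressFileWriter pair defined later in this change. A minimal before/after sketch of the call shape, assembled from the hunks below (illustrative, not verbatim):

    // Before: Bucket wired the RecoverableWriter and the stream together itself.
    final RecoverableFsDataOutputStream stream = fsWriter.open(partFilePath);
    inProgressPart = partFileFactory.openNew(bucketId, stream, partFilePath, currentTime);

    // After: Bucket only talks to the BucketWriter abstraction; how pre-commit /
    // commit state is represented is left to the concrete implementation.
    inProgressPart = bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);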
@@ -138,10 +138,8 @@ public interface RecoverableWriter {
* recover from a (potential) failure. These can be temporary files that were written
* to the filesystem or objects that were uploaded to S3.
*
* <p><b>NOTE:</b> This operation should not throw an exception if the resumable has already
* been cleaned up and the resources have been freed. But the contract is that it will throw
* an {@link UnsupportedOperationException} if it is called for a {@code RecoverableWriter}
* whose {@link #requiresCleanupOfRecoverableState()} returns {@code false}.
* <p><b>NOTE:</b> This operation should not throw an exception; instead, it should return
* {@code false} if the cleanup did not happen for any reason.
*
* @param resumable The {@link ResumeRecoverable} whose state we want to clean-up.
* @return {@code true} if the resources were successfully freed, {@code false} otherwise
......
@@ -77,7 +77,7 @@ public class LocalRecoverableWriter implements RecoverableWriter {
@Override
public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
throw new UnsupportedOperationException();
return false;
}
@Override
......
@@ -95,7 +95,7 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
@Override
public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
throw new UnsupportedOperationException();
return false;
}
@Override
......
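Both built-in writers switch from throwing to returning false, so callers no longer need to guard the call. A sketch of the caller-side difference under the new contract (illustrative, mirroring the Bucket code further down):

    // Old contract: guard the call, or risk an UnsupportedOperationException.
    if (fsWriter.requiresCleanupOfRecoverableState()) {
        fsWriter.cleanupRecoverableState(resumable);
    }

    // New contract: always safe to call; false simply means nothing was cleaned up.
    final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(resumable);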
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
/**
* An abstract writer for the currently open part file in a specific {@link Bucket}.
* @param <IN> the element type.
* @param <BucketID> the bucket id type.
*/
public abstract class AbstractPartFileWriter<IN, BucketID> implements InProgressFileWriter<IN, BucketID> {
private final BucketID bucketID;
private final long creationTime;
private long lastUpdateTime;
public AbstractPartFileWriter(final BucketID bucketID, final long createTime) {
this.bucketID = bucketID;
this.creationTime = createTime;
this.lastUpdateTime = createTime;
}
@Override
public BucketID getBucketId() {
return bucketID;
}
@Override
public long getCreationTime() {
return creationTime;
}
@Override
public long getLastUpdateTime() {
return lastUpdateTime;
}
void markWrite(long now) {
this.lastUpdateTime = now;
}
}
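As a usage sketch, a concrete writer only supplies the write/persist/commit behaviour; the base class tracks the bucket id and timestamps. The in-memory buffer and the empty recoverable records below are hypothetical and exist only to illustrate the contract (assume the class sits in the same package):

    class InMemoryPartFileWriter<IN, BucketID> extends AbstractPartFileWriter<IN, BucketID> {

        private final StringBuilder buffer = new StringBuilder();

        InMemoryPartFileWriter(BucketID bucketID, long createTime) {
            super(bucketID, createTime);
        }

        @Override
        public void write(IN element, long currentTime) {
            buffer.append(element).append('\n');
            markWrite(currentTime); // keep lastUpdateTime fresh for rolling policies
        }

        @Override
        public InProgressFileRecoverable persist() {
            // a real writer would persist its pre-commit state here
            return new InProgressFileRecoverable() {};
        }

        @Override
        public PendingFileRecoverable closeForCommit() {
            return new PendingFileRecoverable() {};
        }

        @Override
        public void dispose() {
            buffer.setLength(0);
        }

        @Override
        public long getSize() {
            return buffer.length();
        }
    }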
@@ -21,10 +21,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,48 +56,44 @@ public class Bucket<IN, BucketID> {
private final int subtaskIndex;
private final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory;
private final RecoverableWriter fsWriter;
private final BucketWriter<IN, BucketID> bucketWriter;
private final RollingPolicy<IN, BucketID> rollingPolicy;
private final NavigableMap<Long, ResumeRecoverable> resumablesPerCheckpoint;
private final NavigableMap<Long, InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverablesPerCheckpoint;
private final NavigableMap<Long, List<CommitRecoverable>> pendingPartsPerCheckpoint;
private final NavigableMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint;
private final OutputFileConfig outputFileConfig;
private long partCounter;
@Nullable
private PartFileWriter<IN, BucketID> inProgressPart;
private InProgressFileWriter<IN, BucketID> inProgressPart;
private List<CommitRecoverable> pendingPartsForCurrentCheckpoint;
private List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverablesForCurrentCheckpoint;
/**
* Constructor to create a new empty bucket.
*/
private Bucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final BucketWriter<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final OutputFileConfig outputFileConfig) {
this.fsWriter = checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex;
this.bucketId = checkNotNull(bucketId);
this.bucketPath = checkNotNull(bucketPath);
this.partCounter = initialPartCounter;
this.partFileFactory = checkNotNull(partFileFactory);
this.bucketWriter = checkNotNull(partFileFactory);
this.rollingPolicy = checkNotNull(rollingPolicy);
this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
this.pendingPartsPerCheckpoint = new TreeMap<>();
this.resumablesPerCheckpoint = new TreeMap<>();
this.pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
this.pendingFileRecoverablesPerCheckpoint = new TreeMap<>();
this.inProgressFileRecoverablesPerCheckpoint = new TreeMap<>();
this.outputFileConfig = checkNotNull(outputFileConfig);
}
@@ -110,16 +102,14 @@ public class Bucket<IN, BucketID> {
* Constructor to restore a bucket from checkpointed state.
*/
private Bucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final BucketWriter<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException {
this(
fsWriter,
subtaskIndex,
bucketState.getBucketId(),
bucketState.getBucketPath(),
@@ -133,31 +123,29 @@ public class Bucket<IN, BucketID> {
}
private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
if (!state.hasInProgressResumableFile()) {
if (!state.hasInProgressFileRecoverable()) {
return;
}
// we try to resume the previous in-progress file
final ResumeRecoverable resumable = state.getInProgressResumableFile();
final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
if (fsWriter.supportsResume()) {
final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
inProgressPart = partFileFactory.resumeFrom(
bucketId, stream, resumable, state.getInProgressFileCreationTime());
if (bucketWriter.getProperties().supportsResume()) {
inProgressPart = bucketWriter.resumeInProgressFileFrom(
bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime());
} else {
// if the writer does not support resume, then we close the
// in-progress part and commit it, as done in the case of pending files.
fsWriter.recoverForCommit(resumable).commitAfterRecovery();
bucketWriter.recoverPendingFile(inProgressFileRecoverable).commitAfterRecovery();
}
}
private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
// we commit pending files for checkpoints that precede the last successful one, from which we are recovering
for (List<CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) {
for (CommitRecoverable committable: committables) {
fsWriter.recoverForCommit(committable).commitAfterRecovery();
for (List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables: state.getPendingFileRecoverablesPerCheckpoint().values()) {
for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable: pendingFileRecoverables) {
bucketWriter.recoverPendingFile(pendingFileRecoverable).commitAfterRecovery();
}
}
}
@@ -175,7 +163,7 @@ public class Bucket<IN, BucketID> {
}
boolean isActive() {
return inProgressPart != null || !pendingPartsForCurrentCheckpoint.isEmpty() || !pendingPartsPerCheckpoint.isEmpty();
return inProgressPart != null || !pendingFileRecoverablesForCurrentCheckpoint.isEmpty() || !pendingFileRecoverablesPerCheckpoint.isEmpty();
}
void merge(final Bucket<IN, BucketID> bucket) throws IOException {
@@ -184,16 +172,16 @@ public class Bucket<IN, BucketID> {
// There should be no pending files in the "to-merge" states.
// The reason is that:
// 1) the pendingPartsForCurrentCheckpoint is emptied whenever we take a snapshot (see prepareBucketForCheckpointing()).
// So a snapshot, including the one we are recovering from, will never contain such files.
// 2) the files in pendingPartsPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
// 1) the pendingFileRecoverablesForCurrentCheckpoint is emptied whenever we take a snapshot (see prepareBucketForCheckpointing()).
// So a snapshot, including the one we are recovering from, will never contain such files.
// 2) the files in pendingFileRecoverablesPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
checkState(bucket.pendingFileRecoverablesForCurrentCheckpoint.isEmpty());
checkState(bucket.pendingFileRecoverablesPerCheckpoint.isEmpty());
CommitRecoverable committable = bucket.closePartFile();
if (committable != null) {
pendingPartsForCurrentCheckpoint.add(committable);
InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = bucket.closePartFile();
if (pendingFileRecoverable != null) {
pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
}
if (LOG.isDebugEnabled()) {
@@ -218,8 +206,7 @@ public class Bucket<IN, BucketID> {
closePartFile();
final Path partFilePath = assembleNewPartPath();
final RecoverableFsDataOutputStream stream = fsWriter.open(partFilePath);
inProgressPart = partFileFactory.openNew(bucketId, stream, partFilePath, currentTime);
inProgressPart = bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
if (LOG.isDebugEnabled()) {
LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
@@ -233,14 +220,14 @@ public class Bucket<IN, BucketID> {
return new Path(bucketPath, outputFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + outputFileConfig.getPartSuffix());
}
private CommitRecoverable closePartFile() throws IOException {
CommitRecoverable committable = null;
private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
if (inProgressPart != null) {
committable = inProgressPart.closeForCommit();
pendingPartsForCurrentCheckpoint.add(committable);
pendingFileRecoverable = inProgressPart.closeForCommit();
pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
inProgressPart = null;
}
return committable;
return pendingFileRecoverable;
}
void disposePartFile() {
@@ -252,24 +239,16 @@ public class Bucket<IN, BucketID> {
BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
prepareBucketForCheckpointing(checkpointId);
ResumeRecoverable inProgressResumable = null;
InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
long inProgressFileCreationTime = Long.MAX_VALUE;
if (inProgressPart != null) {
inProgressResumable = inProgressPart.persist();
inProgressFileRecoverable = inProgressPart.persist();
inProgressFileCreationTime = inProgressPart.getCreationTime();
// the following is an optimization so that writers that do not
// require cleanup, they do not have to keep track of resumables
// and later iterate over the active buckets.
// (see onSuccessfulCompletionOfCheckpoint())
if (fsWriter.requiresCleanupOfRecoverableState()) {
this.resumablesPerCheckpoint.put(checkpointId, inProgressResumable);
}
this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
}
return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint);
return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressFileRecoverable, pendingFileRecoverablesPerCheckpoint);
}
private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
@@ -280,49 +259,46 @@ public class Bucket<IN, BucketID> {
closePartFile();
}
if (!pendingPartsForCurrentCheckpoint.isEmpty()) {
pendingPartsPerCheckpoint.put(checkpointId, pendingPartsForCurrentCheckpoint);
pendingPartsForCurrentCheckpoint = new ArrayList<>();
if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
}
}
void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
checkNotNull(fsWriter);
checkNotNull(bucketWriter);
Iterator<Map.Entry<Long, List<CommitRecoverable>>> it =
pendingPartsPerCheckpoint.headMap(checkpointId, true)
Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)
.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Long, List<CommitRecoverable>> entry = it.next();
Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();
for (CommitRecoverable committable : entry.getValue()) {
fsWriter.recoverForCommit(committable).commit();
for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : entry.getValue()) {
bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
}
it.remove();
}
cleanupOutdatedResumables(checkpointId);
cleanupInProgressFileRecoverables(checkpointId);
}
private void cleanupOutdatedResumables(long checkpointId) throws IOException {
Iterator<Map.Entry<Long, ResumeRecoverable>> it =
resumablesPerCheckpoint.headMap(checkpointId, false)
private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false)
.entrySet().iterator();
while (it.hasNext()) {
final ResumeRecoverable recoverable = it.next().getValue();
final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = it.next().getValue();
// this check is redundant, as we only put entries in the resumablesPerCheckpoint map
// list when the requiresCleanupOfRecoverableState() returns true, but having it makes
// no check is needed here anymore: cleanupInProgressFileRecoverable() may be called unconditionally,
// since the underlying cleanupRecoverableState() now returns false instead of throwing, which also makes
// the code more readable.
if (fsWriter.requiresCleanupOfRecoverableState()) {
final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
if (LOG.isDebugEnabled() && successfullyDeleted) {
LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
}
final boolean successfullyDeleted = bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);
if (LOG.isDebugEnabled() && successfullyDeleted) {
LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
}
it.remove();
}
@@ -342,54 +318,51 @@ public class Bucket<IN, BucketID> {
// --------------------------- Testing Methods -----------------------------
@VisibleForTesting
Map<Long, List<CommitRecoverable>> getPendingPartsPerCheckpoint() {
return pendingPartsPerCheckpoint;
Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> getPendingFileRecoverablesPerCheckpoint() {
return pendingFileRecoverablesPerCheckpoint;
}
@Nullable
@VisibleForTesting
PartFileWriter<IN, BucketID> getInProgressPart() {
InProgressFileWriter<IN, BucketID> getInProgressPart() {
return inProgressPart;
}
@VisibleForTesting
List<CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
return pendingPartsForCurrentCheckpoint;
List<InProgressFileWriter.PendingFileRecoverable> getPendingFileRecoverablesForCurrentCheckpoint() {
return pendingFileRecoverablesForCurrentCheckpoint;
}
// --------------------------- Static Factory Methods -----------------------------
/**
* Creates a new empty {@code Bucket}.
* @param fsWriter the filesystem-specific {@link RecoverableWriter}.
* @param subtaskIndex the index of the subtask creating the bucket.
* @param bucketId the identifier of the bucket, as returned by the {@link BucketAssigner}.
* @param bucketPath the path to where the part files for the bucket will be written to.
* @param initialPartCounter the initial counter for the part files of the bucket.
* @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
* @param partFileFactory the {@link BucketWriter} used to create part file writers.
* @param <IN> the type of input elements to the sink.
* @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
* @param outputFileConfig the part file configuration.
* @return The new Bucket.
*/
static <IN, BucketID> Bucket<IN, BucketID> getNew(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final BucketWriter<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final OutputFileConfig outputFileConfig) {
return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy, outputFileConfig);
return new Bucket<>(subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy, outputFileConfig);
}
/**
* Restores a {@code Bucket} from the state included in the provided {@link BucketState}.
* @param fsWriter the filesystem-specific {@link RecoverableWriter}.
* @param subtaskIndex the index of the subtask creating the bucket.
* @param initialPartCounter the initial counter for the part files of the bucket.
* @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
* @param partFileFactory the {@link BucketWriter} used to create part file writers.
* @param bucketState the initial state of the restored bucket.
* @param <IN> the type of input elements to the sink.
* @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
@@ -397,13 +370,12 @@ public class Bucket<IN, BucketID> {
* @return The restored Bucket.
*/
static <IN, BucketID> Bucket<IN, BucketID> restore(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final BucketWriter<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException {
return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState, outputFileConfig);
return new Bucket<>(subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState, outputFileConfig);
}
}
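To summarize the renamed plumbing, the checkpoint flow through the methods above looks roughly like this (annotated calls from this file, not new API):

    // On checkpoint: snapshot the bucket.
    BucketState<BucketID> state = bucket.onReceptionOfCheckpoint(checkpointId);
    //  - closePartFile() moves finished parts into pendingFileRecoverablesForCurrentCheckpoint,
    //    which is then filed under this checkpointId in pendingFileRecoverablesPerCheckpoint;
    //  - inProgressPart.persist() yields the InProgressFileRecoverable stored in
    //    inProgressFileRecoverablesPerCheckpoint.

    // On checkpoint completion: commit and clean up.
    bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
    //  - every PendingFileRecoverable up to checkpointId is committed via
    //    bucketWriter.recoverPendingFile(...).commit();
    //  - outdated InProgressFileRecoverables are released via
    //    bucketWriter.cleanupInProgressFileRecoverable(...).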
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import java.io.IOException;
import java.io.Serializable;
@@ -32,20 +31,18 @@
interface BucketFactory<IN, BucketID> extends Serializable {
Bucket<IN, BucketID> getNewBucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final BucketWriter<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final OutputFileConfig outputFileConfig) throws IOException;
Bucket<IN, BucketID> restoreBucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final BucketWriter<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException;
......
@@ -46,30 +46,30 @@ class BucketState<BucketID> {
private final long inProgressFileCreationTime;
/**
* A {@link RecoverableWriter.ResumeRecoverable} for the currently open
* An {@link InProgressFileWriter.InProgressFileRecoverable} for the currently open
* part file, or null if there is no currently open part file.
*/
@Nullable
private final RecoverableWriter.ResumeRecoverable inProgressResumableFile;
private final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable;
/**
* The {@link InProgressFileWriter.PendingFileRecoverable files} pending to be
* committed, organized by checkpoint id.
*/
private final Map<Long, List<RecoverableWriter.CommitRecoverable>> committableFilesPerCheckpoint;
private final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint;
BucketState(
final BucketID bucketId,
final Path bucketPath,
final long inProgressFileCreationTime,
@Nullable final RecoverableWriter.ResumeRecoverable inProgressResumableFile,
final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint
@Nullable final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable,
final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint
) {
this.bucketId = Preconditions.checkNotNull(bucketId);
this.bucketPath = Preconditions.checkNotNull(bucketPath);
this.inProgressFileCreationTime = inProgressFileCreationTime;
this.inProgressResumableFile = inProgressResumableFile;
this.committableFilesPerCheckpoint = Preconditions.checkNotNull(pendingCommittablesPerCheckpoint);
this.inProgressFileRecoverable = inProgressFileRecoverable;
this.pendingFileRecoverablesPerCheckpoint = Preconditions.checkNotNull(pendingFileRecoverablesPerCheckpoint);
}
BucketID getBucketId() {
@@ -84,17 +84,17 @@
return inProgressFileCreationTime;
}
boolean hasInProgressResumableFile() {
return inProgressResumableFile != null;
boolean hasInProgressFileRecoverable() {
return inProgressFileRecoverable != null;
}
@Nullable
RecoverableWriter.ResumeRecoverable getInProgressResumableFile() {
return inProgressResumableFile;
InProgressFileWriter.InProgressFileRecoverable getInProgressFileRecoverable() {
return inProgressFileRecoverable;
}
Map<Long, List<RecoverableWriter.CommitRecoverable>> getCommittableFilesPerCheckpoint() {
return committableFilesPerCheckpoint;
Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> getPendingFileRecoverablesPerCheckpoint() {
return pendingFileRecoverablesPerCheckpoint;
}
@Override
@@ -105,13 +105,13 @@
.append("BucketState for bucketId=").append(bucketId)
.append(" and bucketPath=").append(bucketPath);
if (hasInProgressResumableFile()) {
if (hasInProgressFileRecoverable()) {
strBuilder.append(", has open part file created @ ").append(inProgressFileCreationTime);
}
if (!committableFilesPerCheckpoint.isEmpty()) {
if (!pendingFileRecoverablesPerCheckpoint.isEmpty()) {
strBuilder.append(", has pending files for checkpoints: {");
for (long checkpointId: committableFilesPerCheckpoint.keySet()) {
for (long checkpointId: pendingFileRecoverablesPerCheckpoint.keySet()) {
strBuilder.append(checkpointId).append(' ');
}
strBuilder.append('}');
......
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
@@ -45,119 +44,172 @@ class BucketStateSerializer<BucketID> implements SimpleVersionedSerializer<BucketState<BucketID>> {
private static final int MAGIC_NUMBER = 0x1e764b79;
private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer;
private final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer;
private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer;
private final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer;
private final SimpleVersionedSerializer<BucketID> bucketIdSerializer;
BucketStateSerializer(
final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer,
final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer,
final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
final SimpleVersionedSerializer<BucketID> bucketIdSerializer
) {
this.resumableSerializer = Preconditions.checkNotNull(resumableSerializer);
this.commitableSerializer = Preconditions.checkNotNull(commitableSerializer);
this.inProgressFileRecoverableSerializer = Preconditions.checkNotNull(inProgressFileRecoverableSerializer);
this.pendingFileRecoverableSerializer = Preconditions.checkNotNull(pendingFileRecoverableSerializer);
this.bucketIdSerializer = Preconditions.checkNotNull(bucketIdSerializer);
}
@Override
public int getVersion() {
return 1;
return 2;
}
@Override
public byte[] serialize(BucketState<BucketID> state) throws IOException {
DataOutputSerializer out = new DataOutputSerializer(256);
out.writeInt(MAGIC_NUMBER);
serializeV1(state, out);
serializeV2(state, out);
return out.getCopyOfBuffer();
}
@Override
public BucketState<BucketID> deserialize(int version, byte[] serialized) throws IOException {
final DataInputDeserializer in = new DataInputDeserializer(serialized);
switch (version) {
case 1:
DataInputDeserializer in = new DataInputDeserializer(serialized);
validateMagicNumber(in);
return deserializeV1(in);
case 2:
validateMagicNumber(in);
return deserializeV2(in);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}
@VisibleForTesting
void serializeV1(BucketState<BucketID> state, DataOutputView out) throws IOException {
SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), out);
out.writeUTF(state.getBucketPath().toString());
out.writeLong(state.getInProgressFileCreationTime());
private void serializeV2(BucketState<BucketID> state, DataOutputView dataOutputView) throws IOException {
SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), dataOutputView);
dataOutputView.writeUTF(state.getBucketPath().toString());
dataOutputView.writeLong(state.getInProgressFileCreationTime());
// put the current open part file
if (state.hasInProgressResumableFile()) {
final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
out.writeBoolean(true);
SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, resumable, out);
}
else {
out.writeBoolean(false);
if (state.hasInProgressFileRecoverable()) {
final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
dataOutputView.writeBoolean(true);
SimpleVersionedSerialization.writeVersionAndSerialize(inProgressFileRecoverableSerializer, inProgressFileRecoverable, dataOutputView);
} else {
dataOutputView.writeBoolean(false);
}
// put the map of pending files per checkpoint
final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommitters = state.getCommittableFilesPerCheckpoint();
final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverables = state.getPendingFileRecoverablesPerCheckpoint();
// manually keep the version here to save some bytes
out.writeInt(commitableSerializer.getVersion());
dataOutputView.writeInt(pendingFileRecoverableSerializer.getVersion());
out.writeInt(pendingCommitters.size());
for (Entry<Long, List<RecoverableWriter.CommitRecoverable>> resumablesForCheckpoint : pendingCommitters.entrySet()) {
List<RecoverableWriter.CommitRecoverable> resumables = resumablesForCheckpoint.getValue();
dataOutputView.writeInt(pendingFileRecoverables.size());
out.writeLong(resumablesForCheckpoint.getKey());
out.writeInt(resumables.size());
for (Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFilesForCheckpoint : pendingFileRecoverables.entrySet()) {
final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableList = pendingFilesForCheckpoint.getValue();
for (RecoverableWriter.CommitRecoverable resumable : resumables) {
byte[] serialized = commitableSerializer.serialize(resumable);
out.writeInt(serialized.length);
out.write(serialized);
dataOutputView.writeLong(pendingFilesForCheckpoint.getKey());
dataOutputView.writeInt(pendingFileRecoverableList.size());
for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : pendingFileRecoverableList) {
byte[] serialized = pendingFileRecoverableSerializer.serialize(pendingFileRecoverable);
dataOutputView.writeInt(serialized.length);
dataOutputView.write(serialized);
}
}
}
@VisibleForTesting
BucketState<BucketID> deserializeV1(DataInputView in) throws IOException {
private BucketState<BucketID> deserializeV1(DataInputView in) throws IOException {
final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer = getCommitableSerializer();
final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer = getResumableSerializer();
final BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, in);
final String bucketPathStr = in.readUTF();
final long creationTime = in.readLong();
// then get the current resumable stream
RecoverableWriter.ResumeRecoverable current = null;
InProgressFileWriter.InProgressFileRecoverable current = null;
if (in.readBoolean()) {
current = SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in);
current =
new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(
SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in));
}
final int committableVersion = in.readInt();
final int numCheckpoints = in.readInt();
final HashMap<Long, List<RecoverableWriter.CommitRecoverable>> resumablesPerCheckpoint = new HashMap<>(numCheckpoints);
final HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablePerCheckpoint = new HashMap<>(numCheckpoints);
for (int i = 0; i < numCheckpoints; i++) {
final long checkpointId = in.readLong();
final int noOfResumables = in.readInt();
final List<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables);
final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables = new ArrayList<>(noOfResumables);
for (int j = 0; j < noOfResumables; j++) {
final byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
resumables.add(commitableSerializer.deserialize(committableVersion, bytes));
pendingFileRecoverables.add(
new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(commitableSerializer.deserialize(committableVersion, bytes)));
}
resumablesPerCheckpoint.put(checkpointId, resumables);
pendingFileRecoverablePerCheckpoint.put(checkpointId, pendingFileRecoverables);
}
return new BucketState<>(
bucketId,
new Path(bucketPathStr),
creationTime,
current,
resumablesPerCheckpoint);
bucketId,
new Path(bucketPathStr),
creationTime,
current,
pendingFileRecoverablePerCheckpoint);
}
private BucketState<BucketID> deserializeV2(DataInputView dataInputView) throws IOException {
final BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, dataInputView);
final String bucketPathStr = dataInputView.readUTF();
final long creationTime = dataInputView.readLong();
// then get the current resumable stream
InProgressFileWriter.InProgressFileRecoverable current = null;
if (dataInputView.readBoolean()) {
current = SimpleVersionedSerialization.readVersionAndDeSerialize(inProgressFileRecoverableSerializer, dataInputView);
}
final int pendingFileRecoverableSerializerVersion = dataInputView.readInt();
final int numCheckpoints = dataInputView.readInt();
final HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint = new HashMap<>(numCheckpoints);
for (int i = 0; i < numCheckpoints; i++) {
final long checkpointId = dataInputView.readLong();
final int numOfPendingFileRecoverables = dataInputView.readInt();
final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables = new ArrayList<>(numOfPendingFileRecoverables);
for (int j = 0; j < numOfPendingFileRecoverables; j++) {
final byte[] bytes = new byte[dataInputView.readInt()];
dataInputView.readFully(bytes);
pendingFileRecoverables.add(pendingFileRecoverableSerializer.deserialize(pendingFileRecoverableSerializerVersion, bytes));
}
pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverables);
}
return new BucketState<>(bucketId, new Path(bucketPathStr), creationTime, current, pendingFileRecoverablesPerCheckpoint);
}
private SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumableSerializer() {
final OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer
outputStreamBasedInProgressFileRecoverableSerializer =
(OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer) inProgressFileRecoverableSerializer;
return outputStreamBasedInProgressFileRecoverableSerializer.getResumeSerializer();
}
private SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitableSerializer() {
final OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer
outputStreamBasedPendingFileRecoverableSerializer =
(OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer) pendingFileRecoverableSerializer;
return outputStreamBasedPendingFileRecoverableSerializer.getCommitSerializer();
}
private static void validateMagicNumber(DataInputView in) throws IOException {
......
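The version bump to 2 keeps old state readable: deserializeV1 wraps the legacy RecoverableWriter recoverables in the new OutputStreamBased* adapters, after which they flow through the new commit path like any other recoverable. A hedged sketch (legacyCommitRecoverable stands in for state restored from a V1 savepoint):

    final InProgressFileWriter.PendingFileRecoverable pending =
        new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(legacyCommitRecoverable);
    bucketWriter.recoverPendingFile(pending).commitAfterRecovery();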
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import java.io.IOException;
/**
* An interface for factories that create the different {@link InProgressFileWriter writers}.
*/
@Internal
interface BucketWriter<IN, BucketID> {
/**
* Used to create a new {@link InProgressFileWriter}.
* @param bucketID the id of the bucket this writer is writing to.
* @param path the path this writer will write to.
* @param creationTime the creation time of the file.
* @return the new {@link InProgressFileWriter}
* @throws IOException Thrown if creating a writer fails.
*/
InProgressFileWriter<IN, BucketID> openNewInProgressFile(
final BucketID bucketID,
final Path path,
final long creationTime) throws IOException;
/**
* Used to resume an {@link InProgressFileWriter} from an {@link InProgressFileWriter.InProgressFileRecoverable}.
* @param bucketID the id of the bucket this writer is writing to.
* @param inProgressFileSnapshot the state of the part file.
* @param creationTime the creation time of the file.
* @return the resumed {@link InProgressFileWriter}
* @throws IOException Thrown if resuming a writer fails.
*/
InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
final BucketID bucketID,
final InProgressFileWriter.InProgressFileRecoverable inProgressFileSnapshot,
final long creationTime) throws IOException;
WriterProperties getProperties();
/**
* Recovers a pending file for finalizing and committing.
* @param pendingFileRecoverable The handle with the recovery information.
* @return A pending file
* @throws IOException Thrown if recovering a pending file fails.
*/
PendingFile recoverPendingFile(final InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable) throws IOException;
/**
* Frees up any resources that were previously occupied in order to be able to
* recover from a (potential) failure.
*
* <p><b>NOTE:</b> This operation should not throw an exception; instead, it should return
* {@code false} if the cleanup did not happen for any reason.
*
* @param inProgressFileRecoverable the {@link InProgressFileWriter.InProgressFileRecoverable} whose state we want to clean-up.
* @return {@code true} if the resources were successfully freed, {@code false} otherwise
* (e.g. the file to be deleted was not there for any reason - already deleted or never created).
* @throws IOException if an I/O error occurs
*/
boolean cleanupInProgressFileRecoverable(final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable) throws IOException;
/**
* This represents a closed part file that no further data can be written to, waiting to be committed.
*/
interface PendingFile {
/**
* Commits the pending file, making it visible. The file will contain the exact data
* as when the pending file was created.
*
* @throws IOException Thrown if committing fails.
*/
void commit() throws IOException;
/**
* Commits the pending file, making it visible. The file will contain the exact data
* as when the pending file was created.
*
* <p>This method tolerates situations where the file was already committed and
* will not raise an exception in that case. This is important for idempotent
* commit retries as they need to happen after recovery.
*
* @throws IOException Thrown if committing fails.
*/
void commitAfterRecovery() throws IOException;
}
}
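Note how getProperties() replaces the direct serializer lookups on RecoverableWriter: the sink builds its state serializer from whatever the concrete BucketWriter reports (the WriterProperties class is referenced here but defined outside this diff). The wiring, as it appears in Buckets further down (field names adapted):

    this.bucketStateSerializer = new BucketStateSerializer<>(
        bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
        bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
        bucketAssigner.getSerializer());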
@@ -21,9 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Preconditions;
@@ -61,7 +59,7 @@ public class Buckets<IN, BucketID> {
private final BucketAssigner<IN, BucketID> bucketAssigner;
private final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory;
private final BucketWriter<IN, BucketID> partFileWriterFactory;
private final RollingPolicy<IN, BucketID> rollingPolicy;
@@ -78,8 +76,6 @@ public class Buckets<IN, BucketID> {
private long maxPartCounter;
private final RecoverableWriter fsWriter;
private final OutputFileConfig outputFileConfig;
// --------------------------- State Related Fields -----------------------------
@@ -92,18 +88,18 @@
* @param basePath The base path for our buckets.
* @param bucketAssigner The {@link BucketAssigner} provided by the user.
* @param bucketFactory The {@link BucketFactory} to be used to create buckets.
* @param partFileWriterFactory The {@link PartFileWriter.PartFileFactory} to be used when writing data.
* @param partFileWriterFactory The {@link BucketWriter} to be used when writing data.
* @param rollingPolicy The {@link RollingPolicy} as specified by the user.
*/
Buckets(
final Path basePath,
final BucketAssigner<IN, BucketID> bucketAssigner,
final BucketFactory<IN, BucketID> bucketFactory,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final BucketWriter<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
@Nullable final BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener,
final int subtaskIndex,
final OutputFileConfig outputFileConfig) throws IOException {
final OutputFileConfig outputFileConfig) {
this.basePath = Preconditions.checkNotNull(basePath);
this.bucketAssigner = Preconditions.checkNotNull(bucketAssigner);
@@ -118,19 +114,10 @@ public class Buckets<IN, BucketID> {
this.activeBuckets = new HashMap<>();
this.bucketerContext = new Buckets.BucketerContext();
try {
this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
} catch (IOException e) {
LOG.error("Unable to create filesystem for path: {}", basePath);
throw e;
}
this.bucketStateSerializer = new BucketStateSerializer<>(
fsWriter.getResumeRecoverableSerializer(),
fsWriter.getCommitRecoverableSerializer(),
bucketAssigner.getSerializer()
);
this.bucketStateSerializer = new BucketStateSerializer<>(
partFileWriterFactory.getProperties().getInProgressFileRecoverableSerializer(),
partFileWriterFactory.getProperties().getPendingFileRecoverableSerializer(),
bucketAssigner.getSerializer());
this.maxPartCounter = 0L;
}
@@ -185,7 +172,6 @@
final Bucket<IN, BucketID> restoredBucket = bucketFactory
.restoreBucket(
fsWriter,
subtaskIndex,
maxPartCounter,
partFileWriterFactory,
@@ -238,7 +224,7 @@
final ListState<Long> partCounterStateContainer) throws Exception {
Preconditions.checkState(
fsWriter != null && bucketStateSerializer != null,
partFileWriterFactory != null && bucketStateSerializer != null,
"sink has not been initialized");
LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
@@ -308,7 +294,6 @@
if (bucket == null) {
final Path bucketPath = assembleBucketPath(bucketId);
bucket = bucketFactory.getNewBucket(
fsWriter,
subtaskIndex,
bucketId,
bucketPath,
......
@@ -28,11 +28,11 @@ import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
* A {@link PartFileWriter} for bulk-encoding formats that use an {@link BulkPartWriter}.
* An {@link InProgressFileWriter} for bulk-encoding formats that use a {@link BulkWriter}.
* This also implements the {@link PartFileInfo}.
*/
@Internal
final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
final class BulkPartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
private final BulkWriter<IN> writer;
@@ -46,18 +46,18 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
}
@Override
void write(IN element, long currentTime) throws IOException {
public void write(IN element, long currentTime) throws IOException {
writer.addElement(element);
markWrite(currentTime);
}
@Override
RecoverableWriter.ResumeRecoverable persist() {
public InProgressFileRecoverable persist() {
throw new UnsupportedOperationException("Bulk Part Writers do not support \"pause and resume\" operations.");
}
@Override
RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
public PendingFileRecoverable closeForCommit() throws IOException {
writer.flush();
writer.finish();
return super.closeForCommit();
@@ -68,16 +68,17 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
* @param <IN> The type of input elements.
* @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
*/
static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
static class Factory<IN, BucketID> extends OutputStreamBasedPartFileWriter.OutputStreamBasedPartFileFactory<IN, BucketID> {
private final BulkWriter.Factory<IN> writerFactory;
Factory(BulkWriter.Factory<IN> writerFactory) {
Factory(final RecoverableWriter recoverableWriter, BulkWriter.Factory<IN> writerFactory) throws IOException {
super(recoverableWriter);
this.writerFactory = writerFactory;
}
@Override
public PartFileWriter<IN, BucketID> resumeFrom(
public InProgressFileWriter<IN, BucketID> resumeFrom(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final RecoverableWriter.ResumeRecoverable resumable,
@@ -91,7 +92,7 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
}
@Override
public PartFileWriter<IN, BucketID> openNew(
public InProgressFileWriter<IN, BucketID> openNew(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final Path path,
......
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import java.io.IOException;
@@ -34,17 +33,15 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, BucketID> {
@Override
public Bucket<IN, BucketID> getNewBucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final BucketWriter<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final OutputFileConfig outputFileConfig) {
return Bucket.getNew(
fsWriter,
subtaskIndex,
bucketId,
bucketPath,
@@ -56,16 +53,14 @@
@Override
public Bucket<IN, BucketID> restoreBucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final BucketWriter<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException {
return Bucket.restore(
fsWriter,
subtaskIndex,
initialPartCounter,
partFileWriterFactory,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import java.io.IOException;
/**
* The {@link Bucket} uses the {@link InProgressFileWriter} to write elements to a part file.
*/
@Internal
interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
/**
* Write an element to the part file.
* @param element the element to be written.
* @param currentTime the writing time.
* @throws IOException Thrown if writing the element fails.
*/
void write(final IN element, final long currentTime) throws IOException;
/**
* @return The state of the current part file.
* @throws IOException Thrown if persisting the part file fails.
*/
InProgressFileRecoverable persist() throws IOException;
/**
* @return The state of the pending part file. {@link Bucket} uses this to commit the pending file.
* @throws IOException Thrown if an I/O error occurs.
*/
PendingFileRecoverable closeForCommit() throws IOException;
/**
* Dispose the part file.
*/
void dispose();
// ------------------------------------------------------------------------
/**
* A handle that can be used to recover an in-progress file.
*/
interface InProgressFileRecoverable extends PendingFileRecoverable {}
/**
* A handle that can be used to recover a pending file.
*/
interface PendingFileRecoverable {}
}
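Because InProgressFileRecoverable extends PendingFileRecoverable, a snapshot of a still-open file can also be committed directly; this is what lets restore work on writers that cannot resume (compare Bucket#restoreInProgressFile above):

    if (bucketWriter.getProperties().supportsResume()) {
        inProgressPart = bucketWriter.resumeInProgressFileFrom(bucketId, inProgressFileRecoverable, creationTime);
    } else {
        // no resume support: finalize and commit the recovered in-progress file instead
        bucketWriter.recoverPendingFile(inProgressFileRecoverable).commitAfterRecovery();
    }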
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.IOUtils;
import java.io.IOException;
/**
* The base class for all part file writers that use a {@link org.apache.flink.core.fs.RecoverableFsDataOutputStream}.
* @param <IN> the element type
* @param <BucketID> the bucket type
*/
public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> extends AbstractPartFileWriter<IN, BucketID> {
final RecoverableFsDataOutputStream currentPartStream;
OutputStreamBasedPartFileWriter(
final BucketID bucketID,
final RecoverableFsDataOutputStream recoverableFsDataOutputStream,
final long createTime) {
super(bucketID, createTime);
this.currentPartStream = recoverableFsDataOutputStream;
}
@Override
public InProgressFileRecoverable persist() throws IOException {
return new OutputStreamBasedInProgressFileRecoverable(currentPartStream.persist());
}
@Override
public PendingFileRecoverable closeForCommit() throws IOException {
return new OutputStreamBasedPendingFileRecoverable(currentPartStream.closeForCommit().getRecoverable());
}
@Override
public void dispose() {
// we can suppress exceptions here, because we do not rely on close() to
// flush or persist any data
IOUtils.closeQuietly(currentPartStream);
}
@Override
public long getSize() throws IOException {
return currentPartStream.getPos();
}
abstract static class OutputStreamBasedPartFileFactory<IN, BucketID> implements BucketWriter<IN, BucketID> {
private final RecoverableWriter recoverableWriter;
OutputStreamBasedPartFileFactory(final RecoverableWriter recoverableWriter) {
this.recoverableWriter = recoverableWriter;
}
@Override
public InProgressFileWriter<IN, BucketID> openNewInProgressFile(final BucketID bucketID, final Path path, final long creationTime) throws IOException {
return openNew(bucketID, recoverableWriter.open(path), path, creationTime);
}
@Override
public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(final BucketID bucketID, final InProgressFileRecoverable inProgressFileRecoverable, final long creationTime) throws IOException {
final OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable = (OutputStreamBasedInProgressFileRecoverable) inProgressFileRecoverable;
return resumeFrom(
bucketID,
recoverableWriter.recover(outputStreamBasedInProgressRecoverable.getResumeRecoverable()),
outputStreamBasedInProgressRecoverable.getResumeRecoverable(),
creationTime);
}
@Override
public PendingFile recoverPendingFile(final PendingFileRecoverable pendingFileRecoverable) throws IOException {
final RecoverableWriter.CommitRecoverable commitRecoverable;
if (pendingFileRecoverable instanceof OutputStreamBasedPendingFileRecoverable) {
commitRecoverable = ((OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable).getCommitRecoverable();
} else if (pendingFileRecoverable instanceof OutputStreamBasedInProgressFileRecoverable) {
commitRecoverable = ((OutputStreamBasedInProgressFileRecoverable) pendingFileRecoverable).getResumeRecoverable();
} else {
throw new IllegalArgumentException("can not recover from the pendingFileRecoverable");
}
return new OutputStreamBasedPendingFile(recoverableWriter.recoverForCommit(commitRecoverable));
}
@Override
public boolean cleanupInProgressFileRecoverable(InProgressFileRecoverable inProgressFileRecoverable) throws IOException {
final RecoverableWriter.ResumeRecoverable resumeRecoverable =
((OutputStreamBasedInProgressFileRecoverable) inProgressFileRecoverable).getResumeRecoverable();
return recoverableWriter.cleanupRecoverableState(resumeRecoverable);
}
@Override
public WriterProperties getProperties() {
return new WriterProperties(
new OutputStreamBasedInProgressFileRecoverableSerializer(recoverableWriter.getResumeRecoverableSerializer()),
new OutputStreamBasedPendingFileRecoverableSerializer(recoverableWriter.getCommitRecoverableSerializer()),
recoverableWriter.supportsResume());
}
public abstract InProgressFileWriter<IN, BucketID> openNew(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final Path path,
final long creationTime) throws IOException;
public abstract InProgressFileWriter<IN, BucketID> resumeFrom(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final RecoverableWriter.ResumeRecoverable resumable,
final long creationTime) throws IOException;
}
static final class OutputStreamBasedPendingFileRecoverable implements PendingFileRecoverable {
private final RecoverableWriter.CommitRecoverable commitRecoverable;
OutputStreamBasedPendingFileRecoverable(final RecoverableWriter.CommitRecoverable commitRecoverable) {
this.commitRecoverable = commitRecoverable;
}
RecoverableWriter.CommitRecoverable getCommitRecoverable() {
return commitRecoverable;
}
}
static final class OutputStreamBasedInProgressFileRecoverable implements InProgressFileRecoverable {
private final RecoverableWriter.ResumeRecoverable resumeRecoverable;
OutputStreamBasedInProgressFileRecoverable(final RecoverableWriter.ResumeRecoverable resumeRecoverable) {
this.resumeRecoverable = resumeRecoverable;
}
RecoverableWriter.ResumeRecoverable getResumeRecoverable() {
return resumeRecoverable;
}
}
static final class OutputStreamBasedPendingFile implements BucketWriter.PendingFile {
private final RecoverableFsDataOutputStream.Committer committer;
OutputStreamBasedPendingFile(final RecoverableFsDataOutputStream.Committer committer) {
this.committer = committer;
}
@Override
public void commit() throws IOException {
committer.commit();
}
@Override
public void commitAfterRecovery() throws IOException {
committer.commitAfterRecovery();
}
}
static class OutputStreamBasedInProgressFileRecoverableSerializer implements SimpleVersionedSerializer<InProgressFileRecoverable> {
private static final int MAGIC_NUMBER = 0xb3a4073d;
private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumeSerializer;
OutputStreamBasedInProgressFileRecoverableSerializer(SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumeSerializer) {
this.resumeSerializer = resumeSerializer;
}
@Override
public int getVersion() {
return 1;
}
@Override
public byte[] serialize(InProgressFileRecoverable inProgressRecoverable) throws IOException {
OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable = (OutputStreamBasedInProgressFileRecoverable) inProgressRecoverable;
DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
dataOutputSerializer.writeInt(MAGIC_NUMBER);
serializeV1(outputStreamBasedInProgressRecoverable, dataOutputSerializer);
return dataOutputSerializer.getCopyOfBuffer();
}
@Override
public InProgressFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
DataInputView dataInputView = new DataInputDeserializer(serialized);
validateMagicNumber(dataInputView);
return deserializeV1(dataInputView);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}
SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeSerializer() {
return resumeSerializer;
}
private void serializeV1(final OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable, final DataOutputView dataOutputView) throws IOException {
SimpleVersionedSerialization.writeVersionAndSerialize(resumeSerializer, outputStreamBasedInProgressRecoverable.getResumeRecoverable(), dataOutputView);
}
private OutputStreamBasedInProgressFileRecoverable deserializeV1(final DataInputView dataInputView) throws IOException {
return new OutputStreamBasedInProgressFileRecoverable(SimpleVersionedSerialization.readVersionAndDeSerialize(resumeSerializer, dataInputView));
}
private static void validateMagicNumber(final DataInputView dataInputView) throws IOException {
final int magicNumber = dataInputView.readInt();
if (magicNumber != MAGIC_NUMBER) {
throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
}
}
}
static class OutputStreamBasedPendingFileRecoverableSerializer implements SimpleVersionedSerializer<PendingFileRecoverable> {
private static final int MAGIC_NUMBER = 0x2c853c89;
private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitSerializer;
OutputStreamBasedPendingFileRecoverableSerializer(final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitSerializer) {
this.commitSerializer = commitSerializer;
}
@Override
public int getVersion() {
return 1;
}
@Override
public byte[] serialize(PendingFileRecoverable pendingFileRecoverable) throws IOException {
OutputStreamBasedPendingFileRecoverable outputStreamBasedPendingFileRecoverable = (OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable;
DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
dataOutputSerializer.writeInt(MAGIC_NUMBER);
serializeV1(outputStreamBasedPendingFileRecoverable, dataOutputSerializer);
return dataOutputSerializer.getCopyOfBuffer();
}
@Override
public PendingFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
DataInputDeserializer in = new DataInputDeserializer(serialized);
validateMagicNumber(in);
return deserializeV1(in);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}
SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitSerializer() {
return this.commitSerializer;
}
private void serializeV1(final OutputStreamBasedPendingFileRecoverable outputStreamBasedPendingFileRecoverable, final DataOutputView dataOutputView) throws IOException {
SimpleVersionedSerialization.writeVersionAndSerialize(commitSerializer, outputStreamBasedPendingFileRecoverable.getCommitRecoverable(), dataOutputView);
}
private OutputStreamBasedPendingFileRecoverable deserializeV1(final DataInputView dataInputView) throws IOException {
return new OutputStreamBasedPendingFileRecoverable(SimpleVersionedSerialization.readVersionAndDeSerialize(commitSerializer, dataInputView));
}
private static void validateMagicNumber(final DataInputView dataInputView) throws IOException {
final int magicNumber = dataInputView.readInt();
if (magicNumber != MAGIC_NUMBER) {
throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
}
}
}
}
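
Both serializers above follow the same guard pattern: a magic number is written first and validated first, and the nested recoverable is delegated to SimpleVersionedSerialization. A minimal, self-contained sketch of that pattern with a toy String payload (ToyStateSerializer and its magic number are hypothetical, for illustration only):

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

/** Hypothetical serializer showing the magic-number + version guard pattern. */
class ToyStateSerializer implements SimpleVersionedSerializer<String> {

    private static final int MAGIC_NUMBER = 0x1f2e3d4c; // rejects corrupt or foreign bytes early

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(String state) throws IOException {
        final DataOutputSerializer out = new DataOutputSerializer(64);
        out.writeInt(MAGIC_NUMBER); // written first, validated first on deserialization
        out.writeUTF(state);
        return out.getCopyOfBuffer();
    }

    @Override
    public String deserialize(int version, byte[] serialized) throws IOException {
        if (version != 1) {
            throw new IOException("Unrecognized version or corrupt state: " + version);
        }
        final DataInputDeserializer in = new DataInputDeserializer(serialized);
        if (in.readInt() != MAGIC_NUMBER) {
            throw new IOException("Corrupt data: unexpected magic number");
        }
        return in.readUTF();
    }
}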
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
* An abstract writer for the currently open part file in a specific {@link Bucket}.
*
 * <p>Currently, there are two subclasses of this class:
* <ol>
* <li>One for row-wise formats: the {@link RowWisePartWriter}.</li>
* <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li>
* </ol>
*
* <p>This also implements the {@link PartFileInfo}.
*/
@Internal
abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
private final BucketID bucketId;
private final long creationTime;
protected final RecoverableFsDataOutputStream currentPartStream;
private long lastUpdateTime;
protected PartFileWriter(
final BucketID bucketId,
final RecoverableFsDataOutputStream currentPartStream,
final long creationTime) {
Preconditions.checkArgument(creationTime >= 0L);
this.bucketId = Preconditions.checkNotNull(bucketId);
this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
this.creationTime = creationTime;
this.lastUpdateTime = creationTime;
}
abstract void write(IN element, long currentTime) throws IOException;
RecoverableWriter.ResumeRecoverable persist() throws IOException {
return currentPartStream.persist();
}
RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
return currentPartStream.closeForCommit().getRecoverable();
}
void dispose() {
// we can suppress exceptions here, because we do not rely on close() to
// flush or persist any data
IOUtils.closeQuietly(currentPartStream);
}
void markWrite(long now) {
this.lastUpdateTime = now;
}
@Override
public BucketID getBucketId() {
return bucketId;
}
@Override
public long getCreationTime() {
return creationTime;
}
@Override
public long getSize() throws IOException {
return currentPartStream.getPos();
}
@Override
public long getLastUpdateTime() {
return lastUpdateTime;
}
// ------------------------------------------------------------------------
/**
* An interface for factories that create the different {@link PartFileWriter writers}.
*/
interface PartFileFactory<IN, BucketID> {
/**
* Used to restore a {@link PartFileWriter writer} after a failure.
* @param bucketId the id of the bucket this writer is writing to.
* @param stream the filesystem-specific output stream to use when writing to the filesystem.
* @param resumable the state of the stream we are resuming.
* @param creationTime the creation time of the stream.
* @return the recovered {@link PartFileWriter writer}.
* @throws IOException if the stream cannot be resumed.
*/
PartFileWriter<IN, BucketID> resumeFrom(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final RecoverableWriter.ResumeRecoverable resumable,
final long creationTime) throws IOException;
/**
* Used to create a new {@link PartFileWriter writer}.
* @param bucketId the id of the bucket this writer is writing to.
* @param stream the filesystem-specific output stream to use when writing to the filesystem.
* @param path the path of the part file this writer will write to.
* @param creationTime the creation time of the stream.
* @return the new {@link PartFileWriter writer}.
* @throws IOException if the writer cannot be opened.
*/
PartFileWriter<IN, BucketID> openNew(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final Path path,
final long creationTime) throws IOException;
}
}
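
The PartFileFactory above is tied to RecoverableFsDataOutputStream, which is exactly the coupling this change removes: an InProgressFileWriter no longer has to be backed by a recoverable stream. As a hedged sketch of what the new abstraction permits (everything below except AbstractPartFileWriter and the recoverable interfaces is hypothetical, and the class is assumed to live in the same package so it can call markWrite):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical writer that buffers rows in memory instead of writing to a recoverable stream. */
class InMemoryPartFileWriter<BucketID> extends AbstractPartFileWriter<String, BucketID> {

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    InMemoryPartFileWriter(BucketID bucketID, long createTime) {
        super(bucketID, createTime);
    }

    @Override
    public void write(String element, long currentTime) throws IOException {
        buffer.write(element.getBytes(StandardCharsets.UTF_8));
        markWrite(currentTime);
    }

    @Override
    public InProgressFileRecoverable persist() {
        // snapshot the buffered bytes; a real implementation would persist them durably
        return new InMemoryRecoverable(buffer.toByteArray());
    }

    @Override
    public PendingFileRecoverable closeForCommit() {
        return new InMemoryRecoverable(buffer.toByteArray());
    }

    @Override
    public void dispose() {
        buffer.reset();
    }

    @Override
    public long getSize() {
        return buffer.size();
    }

    /** Hypothetical recoverable carrying the buffered bytes; in-progress extends pending. */
    static class InMemoryRecoverable implements InProgressFileRecoverable {
        final byte[] bytes;

        InMemoryRecoverable(byte[] bytes) {
            this.bytes = bytes;
        }
    }
}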
......@@ -28,11 +28,11 @@ import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
* A {@link PartFileWriter} for row-wise formats that use an {@link Encoder}.
 * An {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}.
* This also implements the {@link PartFileInfo}.
*/
@Internal
final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
private final Encoder<IN> encoder;
......@@ -46,7 +46,7 @@ final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID>
}
@Override
void write(IN element, long currentTime) throws IOException {
public void write(final IN element, final long currentTime) throws IOException {
encoder.encode(element, currentPartStream);
markWrite(currentTime);
}
......@@ -56,20 +56,21 @@ final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID>
* @param <IN> The type of input elements.
* @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
*/
static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
static class Factory<IN, BucketID> extends OutputStreamBasedPartFileWriter.OutputStreamBasedPartFileFactory<IN, BucketID> {
private final Encoder<IN> encoder;
Factory(Encoder<IN> encoder) {
Factory(final RecoverableWriter recoverableWriter, final Encoder<IN> encoder) {
super(recoverableWriter);
this.encoder = encoder;
}
@Override
public PartFileWriter<IN, BucketID> resumeFrom(
public InProgressFileWriter<IN, BucketID> resumeFrom(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final RecoverableWriter.ResumeRecoverable resumable,
final long creationTime) throws IOException {
final long creationTime) {
Preconditions.checkNotNull(stream);
Preconditions.checkNotNull(resumable);
......@@ -78,11 +79,11 @@ final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID>
}
@Override
public PartFileWriter<IN, BucketID> openNew(
public InProgressFileWriter<IN, BucketID> openNew(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final Path path,
final long creationTime) throws IOException {
final long creationTime) {
Preconditions.checkNotNull(stream);
Preconditions.checkNotNull(path);
......
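
As the hunks above show, RowWisePartWriter#write simply hands each element, together with the raw part stream, to the configured Encoder. Any Encoder works here; a minimal hypothetical one that emits each record as a UTF-8 line:

import org.apache.flink.api.common.serialization.Encoder;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical encoder: one record per line, UTF-8 encoded. */
class LineEncoder<T> implements Encoder<T> {

    @Override
    public void encode(T element, OutputStream stream) throws IOException {
        // the sink owns the part stream; an encoder should only append to it, never close it
        stream.write(element.toString().getBytes(StandardCharsets.UTF_8));
        stream.write('\n');
    }
}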
......@@ -277,7 +277,7 @@ public class StreamingFileSink<IN>
basePath,
bucketAssigner,
bucketFactory,
new RowWisePartWriter.Factory<>(encoder),
new RowWisePartWriter.Factory<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), encoder),
rollingPolicy,
bucketLifeCycleListener,
subtaskIndex,
......@@ -397,7 +397,7 @@ public class StreamingFileSink<IN>
basePath,
bucketAssigner,
bucketFactory,
new BulkPartWriter.Factory<>(writerFactory),
new BulkPartWriter.Factory<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), writerFactory),
rollingPolicy,
bucketLifeCycleListener,
subtaskIndex,
......
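
The two hunks above only change internal wiring: per the commit message, the part-file factories are now constructed with the RecoverableWriter created from the base path's FileSystem, so that Bucket/Buckets depend on the factory rather than on the RecoverableWriter. User-facing construction is unchanged; for reference, a typical row-format sink (output path hypothetical):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

class RowFormatSinkExample {

    static StreamingFileSink<String> createSink() {
        return StreamingFileSink
            .forRowFormat(new Path("file:///tmp/flink-out"), new SimpleStringEncoder<String>("UTF-8"))
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .build();
    }
}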
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * The properties of a {@link BucketWriter}: the serializers for its recoverable state and whether it supports resuming an in-progress file.
*/
@Internal
public class WriterProperties {
private final SimpleVersionedSerializer<? extends InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer;
private final SimpleVersionedSerializer<? extends InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer;
private final boolean supportsResume;
public WriterProperties(
SimpleVersionedSerializer<? extends InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
SimpleVersionedSerializer<? extends InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
boolean supportsResume) {
this.inProgressFileRecoverableSerializer = checkNotNull(inProgressFileRecoverableSerializer);
this.pendingFileRecoverableSerializer = checkNotNull(pendingFileRecoverableSerializer);
this.supportsResume = supportsResume;
}
boolean supportsResume() {
return supportsResume;
}
/**
* @return the serializer for the {@link InProgressFileWriter.PendingFileRecoverable}.
*/
SimpleVersionedSerializer<? extends InProgressFileWriter.PendingFileRecoverable> getPendingFileRecoverableSerializer() {
return pendingFileRecoverableSerializer;
}
/**
* @return the serializer for the {@link InProgressFileWriter.InProgressFileRecoverable}.
*/
SimpleVersionedSerializer<? extends InProgressFileWriter.InProgressFileRecoverable> getInProgressFileRecoverableSerializer() {
return inProgressFileRecoverableSerializer;
}
}
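
The supportsResume flag is what lets a restoring Bucket choose between the two recovery paths implemented earlier in this change: resuming the in-progress file versus committing it after recovery. A hedged sketch of that decision, assuming the same package (the method names follow the BucketWriter interface used above; the wiring itself is illustrative, not the committed Bucket code):

import java.io.IOException;

/** Illustrative only: how a restoring bucket might branch on WriterProperties#supportsResume. */
class RestoreDecisionSketch {

    static <IN, BucketID> InProgressFileWriter<IN, BucketID> restoreInProgressFile(
            BucketWriter<IN, BucketID> bucketWriter,
            BucketID bucketId,
            InProgressFileWriter.InProgressFileRecoverable recoverable,
            long creationTime) throws IOException {

        if (bucketWriter.getProperties().supportsResume()) {
            // the writer can reopen and append to the previously open part file
            return bucketWriter.resumeInProgressFileFrom(bucketId, recoverable, creationTime);
        }
        // no resume support: finalize what was already written and let the caller
        // lazily open a fresh part file on the next incoming element
        bucketWriter.recoverPendingFile(recoverable).commitAfterRecovery();
        return null;
    }
}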
......@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
......@@ -54,7 +55,7 @@ public class BucketAssignerITCases {
basePath,
new BasePathBucketAssigner<>(),
new DefaultBucketFactoryImpl<>(),
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
new RowWisePartWriter.Factory<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
rollingPolicy,
null,
0,
......
......@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
......@@ -171,7 +172,7 @@ public class BucketTest {
return new TypeSafeMatcher<BucketState<String>>() {
@Override
protected boolean matchesSafely(BucketState<String> state) {
return state.getInProgressResumableFile() != null;
return state.getInProgressFileRecoverable() != null;
}
@Override
......@@ -185,7 +186,7 @@ public class BucketTest {
return new TypeSafeMatcher<BucketState<String>>() {
@Override
protected boolean matchesSafely(BucketState<String> state) {
return state.getInProgressResumableFile() == null;
return state.getInProgressFileRecoverable() == null;
}
@Override
......@@ -200,7 +201,7 @@ public class BucketTest {
return new TypeSafeMatcher<Bucket<String, String>>() {
@Override
protected boolean matchesSafely(Bucket<String, String> bucket) {
final PartFileWriter<String, String> inProgressPart = bucket.getInProgressPart();
final InProgressFileWriter<String, String> inProgressPart = bucket.getInProgressPart();
return isNull == (inProgressPart == null);
}
......@@ -349,23 +350,21 @@ public class BucketTest {
private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().build();
private static final PartFileWriter.PartFileFactory<String, String> partFileFactory =
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
private static final Encoder<String> ENCODER = new SimpleStringEncoder<>();
private static Bucket<String, String> createBucket(
final RecoverableWriter writer,
final Path bucketPath,
final int subtaskIdx,
final int initialPartCounter,
final OutputFileConfig outputFileConfig) {
final OutputFileConfig outputFileConfig) throws IOException {
return Bucket.getNew(
writer,
subtaskIdx,
bucketId,
bucketPath,
initialPartCounter,
partFileFactory,
new RowWisePartWriter.Factory<>(writer, ENCODER),
rollingPolicy,
outputFileConfig);
}
......@@ -378,10 +377,9 @@ public class BucketTest {
final OutputFileConfig outputFileConfig) throws Exception {
return Bucket.restore(
writer,
subtaskIndex,
initialPartCounter,
partFileFactory,
new RowWisePartWriter.Factory<>(writer, ENCODER),
rollingPolicy,
bucketState,
outputFileConfig);
......@@ -402,24 +400,46 @@ public class BucketTest {
private Bucket<String, String> getRestoredBucketWithOnlyInProgressPart(final BaseStubWriter writer) throws IOException {
final BucketState<String> stateWithOnlyInProgressFile =
new BucketState<>("test", new Path(), 12345L, new NoOpRecoverable(), new HashMap<>());
return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, stateWithOnlyInProgressFile, OutputFileConfig.builder().build());
new BucketState<>(
"test",
new Path(),
12345L,
new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(new NoOpRecoverable()),
new HashMap<>());
return Bucket.restore(
0,
1L,
new RowWisePartWriter.Factory<>(writer, ENCODER),
rollingPolicy,
stateWithOnlyInProgressFile,
OutputFileConfig.builder().build());
}
private Bucket<String, String> getRestoredBucketWithOnlyPendingParts(final BaseStubWriter writer, final int numberOfPendingParts) throws IOException {
final Map<Long, List<RecoverableWriter.CommitRecoverable>> completePartsPerCheckpoint =
final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> completePartsPerCheckpoint =
createPendingPartsPerCheckpoint(numberOfPendingParts);
final BucketState<String> initStateWithOnlyInProgressFile =
new BucketState<>("test", new Path(), 12345L, null, completePartsPerCheckpoint);
return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, initStateWithOnlyInProgressFile, OutputFileConfig.builder().build());
new BucketState<>(
"test",
new Path(),
12345L,
null,
completePartsPerCheckpoint);
return Bucket.restore(
0,
1L,
new RowWisePartWriter.Factory<>(writer, ENCODER),
rollingPolicy,
initStateWithOnlyInProgressFile,
OutputFileConfig.builder().build());
}
private Map<Long, List<RecoverableWriter.CommitRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>();
private Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>();
for (int checkpointId = 0; checkpointId < noOfCheckpoints; checkpointId++) {
final List<RecoverableWriter.CommitRecoverable> pending = new ArrayList<>();
pending.add(new NoOpRecoverable());
final List<InProgressFileWriter.PendingFileRecoverable> pending = new ArrayList<>();
pending.add(new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(new NoOpRecoverable()));
pendingCommittablesPerCheckpoint.put((long) checkpointId, pending);
}
return pendingCommittablesPerCheckpoint;
......
......@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils.MockListState;
......@@ -93,8 +94,8 @@ public class BucketsTest {
return bucket.getBucketId().equals(bucketId) &&
bucket.getBucketPath().equals(new Path(testTmpPath, bucketId)) &&
bucket.getInProgressPart() == null &&
bucket.getPendingPartsForCurrentCheckpoint().isEmpty() &&
bucket.getPendingPartsPerCheckpoint().size() == 1;
bucket.getPendingFileRecoverablesForCurrentCheckpoint().isEmpty() &&
bucket.getPendingFileRecoverablesPerCheckpoint().size() == 1;
}
@Override
......@@ -145,7 +146,7 @@ public class BucketsTest {
Assert.assertEquals(2L, bucketsTwo.getMaxPartCounter());
// make sure we have one in-progress file here and a pending
Assert.assertEquals(1L, bucketsTwo.getActiveBuckets().get("test1").getPendingPartsPerCheckpoint().size());
Assert.assertEquals(1L, bucketsTwo.getActiveBuckets().get("test1").getPendingFileRecoverablesPerCheckpoint().size());
Assert.assertNotNull(bucketsTwo.getActiveBuckets().get("test1").getInProgressPart());
final ListState<byte[]> mergedBucketStateContainer = new MockListState<>();
......@@ -175,10 +176,10 @@ public class BucketsTest {
// this is due to the Bucket#merge(). The in progress file of one
// of the previous tasks is put in the list of pending files.
Assert.assertEquals(1L, bucket.getPendingPartsForCurrentCheckpoint().size());
Assert.assertEquals(1L, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
// we commit the pending for previous checkpoints
Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
}
@Test
......@@ -210,8 +211,8 @@ public class BucketsTest {
Assert.assertEquals("test", bucket.getBucketId());
Assert.assertNull(bucket.getInProgressPart());
Assert.assertEquals(1L, bucket.getPendingPartsForCurrentCheckpoint().size());
Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
Assert.assertEquals(1L, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
}
@Test
......@@ -321,7 +322,7 @@ public class BucketsTest {
path,
new VerifyingBucketAssigner(timestamp, watermark, processingTime),
new DefaultBucketFactoryImpl<>(),
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
new RowWisePartWriter.Factory<>(FileSystem.get(path.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
DefaultRollingPolicy.builder().build(),
null,
2,
......@@ -458,7 +459,7 @@ public class BucketsTest {
basePath,
new TestUtils.StringIdentityBucketAssigner(),
new DefaultBucketFactoryImpl<>(),
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
new RowWisePartWriter.Factory<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
rollingPolicy,
bucketLifeCycleListener,
subtaskIdx,
......
......@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
......@@ -201,7 +202,7 @@ public class RollingPolicyTest {
basePath,
new TestUtils.StringIdentityBucketAssigner(),
new DefaultBucketFactoryImpl<>(),
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
new RowWisePartWriter.Factory<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
rollingPolicyToTest,
null,
0,
......
......@@ -23,11 +23,13 @@ import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.fs.local.LocalRecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
......@@ -40,6 +42,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
......@@ -50,6 +53,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy.build;
/**
* Utilities for the {@link StreamingFileSink} tests.
*/
......@@ -158,7 +163,7 @@ public class TestUtils {
.forBulkFormat(new Path(outDir.toURI()), writer)
.withBucketAssigner(bucketer)
.withBucketCheckInterval(bucketCheckInterval)
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withRollingPolicy(build())
.withBucketFactory(bucketFactory)
.withOutputFileConfig(outputFileConfig)
.build();
......@@ -199,7 +204,7 @@ public class TestUtils {
StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
.forBulkFormat(new Path(outDir.toURI()), writer)
.withNewBucketAssigner(bucketer)
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withRollingPolicy(build())
.withBucketCheckInterval(bucketCheckInterval)
.withBucketFactory(bucketFactory)
.withOutputFileConfig(outputFileConfig)
......@@ -414,4 +419,50 @@ public class TestUtils {
backingList.clear();
}
}
static class LocalRecoverableWriterForBucketStateMigrationTest extends LocalRecoverableWriter {
final String prefix = "src/test/resources/";
LocalRecoverableWriterForBucketStateMigrationTest() {
super(new LocalFileSystem());
}
public RecoverableFsDataOutputStream open(Path filePath) throws IOException {
RecoverableFsDataOutputStream recoverableFsDataOutputStream = super.open(filePath);
try {
// rewrite the stream's private targetFile/tempFile fields so that the recorded paths are
// relative to src/test/resources/ and therefore stable across machines in migration tests
final Field targetFileField = recoverableFsDataOutputStream.getClass().getDeclaredField("targetFile");
targetFileField.setAccessible(true);
final File targetFile = (File) targetFileField.get(recoverableFsDataOutputStream);
final int indexOfTargetFileRelativePath = targetFile.toString().indexOf(prefix);
final File relativeTargetFile = new FileForBucketStateMigrationTest(targetFile.toString().substring(indexOfTargetFileRelativePath));
targetFileField.set(recoverableFsDataOutputStream, relativeTargetFile);
final Field tempFileField = recoverableFsDataOutputStream.getClass().getDeclaredField("tempFile");
tempFileField.setAccessible(true);
final File tempFile = (File) tempFileField.get(recoverableFsDataOutputStream);
final int indexOfTempFileRelativePath = tempFile.toString().indexOf(prefix);
final File relativeTempFile = new FileForBucketStateMigrationTest(tempFile.toString().substring(indexOfTempFileRelativePath));
tempFileField.set(recoverableFsDataOutputStream, relativeTempFile);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new IOException("Could not rewrite the target/temp file paths via reflection.", e);
}
return recoverableFsDataOutputStream;
}
}
static class FileForBucketStateMigrationTest extends File {
FileForBucketStateMigrationTest(String pathname) {
super(pathname);
}
// report the relative path as the "absolute" one so that serialized state stays machine-independent
@Override
@Nonnull
public String getAbsolutePath() {
return getPath();
}
}
}
......@@ -50,7 +50,7 @@ public class NoOpRecoverableWriter implements RecoverableWriter {
@Override
public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
throw new UnsupportedOperationException();
return false;
}
@Override
......