Revert "[FLINK-17593][fs-connector] Support arbitrary recovery mechanism for PartFileWriter"

This reverts commit 339f5d84.

I'm reverting these three related commits because it is important to
have confidence in our testing and to clearly separate the addition of
the Bucket State upgrade test from changing the serializer.
Parent aa2f67f5
......@@ -138,8 +138,10 @@ public interface RecoverableWriter {
* recover from a (potential) failure. These can be temporary files that were written
* to the filesystem or objects that were uploaded to S3.
*
* <p><b>NOTE:</b> This operation should not throw an exception, but return false if the cleanup did not
* happen for any reason.
* <p><b>NOTE:</b> This operation should not throw an exception if the resumable has already
* been cleaned up and the resources have been freed. But the contract is that it will throw
* an {@link UnsupportedOperationException} if it is called for a {@code RecoverableWriter}
* whose {@link #requiresCleanupOfRecoverableState()} returns {@code false}.
*
* @param resumable The {@link ResumeRecoverable} whose state we want to clean-up.
* @return {@code true} if the resources were successfully freed, {@code false} otherwise
......
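For reference, here is a minimal caller-side sketch of the contract above. The helper name and the `writer`/`resumable` parameters are illustrative; the guard mirrors what Bucket#cleanupOutdatedResumables does further down in this change:
static void cleanUpIfNeeded(RecoverableWriter writer, RecoverableWriter.ResumeRecoverable resumable) throws IOException {
// Writers whose requiresCleanupOfRecoverableState() returns false would
// throw an UnsupportedOperationException from cleanupRecoverableState(),
// so the caller guards the call.
if (writer.requiresCleanupOfRecoverableState()) {
final boolean freed = writer.cleanupRecoverableState(resumable);
// freed == false simply means the state was already cleaned up.
}
}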
......@@ -77,7 +77,7 @@ public class LocalRecoverableWriter implements RecoverableWriter {
@Override
public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
return false;
throw new UnsupportedOperationException();
}
@Override
......
......@@ -95,7 +95,7 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
@Override
public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
return false;
throw new UnsupportedOperationException();
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
/**
* An abstract writer for the currently open part file in a specific {@link Bucket}.
* @param <IN> the element type.
* @param <BucketID> the bucket id type.
*/
public abstract class AbstractPartFileWriter<IN, BucketID> implements InProgressFileWriter<IN, BucketID> {
private final BucketID bucketID;
private final long creationTime;
private long lastUpdateTime;
public AbstractPartFileWriter(final BucketID bucketID, final long createTime) {
this.bucketID = bucketID;
this.creationTime = createTime;
this.lastUpdateTime = createTime;
}
@Override
public BucketID getBucketId() {
return bucketID;
}
@Override
public long getCreationTime() {
return creationTime;
}
@Override
public long getLastUpdateTime() {
return lastUpdateTime;
}
void markWrite(long now) {
this.lastUpdateTime = now;
}
}
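As a hedged illustration of the intended use of markWrite(...): a concrete subclass records the write time after each element, which is what keeps getLastUpdateTime() meaningful for time-based rolling policies. The `writer` field stands for any underlying encoder and is illustrative (the BulkPartWriter further down follows exactly this pattern):
@Override
public void write(IN element, long currentTime) throws IOException {
writer.addElement(element); // hand the element to the wrapped encoder
markWrite(currentTime);     // refresh lastUpdateTime for rolling policies
}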
......@@ -21,6 +21,10 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -56,44 +60,48 @@ public class Bucket<IN, BucketID> {
private final int subtaskIndex;
private final BucketWriter<IN, BucketID> bucketWriter;
private final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory;
private final RecoverableWriter fsWriter;
private final RollingPolicy<IN, BucketID> rollingPolicy;
private final NavigableMap<Long, InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverablesPerCheckpoint;
private final NavigableMap<Long, ResumeRecoverable> resumablesPerCheckpoint;
private final NavigableMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint;
private final NavigableMap<Long, List<CommitRecoverable>> pendingPartsPerCheckpoint;
private final OutputFileConfig outputFileConfig;
private long partCounter;
@Nullable
private InProgressFileWriter<IN, BucketID> inProgressPart;
private PartFileWriter<IN, BucketID> inProgressPart;
private List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverablesForCurrentCheckpoint;
private List<CommitRecoverable> pendingPartsForCurrentCheckpoint;
/**
* Constructor to create a new empty bucket.
*/
private Bucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
final BucketWriter<IN, BucketID> partFileFactory,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final OutputFileConfig outputFileConfig) {
this.fsWriter = checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex;
this.bucketId = checkNotNull(bucketId);
this.bucketPath = checkNotNull(bucketPath);
this.partCounter = initialPartCounter;
this.bucketWriter = checkNotNull(partFileFactory);
this.partFileFactory = checkNotNull(partFileFactory);
this.rollingPolicy = checkNotNull(rollingPolicy);
this.pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
this.pendingFileRecoverablesPerCheckpoint = new TreeMap<>();
this.inProgressFileRecoverablesPerCheckpoint = new TreeMap<>();
this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
this.pendingPartsPerCheckpoint = new TreeMap<>();
this.resumablesPerCheckpoint = new TreeMap<>();
this.outputFileConfig = checkNotNull(outputFileConfig);
}
......@@ -102,14 +110,16 @@ public class Bucket<IN, BucketID> {
* Constructor to restore a bucket from checkpointed state.
*/
private Bucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final BucketWriter<IN, BucketID> partFileFactory,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException {
this(
fsWriter,
subtaskIndex,
bucketState.getBucketId(),
bucketState.getBucketPath(),
......@@ -123,29 +133,31 @@ public class Bucket<IN, BucketID> {
}
private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
if (!state.hasInProgressFileRecoverable()) {
if (!state.hasInProgressResumableFile()) {
return;
}
// we try to resume the previous in-progress file
final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
final ResumeRecoverable resumable = state.getInProgressResumableFile();
if (bucketWriter.getProperties().supportsResume()) {
inProgressPart = bucketWriter.resumeInProgressFileFrom(
bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime());
if (fsWriter.supportsResume()) {
final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
inProgressPart = partFileFactory.resumeFrom(
bucketId, stream, resumable, state.getInProgressFileCreationTime());
} else {
// if the writer does not support resume, then we close the
// in-progress part and commit it, as done in the case of pending files.
bucketWriter.recoverPendingFile(inProgressFileRecoverable).commitAfterRecovery();
fsWriter.recoverForCommit(resumable).commitAfterRecovery();
}
}
private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
// we commit pending files for checkpoints that precede the last successful one, from which we are recovering
for (List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables: state.getPendingFileRecoverablesPerCheckpoint().values()) {
for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable: pendingFileRecoverables) {
bucketWriter.recoverPendingFile(pendingFileRecoverable).commitAfterRecovery();
for (List<CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) {
for (CommitRecoverable committable: committables) {
fsWriter.recoverForCommit(committable).commitAfterRecovery();
}
}
}
......@@ -163,7 +175,7 @@ public class Bucket<IN, BucketID> {
}
boolean isActive() {
return inProgressPart != null || !pendingFileRecoverablesForCurrentCheckpoint.isEmpty() || !pendingFileRecoverablesPerCheckpoint.isEmpty();
return inProgressPart != null || !pendingPartsForCurrentCheckpoint.isEmpty() || !pendingPartsPerCheckpoint.isEmpty();
}
void merge(final Bucket<IN, BucketID> bucket) throws IOException {
......@@ -172,16 +184,16 @@ public class Bucket<IN, BucketID> {
// There should be no pending files in the "to-merge" states.
// The reason is that:
// 1) the pendingFileRecoverablesForCurrentCheckpoint is emptied whenever we take a Recoverable (see prepareBucketForCheckpointing()).
// So a Recoverable, including the one we are recovering from, will never contain such files.
// 2) the files in pendingFileRecoverablesPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
// 1) the pendingPartsForCurrentCheckpoint is emptied whenever we take a snapshot (see prepareBucketForCheckpointing()).
// So a snapshot, including the one we are recovering from, will never contain such files.
// 2) the files in pendingPartsPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
checkState(bucket.pendingFileRecoverablesForCurrentCheckpoint.isEmpty());
checkState(bucket.pendingFileRecoverablesPerCheckpoint.isEmpty());
checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = bucket.closePartFile();
if (pendingFileRecoverable != null) {
pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
CommitRecoverable committable = bucket.closePartFile();
if (committable != null) {
pendingPartsForCurrentCheckpoint.add(committable);
}
if (LOG.isDebugEnabled()) {
......@@ -206,7 +218,8 @@ public class Bucket<IN, BucketID> {
closePartFile();
final Path partFilePath = assembleNewPartPath();
inProgressPart = bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
final RecoverableFsDataOutputStream stream = fsWriter.open(partFilePath);
inProgressPart = partFileFactory.openNew(bucketId, stream, partFilePath, currentTime);
if (LOG.isDebugEnabled()) {
LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
......@@ -220,14 +233,14 @@ public class Bucket<IN, BucketID> {
return new Path(bucketPath, outputFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + outputFileConfig.getPartSuffix());
}
private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
private CommitRecoverable closePartFile() throws IOException {
CommitRecoverable committable = null;
if (inProgressPart != null) {
pendingFileRecoverable = inProgressPart.closeForCommit();
pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
committable = inProgressPart.closeForCommit();
pendingPartsForCurrentCheckpoint.add(committable);
inProgressPart = null;
}
return pendingFileRecoverable;
return committable;
}
void disposePartFile() {
......@@ -239,16 +252,24 @@ public class Bucket<IN, BucketID> {
BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
prepareBucketForCheckpointing(checkpointId);
InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
ResumeRecoverable inProgressResumable = null;
long inProgressFileCreationTime = Long.MAX_VALUE;
if (inProgressPart != null) {
inProgressFileRecoverable = inProgressPart.persist();
inProgressResumable = inProgressPart.persist();
inProgressFileCreationTime = inProgressPart.getCreationTime();
this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
// the following is an optimization so that writers that do not
// require cleanup do not have to keep track of resumables
// and later iterate over the active buckets.
// (see onSuccessfulCompletionOfCheckpoint())
if (fsWriter.requiresCleanupOfRecoverableState()) {
this.resumablesPerCheckpoint.put(checkpointId, inProgressResumable);
}
}
return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressFileRecoverable, pendingFileRecoverablesPerCheckpoint);
return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint);
}
private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
......@@ -259,46 +280,49 @@ public class Bucket<IN, BucketID> {
closePartFile();
}
if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
if (!pendingPartsForCurrentCheckpoint.isEmpty()) {
pendingPartsPerCheckpoint.put(checkpointId, pendingPartsForCurrentCheckpoint);
pendingPartsForCurrentCheckpoint = new ArrayList<>();
}
}
void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
checkNotNull(bucketWriter);
checkNotNull(fsWriter);
Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)
Iterator<Map.Entry<Long, List<CommitRecoverable>>> it =
pendingPartsPerCheckpoint.headMap(checkpointId, true)
.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();
Map.Entry<Long, List<CommitRecoverable>> entry = it.next();
for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : entry.getValue()) {
bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
for (CommitRecoverable committable : entry.getValue()) {
fsWriter.recoverForCommit(committable).commit();
}
it.remove();
}
cleanupInProgressFileRecoverables(checkpointId);
cleanupOutdatedResumables(checkpointId);
}
private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false)
private void cleanupOutdatedResumables(long checkpointId) throws IOException {
Iterator<Map.Entry<Long, ResumeRecoverable>> it =
resumablesPerCheckpoint.headMap(checkpointId, false)
.entrySet().iterator();
while (it.hasNext()) {
final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = it.next().getValue();
final ResumeRecoverable recoverable = it.next().getValue();
// this check is redundant, as we only put entries in the inProgressFileRecoverablesPerCheckpoint map
// when the requiresCleanupOfInProgressFileRecoverableState() returns true, but having it makes
// this check is redundant, as we only put entries in the resumablesPerCheckpoint map
// when the requiresCleanupOfRecoverableState() returns true, but having it makes
// the code more readable.
final boolean successfullyDeleted = bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);
if (LOG.isDebugEnabled() && successfullyDeleted) {
LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
if (fsWriter.requiresCleanupOfRecoverableState()) {
final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
if (LOG.isDebugEnabled() && successfullyDeleted) {
LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
}
}
it.remove();
}
......@@ -318,51 +342,54 @@ public class Bucket<IN, BucketID> {
// --------------------------- Testing Methods -----------------------------
@VisibleForTesting
Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> getPendingFileRecoverablesPerCheckpoint() {
return pendingFileRecoverablesPerCheckpoint;
Map<Long, List<CommitRecoverable>> getPendingPartsPerCheckpoint() {
return pendingPartsPerCheckpoint;
}
@Nullable
@VisibleForTesting
InProgressFileWriter<IN, BucketID> getInProgressPart() {
PartFileWriter<IN, BucketID> getInProgressPart() {
return inProgressPart;
}
@VisibleForTesting
List<InProgressFileWriter.PendingFileRecoverable> getPendingFileRecoverablesForCurrentCheckpoint() {
return pendingFileRecoverablesForCurrentCheckpoint;
List<CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
return pendingPartsForCurrentCheckpoint;
}
// --------------------------- Static Factory Methods -----------------------------
/**
* Creates a new empty {@code Bucket}.
* @param fsWriter the filesystem-specific {@link RecoverableWriter}.
* @param subtaskIndex the index of the subtask creating the bucket.
* @param bucketId the identifier of the bucket, as returned by the {@link BucketAssigner}.
* @param bucketPath the path to where the part files for the bucket will be written to.
* @param initialPartCounter the initial counter for the part files of the bucket.
* @param partFileFactory the {@link BucketWriter} the factory creating part file writers.
* @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
* @param <IN> the type of input elements to the sink.
* @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
* @param outputFileConfig the part file configuration.
* @return The new Bucket.
*/
static <IN, BucketID> Bucket<IN, BucketID> getNew(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
final BucketWriter<IN, BucketID> partFileFactory,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final OutputFileConfig outputFileConfig) {
return new Bucket<>(subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy, outputFileConfig);
return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy, outputFileConfig);
}
/**
* Restores a {@code Bucket} from the state included in the provided {@link BucketState}.
* @param fsWriter the filesystem-specific {@link RecoverableWriter}.
* @param subtaskIndex the index of the subtask creating the bucket.
* @param initialPartCounter the initial counter for the part files of the bucket.
* @param partFileFactory the {@link BucketWriter} the factory creating part file writers.
* @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
* @param bucketState the initial state of the restored bucket.
* @param <IN> the type of input elements to the sink.
* @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
......@@ -370,12 +397,13 @@ public class Bucket<IN, BucketID> {
* @return The restored Bucket.
*/
static <IN, BucketID> Bucket<IN, BucketID> restore(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final BucketWriter<IN, BucketID> partFileFactory,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException {
return new Bucket<>(subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState, outputFileConfig);
return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState, outputFileConfig);
}
}
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import java.io.IOException;
import java.io.Serializable;
......@@ -31,18 +32,20 @@ import java.io.Serializable;
interface BucketFactory<IN, BucketID> extends Serializable {
Bucket<IN, BucketID> getNewBucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
final BucketWriter<IN, BucketID> partFileFactory,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final OutputFileConfig outputFileConfig) throws IOException;
Bucket<IN, BucketID> restoreBucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final BucketWriter<IN, BucketID> partFileFactory,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException;
......
......@@ -46,30 +46,30 @@ class BucketState<BucketID> {
private final long inProgressFileCreationTime;
/**
* An {@link InProgressFileWriter.InProgressFileRecoverable} for the currently open
* A {@link RecoverableWriter.ResumeRecoverable} for the currently open
* part file, or null if there is no currently open part file.
*/
@Nullable
private final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable;
private final RecoverableWriter.ResumeRecoverable inProgressResumableFile;
/**
* The {@link RecoverableWriter.CommitRecoverable files} pending to be
* committed, organized by checkpoint id.
*/
private final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint;
private final Map<Long, List<RecoverableWriter.CommitRecoverable>> committableFilesPerCheckpoint;
BucketState(
final BucketID bucketId,
final Path bucketPath,
final long inProgressFileCreationTime,
@Nullable final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable,
final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint
@Nullable final RecoverableWriter.ResumeRecoverable inProgressResumableFile,
final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint
) {
this.bucketId = Preconditions.checkNotNull(bucketId);
this.bucketPath = Preconditions.checkNotNull(bucketPath);
this.inProgressFileCreationTime = inProgressFileCreationTime;
this.inProgressFileRecoverable = inProgressFileRecoverable;
this.pendingFileRecoverablesPerCheckpoint = Preconditions.checkNotNull(pendingFileRecoverablesPerCheckpoint);
this.inProgressResumableFile = inProgressResumableFile;
this.committableFilesPerCheckpoint = Preconditions.checkNotNull(pendingCommittablesPerCheckpoint);
}
BucketID getBucketId() {
......@@ -84,17 +84,17 @@ class BucketState<BucketID> {
return inProgressFileCreationTime;
}
boolean hasInProgressFileRecoverable() {
return inProgressFileRecoverable != null;
boolean hasInProgressResumableFile() {
return inProgressResumableFile != null;
}
@Nullable
InProgressFileWriter.InProgressFileRecoverable getInProgressFileRecoverable() {
return inProgressFileRecoverable;
RecoverableWriter.ResumeRecoverable getInProgressResumableFile() {
return inProgressResumableFile;
}
Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> getPendingFileRecoverablesPerCheckpoint() {
return pendingFileRecoverablesPerCheckpoint;
Map<Long, List<RecoverableWriter.CommitRecoverable>> getCommittableFilesPerCheckpoint() {
return committableFilesPerCheckpoint;
}
@Override
......@@ -105,13 +105,13 @@ class BucketState<BucketID> {
.append("BucketState for bucketId=").append(bucketId)
.append(" and bucketPath=").append(bucketPath);
if (hasInProgressFileRecoverable()) {
if (hasInProgressResumableFile()) {
strBuilder.append(", has open part file created @ ").append(inProgressFileCreationTime);
}
if (!pendingFileRecoverablesPerCheckpoint.isEmpty()) {
if (!committableFilesPerCheckpoint.isEmpty()) {
strBuilder.append(", has pending files for checkpoints: {");
for (long checkpointId: pendingFileRecoverablesPerCheckpoint.keySet()) {
for (long checkpointId: committableFilesPerCheckpoint.keySet()) {
strBuilder.append(checkpointId).append(' ');
}
strBuilder.append('}');
......
......@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
......@@ -44,172 +45,119 @@ class BucketStateSerializer<BucketID> implements SimpleVersionedSerializer<Bucke
private static final int MAGIC_NUMBER = 0x1e764b79;
private final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer;
private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer;
private final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer;
private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer;
private final SimpleVersionedSerializer<BucketID> bucketIdSerializer;
BucketStateSerializer(
final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer,
final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer,
final SimpleVersionedSerializer<BucketID> bucketIdSerializer
) {
this.inProgressFileRecoverableSerializer = Preconditions.checkNotNull(inProgressFileRecoverableSerializer);
this.pendingFileRecoverableSerializer = Preconditions.checkNotNull(pendingFileRecoverableSerializer);
this.resumableSerializer = Preconditions.checkNotNull(resumableSerializer);
this.commitableSerializer = Preconditions.checkNotNull(commitableSerializer);
this.bucketIdSerializer = Preconditions.checkNotNull(bucketIdSerializer);
}
@Override
public int getVersion() {
return 2;
return 1;
}
@Override
public byte[] serialize(BucketState<BucketID> state) throws IOException {
DataOutputSerializer out = new DataOutputSerializer(256);
out.writeInt(MAGIC_NUMBER);
serializeV2(state, out);
serializeV1(state, out);
return out.getCopyOfBuffer();
}
@Override
public BucketState<BucketID> deserialize(int version, byte[] serialized) throws IOException {
final DataInputDeserializer in = new DataInputDeserializer(serialized);
switch (version) {
case 1:
DataInputDeserializer in = new DataInputDeserializer(serialized);
validateMagicNumber(in);
return deserializeV1(in);
case 2:
validateMagicNumber(in);
return deserializeV2(in);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}
private void serializeV2(BucketState<BucketID> state, DataOutputView dataOutputView) throws IOException {
SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), dataOutputView);
dataOutputView.writeUTF(state.getBucketPath().toString());
dataOutputView.writeLong(state.getInProgressFileCreationTime());
@VisibleForTesting
void serializeV1(BucketState<BucketID> state, DataOutputView out) throws IOException {
SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), out);
out.writeUTF(state.getBucketPath().toString());
out.writeLong(state.getInProgressFileCreationTime());
// put the current open part file
if (state.hasInProgressFileRecoverable()) {
final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
dataOutputView.writeBoolean(true);
SimpleVersionedSerialization.writeVersionAndSerialize(inProgressFileRecoverableSerializer, inProgressFileRecoverable, dataOutputView);
} else {
dataOutputView.writeBoolean(false);
if (state.hasInProgressResumableFile()) {
final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
out.writeBoolean(true);
SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, resumable, out);
}
else {
out.writeBoolean(false);
}
// put the map of pending files per checkpoint
final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverables = state.getPendingFileRecoverablesPerCheckpoint();
final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommitters = state.getCommittableFilesPerCheckpoint();
dataOutputView.writeInt(pendingFileRecoverableSerializer.getVersion());
// manually keep the version here to save some bytes
out.writeInt(commitableSerializer.getVersion());
dataOutputView.writeInt(pendingFileRecoverables.size());
out.writeInt(pendingCommitters.size());
for (Entry<Long, List<RecoverableWriter.CommitRecoverable>> resumablesForCheckpoint : pendingCommitters.entrySet()) {
List<RecoverableWriter.CommitRecoverable> resumables = resumablesForCheckpoint.getValue();
for (Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFilesForCheckpoint : pendingFileRecoverables.entrySet()) {
final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableList = pendingFilesForCheckpoint.getValue();
out.writeLong(resumablesForCheckpoint.getKey());
out.writeInt(resumables.size());
dataOutputView.writeLong(pendingFilesForCheckpoint.getKey());
dataOutputView.writeInt(pendingFileRecoverableList.size());
for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : pendingFileRecoverableList) {
byte[] serialized = pendingFileRecoverableSerializer.serialize(pendingFileRecoverable);
dataOutputView.writeInt(serialized.length);
dataOutputView.write(serialized);
for (RecoverableWriter.CommitRecoverable resumable : resumables) {
byte[] serialized = commitableSerializer.serialize(resumable);
out.writeInt(serialized.length);
out.write(serialized);
}
}
}
private BucketState<BucketID> deserializeV1(DataInputView in) throws IOException {
final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer = getCommitableSerializer();
final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer = getResumableSerializer();
@VisibleForTesting
BucketState<BucketID> deserializeV1(DataInputView in) throws IOException {
final BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, in);
final String bucketPathStr = in.readUTF();
final long creationTime = in.readLong();
// then get the current resumable stream
InProgressFileWriter.InProgressFileRecoverable current = null;
RecoverableWriter.ResumeRecoverable current = null;
if (in.readBoolean()) {
current =
new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(
SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in));
current = SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in);
}
final int committableVersion = in.readInt();
final int numCheckpoints = in.readInt();
final HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablePerCheckpoint = new HashMap<>(numCheckpoints);
final HashMap<Long, List<RecoverableWriter.CommitRecoverable>> resumablesPerCheckpoint = new HashMap<>(numCheckpoints);
for (int i = 0; i < numCheckpoints; i++) {
final long checkpointId = in.readLong();
final int noOfResumables = in.readInt();
final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables = new ArrayList<>(noOfResumables);
final List<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables);
for (int j = 0; j < noOfResumables; j++) {
final byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
pendingFileRecoverables.add(
new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(commitableSerializer.deserialize(committableVersion, bytes)));
resumables.add(commitableSerializer.deserialize(committableVersion, bytes));
}
pendingFileRecoverablePerCheckpoint.put(checkpointId, pendingFileRecoverables);
resumablesPerCheckpoint.put(checkpointId, resumables);
}
return new BucketState<>(
bucketId,
new Path(bucketPathStr),
creationTime,
current,
pendingFileRecoverablePerCheckpoint);
}
private BucketState<BucketID> deserializeV2(DataInputView dataInputView) throws IOException {
final BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, dataInputView);
final String bucketPathStr = dataInputView.readUTF();
final long creationTime = dataInputView.readLong();
// then get the current resumable stream
InProgressFileWriter.InProgressFileRecoverable current = null;
if (dataInputView.readBoolean()) {
current = SimpleVersionedSerialization.readVersionAndDeSerialize(inProgressFileRecoverableSerializer, dataInputView);
}
final int pendingFileRecoverableSerializerVersion = dataInputView.readInt();
final int numCheckpoints = dataInputView.readInt();
final HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint = new HashMap<>(numCheckpoints);
for (int i = 0; i < numCheckpoints; i++) {
final long checkpointId = dataInputView.readLong();
final int numOfPendingFileRecoverables = dataInputView.readInt();
final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables = new ArrayList<>(numOfPendingFileRecoverables);
for (int j = 0; j < numOfPendingFileRecoverables; j++) {
final byte[] bytes = new byte[dataInputView.readInt()];
dataInputView.readFully(bytes);
pendingFileRecoverables.add(pendingFileRecoverableSerializer.deserialize(pendingFileRecoverableSerializerVersion, bytes));
}
pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverables);
}
return new BucketState<>(bucketId, new Path(bucketPathStr), creationTime, current, pendingFileRecoverablesPerCheckpoint);
}
private SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumableSerializer() {
final OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer
outputStreamBasedInProgressFileRecoverableSerializer =
(OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer) inProgressFileRecoverableSerializer;
return outputStreamBasedInProgressFileRecoverableSerializer.getResumeSerializer();
}
private SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitableSerializer() {
final OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer
outputStreamBasedPendingFileRecoverableSerializer =
(OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer) pendingFileRecoverableSerializer;
return outputStreamBasedPendingFileRecoverableSerializer.getCommitSerializer();
bucketId,
new Path(bucketPathStr),
creationTime,
current,
resumablesPerCheckpoint);
}
private static void validateMagicNumber(DataInputView in) throws IOException {
......
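The version juggling above follows Flink's SimpleVersionedSerializer convention: writeVersionAndSerialize stores the serializer's current version next to the payload, and deserialize(version, bytes) dispatches on the stored version, which is how a version-1 snapshot stays readable. A hedged round-trip sketch, assuming a `serializer` (a BucketStateSerializer<String>) and a `state` (a BucketState<String>) are already constructed:
// Illustrative only; both objects are assumed to exist.
byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, state);
// The stored version is read back first and routed to the matching deserializeVx(...).
BucketState<String> restored = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);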
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import java.io.IOException;
/**
* An interface for factories that create the different {@link InProgressFileWriter writers}.
*/
@Internal
interface BucketWriter<IN, BucketID> {
/**
* Used to create a new {@link InProgressFileWriter}.
* @param bucketID the id of the bucket this writer is writing to.
* @param path the path this writer will write to.
* @param creationTime the creation time of the file.
* @return the new {@link InProgressFileWriter}
* @throws IOException Thrown if creating a writer fails.
*/
InProgressFileWriter<IN, BucketID> openNewInProgressFile(
final BucketID bucketID,
final Path path,
final long creationTime) throws IOException;
/**
* Used to resume a {@link InProgressFileWriter} from a {@link InProgressFileWriter.InProgressFileRecoverable}.
* @param bucketID the id of the bucket this writer is writing to.
* @param inProgressFileSnapshot the state of the part file.
* @param creationTime the creation time of the file.
* @return the resumed {@link InProgressFileWriter}
* @throws IOException Thrown if resuming a writer fails.
*/
InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
final BucketID bucketID,
final InProgressFileWriter.InProgressFileRecoverable inProgressFileSnapshot,
final long creationTime) throws IOException;
WriterProperties getProperties();
/**
* Recovers a pending file for finalizing and committing.
* @param pendingFileRecoverable The handle with the recovery information.
* @return A pending file
* @throws IOException Thrown if recovering a pending file fails.
*/
PendingFile recoverPendingFile(final InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable) throws IOException;
/**
* Frees up any resources that were previously occupied in order to be able to
* recover from a (potential) failure.
*
* <p><b>NOTE:</b> This operation should not throw an exception, but return false if the cleanup did not
* happen for any reason.
*
* @param inProgressFileRecoverable the {@link InProgressFileWriter.InProgressFileRecoverable} whose state we want to clean-up.
* @return {@code true} if the resources were successfully freed, {@code false} otherwise
* (e.g. the file to be deleted was not there for any reason - already deleted or never created).
* @throws IOException if an I/O error occurs
*/
boolean cleanupInProgressFileRecoverable(final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable) throws IOException;
/**
* This represents a file that can no longer be written to.
*/
interface PendingFile {
/**
* Commits the pending file, making it visible. The file will contain the exact data
* as when the pending file was created.
*
* @throws IOException Thrown if committing fails.
*/
void commit() throws IOException;
/**
* Commits the pending file, making it visible. The file will contain the exact data
* as when the pending file was created.
*
* <p>This method tolerates situations where the file was already committed and
* will not raise an exception in that case. This is important for idempotent
* commit retries as they need to happen after recovery.
*
* @throws IOException Thrown if committing fails.
*/
void commitAfterRecovery() throws IOException;
}
}
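To make the two commit paths above concrete: after a normal checkpoint completion the pending file is committed with commit(), whereas on restore the recovered handle must use the idempotent commitAfterRecovery(), since the commit may already have happened before the failure. A hedged sketch mirroring Bucket#commitRecoveredPendingFiles; `bucketWriter` and `recoverable` are assumed to be in scope:
// Recovery path: tolerate a commit that already succeeded before the failure.
BucketWriter.PendingFile pendingFile = bucketWriter.recoverPendingFile(recoverable);
pendingFile.commitAfterRecovery();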
......@@ -21,7 +21,9 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Preconditions;
......@@ -59,7 +61,7 @@ public class Buckets<IN, BucketID> {
private final BucketAssigner<IN, BucketID> bucketAssigner;
private final BucketWriter<IN, BucketID> partFileWriterFactory;
private final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory;
private final RollingPolicy<IN, BucketID> rollingPolicy;
......@@ -76,6 +78,8 @@ public class Buckets<IN, BucketID> {
private long maxPartCounter;
private final RecoverableWriter fsWriter;
private final OutputFileConfig outputFileConfig;
// --------------------------- State Related Fields -----------------------------
......@@ -88,18 +92,18 @@ public class Buckets<IN, BucketID> {
* @param basePath The base path for our buckets.
* @param bucketAssigner The {@link BucketAssigner} provided by the user.
* @param bucketFactory The {@link BucketFactory} to be used to create buckets.
* @param partFileWriterFactory The {@link BucketWriter} to be used when writing data.
* @param partFileWriterFactory The {@link PartFileWriter.PartFileFactory} to be used when writing data.
* @param rollingPolicy The {@link RollingPolicy} as specified by the user.
*/
Buckets(
final Path basePath,
final BucketAssigner<IN, BucketID> bucketAssigner,
final BucketFactory<IN, BucketID> bucketFactory,
final BucketWriter<IN, BucketID> partFileWriterFactory,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
@Nullable final BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener,
final int subtaskIndex,
final OutputFileConfig outputFileConfig) {
final OutputFileConfig outputFileConfig) throws IOException {
this.basePath = Preconditions.checkNotNull(basePath);
this.bucketAssigner = Preconditions.checkNotNull(bucketAssigner);
......@@ -114,10 +118,19 @@ public class Buckets<IN, BucketID> {
this.activeBuckets = new HashMap<>();
this.bucketerContext = new Buckets.BucketerContext();
this.bucketStateSerializer = new BucketStateSerializer(
partFileWriterFactory.getProperties().getInProgressFileRecoverableSerializer(),
partFileWriterFactory.getProperties().getPendingFileRecoverableSerializer(),
bucketAssigner.getSerializer());
try {
this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
} catch (IOException e) {
LOG.error("Unable to create filesystem for path: {}", basePath);
throw e;
}
this.bucketStateSerializer = new BucketStateSerializer<>(
fsWriter.getResumeRecoverableSerializer(),
fsWriter.getCommitRecoverableSerializer(),
bucketAssigner.getSerializer()
);
this.maxPartCounter = 0L;
}
......@@ -172,6 +185,7 @@ public class Buckets<IN, BucketID> {
final Bucket<IN, BucketID> restoredBucket = bucketFactory
.restoreBucket(
fsWriter,
subtaskIndex,
maxPartCounter,
partFileWriterFactory,
......@@ -224,7 +238,7 @@ public class Buckets<IN, BucketID> {
final ListState<Long> partCounterStateContainer) throws Exception {
Preconditions.checkState(
partFileWriterFactory != null && bucketStateSerializer != null,
fsWriter != null && bucketStateSerializer != null,
"sink has not been initialized");
LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
......@@ -294,6 +308,7 @@ public class Buckets<IN, BucketID> {
if (bucket == null) {
final Path bucketPath = assembleBucketPath(bucketId);
bucket = bucketFactory.getNewBucket(
fsWriter,
subtaskIndex,
bucketId,
bucketPath,
......
......@@ -28,11 +28,11 @@ import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
* An {@link InProgressFileWriter} for bulk-encoding formats that use a {@link BulkPartWriter}.
* A {@link PartFileWriter} for bulk-encoding formats that use a {@link BulkPartWriter}.
* This also implements the {@link PartFileInfo}.
*/
@Internal
final class BulkPartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
private final BulkWriter<IN> writer;
......@@ -46,18 +46,18 @@ final class BulkPartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter
}
@Override
public void write(IN element, long currentTime) throws IOException {
void write(IN element, long currentTime) throws IOException {
writer.addElement(element);
markWrite(currentTime);
}
@Override
public InProgressFileRecoverable persist() {
RecoverableWriter.ResumeRecoverable persist() {
throw new UnsupportedOperationException("Bulk Part Writers do not support \"pause and resume\" operations.");
}
@Override
public PendingFileRecoverable closeForCommit() throws IOException {
RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
writer.flush();
writer.finish();
return super.closeForCommit();
......@@ -68,17 +68,16 @@ final class BulkPartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter
* @param <IN> The type of input elements.
* @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
*/
static class Factory<IN, BucketID> extends OutputStreamBasedPartFileWriter.OutputStreamBasedPartFileFactory<IN, BucketID>{
static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
private final BulkWriter.Factory<IN> writerFactory;
Factory(final RecoverableWriter recoverableWriter, BulkWriter.Factory<IN> writerFactory) throws IOException {
super(recoverableWriter);
Factory(BulkWriter.Factory<IN> writerFactory) {
this.writerFactory = writerFactory;
}
@Override
public InProgressFileWriter<IN, BucketID> resumeFrom(
public PartFileWriter<IN, BucketID> resumeFrom(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final RecoverableWriter.ResumeRecoverable resumable,
......@@ -92,7 +91,7 @@ final class BulkPartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter
}
@Override
public InProgressFileWriter<IN, BucketID> openNew(
public PartFileWriter<IN, BucketID> openNew(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final Path path,
......
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import java.io.IOException;
......@@ -33,15 +34,17 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, Bucket
@Override
public Bucket<IN, BucketID> getNewBucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
final BucketWriter<IN, BucketID> partFileWriterFactory,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final OutputFileConfig outputFileConfig) {
return Bucket.getNew(
fsWriter,
subtaskIndex,
bucketId,
bucketPath,
......@@ -53,14 +56,16 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, Bucket
@Override
public Bucket<IN, BucketID> restoreBucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final BucketWriter<IN, BucketID> partFileWriterFactory,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException {
return Bucket.restore(
fsWriter,
subtaskIndex,
initialPartCounter,
partFileWriterFactory,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import java.io.IOException;
/**
* The {@link Bucket} uses the {@link InProgressFileWriter} to write elements to a part file.
*/
@Internal
interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
/**
* Write an element to the part file.
* @param element the element to be written.
* @param currentTime the writing time.
* @throws IOException Thrown if writing the element fails.
*/
void write(final IN element, final long currentTime) throws IOException;
/**
* @return The state of the current part file.
* @throws IOException Thrown if persisting the part file fails.
*/
InProgressFileRecoverable persist() throws IOException;
/**
* @return The state of the pending part file. {@link Bucket} uses this to commit the pending file.
* @throws IOException Thrown if an I/O error occurs.
*/
PendingFileRecoverable closeForCommit() throws IOException;
/**
* Dispose the part file.
*/
void dispose();
// ------------------------------------------------------------------------
/**
* A handle that can be used to recover an in-progress file.
*/
interface InProgressFileRecoverable extends PendingFileRecoverable {}
/**
* A handle that can be used to recover a pending file.
*/
interface PendingFileRecoverable {}
}
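A hedged sketch of the lifecycle the Bucket drives through this interface, as shown in the Bucket diff above: write() per element, persist() when a checkpoint is taken while the part file stays open, and closeForCommit() when the rolling policy closes it. `writer`, `element`, and `now` are assumed to be in scope:
writer.write(element, now);                                                     // per incoming record
InProgressFileWriter.InProgressFileRecoverable atCheckpoint = writer.persist(); // checkpoint: file stays open
InProgressFileWriter.PendingFileRecoverable pending = writer.closeForCommit();  // roll: finalize for committing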
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.IOUtils;
import java.io.IOException;
/**
* The base class for all part file writers that use a {@link org.apache.flink.core.fs.RecoverableFsDataOutputStream}.
* @param <IN> the element type
* @param <BucketID> the bucket type
*/
public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> extends AbstractPartFileWriter<IN, BucketID> {
final RecoverableFsDataOutputStream currentPartStream;
OutputStreamBasedPartFileWriter(
final BucketID bucketID,
final RecoverableFsDataOutputStream recoverableFsDataOutputStream,
final long createTime) {
super(bucketID, createTime);
this.currentPartStream = recoverableFsDataOutputStream;
}
@Override
public InProgressFileRecoverable persist() throws IOException {
return new OutputStreamBasedInProgressFileRecoverable(currentPartStream.persist());
}
@Override
public PendingFileRecoverable closeForCommit() throws IOException {
return new OutputStreamBasedPendingFileRecoverable(currentPartStream.closeForCommit().getRecoverable());
}
@Override
public void dispose() {
// we can suppress exceptions here, because we do not rely on close() to
// flush or persist any data
IOUtils.closeQuietly(currentPartStream);
}
@Override
public long getSize() throws IOException {
return currentPartStream.getPos();
}
abstract static class OutputStreamBasedPartFileFactory<IN, BucketID> implements BucketWriter<IN, BucketID> {
private final RecoverableWriter recoverableWriter;
OutputStreamBasedPartFileFactory(final RecoverableWriter recoverableWriter) {
this.recoverableWriter = recoverableWriter;
}
@Override
public InProgressFileWriter<IN, BucketID> openNewInProgressFile(final BucketID bucketID, final Path path, final long creationTime) throws IOException {
return openNew(bucketID, recoverableWriter.open(path), path, creationTime);
}
@Override
public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(final BucketID bucketID, final InProgressFileRecoverable inProgressFileRecoverable, final long creationTime) throws IOException {
final OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable = (OutputStreamBasedInProgressFileRecoverable) inProgressFileRecoverable;
return resumeFrom(
bucketID,
recoverableWriter.recover(outputStreamBasedInProgressRecoverable.getResumeRecoverable()),
outputStreamBasedInProgressRecoverable.getResumeRecoverable(),
creationTime);
}
@Override
public PendingFile recoverPendingFile(final PendingFileRecoverable pendingFileRecoverable) throws IOException {
final RecoverableWriter.CommitRecoverable commitRecoverable;
if (pendingFileRecoverable instanceof OutputStreamBasedPendingFileRecoverable) {
commitRecoverable = ((OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable).getCommitRecoverable();
} else if (pendingFileRecoverable instanceof OutputStreamBasedInProgressFileRecoverable) {
commitRecoverable = ((OutputStreamBasedInProgressFileRecoverable) pendingFileRecoverable).getResumeRecoverable();
} else {
throw new IllegalArgumentException("can not recover from the pendingFileRecoverable");
}
return new OutputStreamBasedPendingFile(recoverableWriter.recoverForCommit(commitRecoverable));
}
@Override
public boolean cleanupInProgressFileRecoverable(InProgressFileRecoverable inProgressFileRecoverable) throws IOException {
final RecoverableWriter.ResumeRecoverable resumeRecoverable =
((OutputStreamBasedInProgressFileRecoverable) inProgressFileRecoverable).getResumeRecoverable();
return recoverableWriter.cleanupRecoverableState(resumeRecoverable);
}
@Override
public WriterProperties getProperties() {
return new WriterProperties(
new OutputStreamBasedInProgressFileRecoverableSerializer(recoverableWriter.getResumeRecoverableSerializer()),
new OutputStreamBasedPendingFileRecoverableSerializer(recoverableWriter.getCommitRecoverableSerializer()),
recoverableWriter.supportsResume());
}
public abstract InProgressFileWriter<IN, BucketID> openNew(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final Path path,
final long creationTime) throws IOException;
public abstract InProgressFileWriter<IN, BucketID> resumeFrom(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final RecoverableWriter.ResumeRecoverable resumable,
final long creationTime) throws IOException;
}
static final class OutputStreamBasedPendingFileRecoverable implements PendingFileRecoverable {
private final RecoverableWriter.CommitRecoverable commitRecoverable;
OutputStreamBasedPendingFileRecoverable(final RecoverableWriter.CommitRecoverable commitRecoverable) {
this.commitRecoverable = commitRecoverable;
}
RecoverableWriter.CommitRecoverable getCommitRecoverable() {
return commitRecoverable;
}
}
static final class OutputStreamBasedInProgressFileRecoverable implements InProgressFileRecoverable {
private final RecoverableWriter.ResumeRecoverable resumeRecoverable;
OutputStreamBasedInProgressFileRecoverable(final RecoverableWriter.ResumeRecoverable resumeRecoverable) {
this.resumeRecoverable = resumeRecoverable;
}
RecoverableWriter.ResumeRecoverable getResumeRecoverable() {
return resumeRecoverable;
}
}
static final class OutputStreamBasedPendingFile implements BucketWriter.PendingFile {
private final RecoverableFsDataOutputStream.Committer committer;
OutputStreamBasedPendingFile(final RecoverableFsDataOutputStream.Committer committer) {
this.committer = committer;
}
@Override
public void commit() throws IOException {
committer.commit();
}
@Override
public void commitAfterRecovery() throws IOException {
committer.commitAfterRecovery();
}
}
static class OutputStreamBasedInProgressFileRecoverableSerializer implements SimpleVersionedSerializer<InProgressFileRecoverable> {
private static final int MAGIC_NUMBER = 0xb3a4073d;
private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumeSerializer;
OutputStreamBasedInProgressFileRecoverableSerializer(SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumeSerializer) {
this.resumeSerializer = resumeSerializer;
}
@Override
public int getVersion() {
return 1;
}
@Override
public byte[] serialize(InProgressFileRecoverable inProgressRecoverable) throws IOException {
OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable = (OutputStreamBasedInProgressFileRecoverable) inProgressRecoverable;
DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
dataOutputSerializer.writeInt(MAGIC_NUMBER);
serializeV1(outputStreamBasedInProgressRecoverable, dataOutputSerializer);
return dataOutputSerializer.getCopyOfBuffer();
}
@Override
public InProgressFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
DataInputView dataInputView = new DataInputDeserializer(serialized);
validateMagicNumber(dataInputView);
return deserializeV1(dataInputView);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}
SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeSerializer() {
return resumeSerializer;
}
private void serializeV1(final OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable, final DataOutputView dataOutputView) throws IOException {
SimpleVersionedSerialization.writeVersionAndSerialize(resumeSerializer, outputStreamBasedInProgressRecoverable.getResumeRecoverable(), dataOutputView);
}
private OutputStreamBasedInProgressFileRecoverable deserializeV1(final DataInputView dataInputView) throws IOException {
return new OutputStreamBasedInProgressFileRecoverable(SimpleVersionedSerialization.readVersionAndDeSerialize(resumeSerializer, dataInputView));
}
private static void validateMagicNumber(final DataInputView dataInputView) throws IOException {
final int magicNumber = dataInputView.readInt();
if (magicNumber != MAGIC_NUMBER) {
throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
}
}
}
static class OutputStreamBasedPendingFileRecoverableSerializer implements SimpleVersionedSerializer<PendingFileRecoverable> {
private static final int MAGIC_NUMBER = 0x2c853c89;
private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitSerializer;
OutputStreamBasedPendingFileRecoverableSerializer(final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitSerializer) {
this.commitSerializer = commitSerializer;
}
@Override
public int getVersion() {
return 1;
}
@Override
public byte[] serialize(PendingFileRecoverable pendingFileRecoverable) throws IOException {
OutputStreamBasedPendingFileRecoverable outputStreamBasedPendingFileRecoverable = (OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable;
DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
dataOutputSerializer.writeInt(MAGIC_NUMBER);
serializeV1(outputStreamBasedPendingFileRecoverable, dataOutputSerializer);
return dataOutputSerializer.getCopyOfBuffer();
}
@Override
public PendingFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
DataInputDeserializer in = new DataInputDeserializer(serialized);
validateMagicNumber(in);
return deserializeV1(in);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}
SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitSerializer() {
return this.commitSerializer;
}
private void serializeV1(final OutputStreamBasedPendingFileRecoverable outputStreamBasedPendingFileRecoverable, final DataOutputView dataOutputView) throws IOException {
SimpleVersionedSerialization.writeVersionAndSerialize(commitSerializer, outputStreamBasedPendingFileRecoverable.getCommitRecoverable(), dataOutputView);
}
private OutputStreamBasedPendingFileRecoverable deserializeV1(final DataInputView dataInputView) throws IOException {
return new OutputStreamBasedPendingFileRecoverable(SimpleVersionedSerialization.readVersionAndDeSerialize(commitSerializer, dataInputView));
}
private static void validateMagicNumber(final DataInputView dataInputView) throws IOException {
final int magicNumber = dataInputView.readInt();
if (magicNumber != MAGIC_NUMBER) {
throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
}
}
}
}
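Both serializers above follow the same magic-number-plus-version pattern. For reference, here is a minimal, self-contained sketch of that pattern on a plain String payload (the StringRecoverableSerializer name and the String payload are invented for illustration; the real serializers above delegate the payload bytes to the nested commit/resume serializers instead):
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import java.io.IOException;
/** Hypothetical serializer illustrating the magic-number + version pattern used above. */
final class StringRecoverableSerializer implements SimpleVersionedSerializer<String> {
	private static final int MAGIC_NUMBER = 0x1e65f00d; // arbitrary constant, validated on read
	@Override
	public int getVersion() {
		return 1;
	}
	@Override
	public byte[] serialize(String payload) throws IOException {
		final DataOutputSerializer out = new DataOutputSerializer(64);
		out.writeInt(MAGIC_NUMBER); // guards against reading foreign or corrupt bytes
		out.writeUTF(payload);      // the real serializers write the nested recoverable here
		return out.getCopyOfBuffer();
	}
	@Override
	public String deserialize(int version, byte[] serialized) throws IOException {
		switch (version) {
			case 1:
				final DataInputDeserializer in = new DataInputDeserializer(serialized);
				final int magicNumber = in.readInt();
				if (magicNumber != MAGIC_NUMBER) {
					throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
				}
				return in.readUTF();
			default:
				throw new IOException("Unrecognized version or corrupt state: " + version);
		}
	}
}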
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
* An abstract writer for the currently open part file in a specific {@link Bucket}.
*
 * <p>Currently, there are two subclasses of this class:
* <ol>
* <li>One for row-wise formats: the {@link RowWisePartWriter}.</li>
* <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li>
* </ol>
*
* <p>This also implements the {@link PartFileInfo}.
*/
@Internal
abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
private final BucketID bucketId;
private final long creationTime;
protected final RecoverableFsDataOutputStream currentPartStream;
private long lastUpdateTime;
protected PartFileWriter(
final BucketID bucketId,
final RecoverableFsDataOutputStream currentPartStream,
final long creationTime) {
Preconditions.checkArgument(creationTime >= 0L);
this.bucketId = Preconditions.checkNotNull(bucketId);
this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
this.creationTime = creationTime;
this.lastUpdateTime = creationTime;
}
abstract void write(IN element, long currentTime) throws IOException;
RecoverableWriter.ResumeRecoverable persist() throws IOException {
return currentPartStream.persist();
}
RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
return currentPartStream.closeForCommit().getRecoverable();
}
void dispose() {
// we can suppress exceptions here, because we do not rely on close() to
// flush or persist any data
IOUtils.closeQuietly(currentPartStream);
}
void markWrite(long now) {
this.lastUpdateTime = now;
}
@Override
public BucketID getBucketId() {
return bucketId;
}
@Override
public long getCreationTime() {
return creationTime;
}
@Override
public long getSize() throws IOException {
return currentPartStream.getPos();
}
@Override
public long getLastUpdateTime() {
return lastUpdateTime;
}
// ------------------------------------------------------------------------
/**
* An interface for factories that create the different {@link PartFileWriter writers}.
*/
interface PartFileFactory<IN, BucketID> {
/**
* Used upon recovery from a failure to recover a {@link PartFileWriter writer}.
* @param bucketId the id of the bucket this writer is writing to.
* @param stream the filesystem-specific output stream to use when writing to the filesystem.
* @param resumable the state of the stream we are resurrecting.
* @param creationTime the creation time of the stream.
* @return the recovered {@link PartFileWriter writer}.
 * @throws IOException if the part file could not be recovered from the given resumable state.
*/
PartFileWriter<IN, BucketID> resumeFrom(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final RecoverableWriter.ResumeRecoverable resumable,
final long creationTime) throws IOException;
/**
* Used to create a new {@link PartFileWriter writer}.
* @param bucketId the id of the bucket this writer is writing to.
* @param stream the filesystem-specific output stream to use when writing to the filesystem.
* @param path the part this writer will write to.
* @param creationTime the creation time of the stream.
* @return the new {@link PartFileWriter writer}.
 * @throws IOException if the new part file could not be opened.
*/
PartFileWriter<IN, BucketID> openNew(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final Path path,
final long creationTime) throws IOException;
}
}
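To make the factory contract restored above concrete, here is a minimal usage sketch. The class name and bucket id are invented, and the code would have to live in the same package because PartFileWriter and its factory are package-private; it mirrors how a Bucket drives the factory: openNew when rolling to a fresh part file, resumeFrom when restoring from checkpointed state.
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import java.io.IOException;
/** Illustrative only: shows how a bucket would drive the PartFileFactory. */
final class PartFileFactoryUsageSketch {
	private static final PartFileWriter.PartFileFactory<String, String> FACTORY =
		new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
	/** Opening a brand-new part file, as a bucket does when it rolls over. */
	static PartFileWriter<String, String> openNewPart(
			RecoverableWriter fsWriter, Path partPath, long now) throws IOException {
		final RecoverableFsDataOutputStream stream = fsWriter.open(partPath);
		return FACTORY.openNew("bucket-0", stream, partPath, now);
	}
	/** Resuming the in-progress part file from a checkpointed ResumeRecoverable. */
	static PartFileWriter<String, String> resumePart(
			RecoverableWriter fsWriter,
			RecoverableWriter.ResumeRecoverable resumable,
			long now) throws IOException {
		final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
		return FACTORY.resumeFrom("bucket-0", stream, resumable, now);
	}
}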
......@@ -28,11 +28,11 @@ import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
* A {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}.
* A {@link PartFileWriter} for row-wise formats that use an {@link Encoder}.
* This also implements the {@link PartFileInfo}.
*/
@Internal
final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
private final Encoder<IN> encoder;
......@@ -46,7 +46,7 @@ final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWri
}
@Override
public void write(final IN element, final long currentTime) throws IOException {
void write(IN element, long currentTime) throws IOException {
encoder.encode(element, currentPartStream);
markWrite(currentTime);
}
......@@ -56,21 +56,20 @@ final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWri
* @param <IN> The type of input elements.
* @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
*/
static class Factory<IN, BucketID> extends OutputStreamBasedPartFileWriter.OutputStreamBasedPartFileFactory<IN, BucketID> {
static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
private final Encoder<IN> encoder;
Factory(final RecoverableWriter recoverableWriter, final Encoder<IN> encoder) {
super(recoverableWriter);
Factory(Encoder<IN> encoder) {
this.encoder = encoder;
}
@Override
public InProgressFileWriter<IN, BucketID> resumeFrom(
public PartFileWriter<IN, BucketID> resumeFrom(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final RecoverableWriter.ResumeRecoverable resumable,
final long creationTime) {
final long creationTime) throws IOException {
Preconditions.checkNotNull(stream);
Preconditions.checkNotNull(resumable);
......@@ -79,11 +78,11 @@ final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWri
}
@Override
public InProgressFileWriter<IN, BucketID> openNew(
public PartFileWriter<IN, BucketID> openNew(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final Path path,
final long creationTime) {
final long creationTime) throws IOException {
Preconditions.checkNotNull(stream);
Preconditions.checkNotNull(path);
......
......@@ -277,7 +277,7 @@ public class StreamingFileSink<IN>
basePath,
bucketAssigner,
bucketFactory,
new RowWisePartWriter.Factory<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), encoder),
new RowWisePartWriter.Factory<>(encoder),
rollingPolicy,
bucketLifeCycleListener,
subtaskIndex,
......@@ -397,7 +397,7 @@ public class StreamingFileSink<IN>
basePath,
bucketAssigner,
bucketFactory,
new BulkPartWriter.Factory<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), writerFactory),
new BulkPartWriter.Factory<>(writerFactory),
rollingPolicy,
bucketLifeCycleListener,
subtaskIndex,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * Properties of the part-file writers used by a {@link Bucket}: the serializers for the
 * in-progress and pending file recoverables, and whether resuming an in-progress file is supported.
*/
@Internal
public class WriterProperties {
private final SimpleVersionedSerializer<? extends InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer;
private final SimpleVersionedSerializer<? extends InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer;
private final boolean supportsResume;
public WriterProperties(
SimpleVersionedSerializer<? extends InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
SimpleVersionedSerializer<? extends InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
boolean supportsResume) {
this.inProgressFileRecoverableSerializer = checkNotNull(inProgressFileRecoverableSerializer);
this.pendingFileRecoverableSerializer = checkNotNull(pendingFileRecoverableSerializer);
this.supportsResume = supportsResume;
}
boolean supportsResume() {
return supportsResume;
}
/**
* @return the serializer for the {@link InProgressFileWriter.PendingFileRecoverable}.
*/
SimpleVersionedSerializer<? extends InProgressFileWriter.PendingFileRecoverable> getPendingFileRecoverableSerializer() {
return pendingFileRecoverableSerializer;
}
/**
* @return the serializer for the {@link InProgressFileWriter.InProgressFileRecoverable}.
*/
SimpleVersionedSerializer<? extends InProgressFileWriter.InProgressFileRecoverable> getInProgressFileRecoverableSerializer() {
return inProgressFileRecoverableSerializer;
}
}
......@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
......@@ -55,7 +54,7 @@ public class BucketAssignerITCases {
basePath,
new BasePathBucketAssigner<>(),
new DefaultBucketFactoryImpl<>(),
new RowWisePartWriter.Factory<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
rollingPolicy,
null,
0,
......
......@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
......@@ -172,7 +171,7 @@ public class BucketTest {
return new TypeSafeMatcher<BucketState<String>>() {
@Override
protected boolean matchesSafely(BucketState<String> state) {
return state.getInProgressFileRecoverable() != null;
return state.getInProgressResumableFile() != null;
}
@Override
......@@ -186,7 +185,7 @@ public class BucketTest {
return new TypeSafeMatcher<BucketState<String>>() {
@Override
protected boolean matchesSafely(BucketState<String> state) {
return state.getInProgressFileRecoverable() == null;
return state.getInProgressResumableFile() == null;
}
@Override
......@@ -201,7 +200,7 @@ public class BucketTest {
return new TypeSafeMatcher<Bucket<String, String>>() {
@Override
protected boolean matchesSafely(Bucket<String, String> bucket) {
final InProgressFileWriter<String, String> inProgressPart = bucket.getInProgressPart();
final PartFileWriter<String, String> inProgressPart = bucket.getInProgressPart();
return isNull == (inProgressPart == null);
}
......@@ -350,21 +349,23 @@ public class BucketTest {
private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().build();
private static final Encoder ENCODER = new SimpleStringEncoder<>();
private static final PartFileWriter.PartFileFactory<String, String> partFileFactory =
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
private static Bucket<String, String> createBucket(
final RecoverableWriter writer,
final Path bucketPath,
final int subtaskIdx,
final int initialPartCounter,
final OutputFileConfig outputFileConfig) throws IOException {
final OutputFileConfig outputFileConfig) {
return Bucket.getNew(
writer,
subtaskIdx,
bucketId,
bucketPath,
initialPartCounter,
new RowWisePartWriter.Factory<>(writer, ENCODER),
partFileFactory,
rollingPolicy,
outputFileConfig);
}
......@@ -377,9 +378,10 @@ public class BucketTest {
final OutputFileConfig outputFileConfig) throws Exception {
return Bucket.restore(
writer,
subtaskIndex,
initialPartCounter,
new RowWisePartWriter.Factory<>(writer, ENCODER),
partFileFactory,
rollingPolicy,
bucketState,
outputFileConfig);
......@@ -400,46 +402,24 @@ public class BucketTest {
private Bucket<String, String> getRestoredBucketWithOnlyInProgressPart(final BaseStubWriter writer) throws IOException {
final BucketState<String> stateWithOnlyInProgressFile =
new BucketState<>(
"test",
new Path(),
12345L,
new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(new NoOpRecoverable()),
new HashMap<>());
return Bucket.restore(
0,
1L,
new RowWisePartWriter.Factory<>(writer, ENCODER),
rollingPolicy,
stateWithOnlyInProgressFile,
OutputFileConfig.builder().build());
new BucketState<>("test", new Path(), 12345L, new NoOpRecoverable(), new HashMap<>());
return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, stateWithOnlyInProgressFile, OutputFileConfig.builder().build());
}
private Bucket<String, String> getRestoredBucketWithOnlyPendingParts(final BaseStubWriter writer, final int numberOfPendingParts) throws IOException {
final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> completePartsPerCheckpoint =
final Map<Long, List<RecoverableWriter.CommitRecoverable>> completePartsPerCheckpoint =
createPendingPartsPerCheckpoint(numberOfPendingParts);
final BucketState<String> initStateWithOnlyInProgressFile =
new BucketState<>(
"test",
new Path(),
12345L,
null,
completePartsPerCheckpoint);
return Bucket.restore(
0,
1L,
new RowWisePartWriter.Factory<>(writer, ENCODER),
rollingPolicy,
initStateWithOnlyInProgressFile, OutputFileConfig.builder().build());
new BucketState<>("test", new Path(), 12345L, null, completePartsPerCheckpoint);
return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, initStateWithOnlyInProgressFile, OutputFileConfig.builder().build());
}
private Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>();
private Map<Long, List<RecoverableWriter.CommitRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>();
for (int checkpointId = 0; checkpointId < noOfCheckpoints; checkpointId++) {
final List<InProgressFileWriter.PendingFileRecoverable> pending = new ArrayList<>();
pending.add(new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(new NoOpRecoverable()));
final List<RecoverableWriter.CommitRecoverable> pending = new ArrayList<>();
pending.add(new NoOpRecoverable());
pendingCommittablesPerCheckpoint.put((long) checkpointId, pending);
}
return pendingCommittablesPerCheckpoint;
......
......@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils.MockListState;
......@@ -94,8 +93,8 @@ public class BucketsTest {
return bucket.getBucketId().equals(bucketId) &&
bucket.getBucketPath().equals(new Path(testTmpPath, bucketId)) &&
bucket.getInProgressPart() == null &&
bucket.getPendingFileRecoverablesForCurrentCheckpoint().isEmpty() &&
bucket.getPendingFileRecoverablesPerCheckpoint().size() == 1;
bucket.getPendingPartsForCurrentCheckpoint().isEmpty() &&
bucket.getPendingPartsPerCheckpoint().size() == 1;
}
@Override
......@@ -146,7 +145,7 @@ public class BucketsTest {
Assert.assertEquals(2L, bucketsTwo.getMaxPartCounter());
// make sure we have one in-progress file here and a pending
Assert.assertEquals(1L, bucketsTwo.getActiveBuckets().get("test1").getPendingFileRecoverablesPerCheckpoint().size());
Assert.assertEquals(1L, bucketsTwo.getActiveBuckets().get("test1").getPendingPartsPerCheckpoint().size());
Assert.assertNotNull(bucketsTwo.getActiveBuckets().get("test1").getInProgressPart());
final ListState<byte[]> mergedBucketStateContainer = new MockListState<>();
......@@ -176,10 +175,10 @@ public class BucketsTest {
// this is due to the Bucket#merge(). The in progress file of one
// of the previous tasks is put in the list of pending files.
Assert.assertEquals(1L, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
Assert.assertEquals(1L, bucket.getPendingPartsForCurrentCheckpoint().size());
// we commit the pending for previous checkpoints
Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
}
@Test
......@@ -211,8 +210,8 @@ public class BucketsTest {
Assert.assertEquals("test", bucket.getBucketId());
Assert.assertNull(bucket.getInProgressPart());
Assert.assertEquals(1L, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
Assert.assertEquals(1L, bucket.getPendingPartsForCurrentCheckpoint().size());
Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
}
@Test
......@@ -322,7 +321,7 @@ public class BucketsTest {
path,
new VerifyingBucketAssigner(timestamp, watermark, processingTime),
new DefaultBucketFactoryImpl<>(),
new RowWisePartWriter.Factory<>(FileSystem.get(path.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
DefaultRollingPolicy.builder().build(),
null,
2,
......@@ -459,7 +458,7 @@ public class BucketsTest {
basePath,
new TestUtils.StringIdentityBucketAssigner(),
new DefaultBucketFactoryImpl<>(),
new RowWisePartWriter.Factory<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
rollingPolicy,
bucketLifeCycleListener,
subtaskIdx,
......
......@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
......@@ -202,7 +201,7 @@ public class RollingPolicyTest {
basePath,
new TestUtils.StringIdentityBucketAssigner(),
new DefaultBucketFactoryImpl<>(),
new RowWisePartWriter.Factory<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
rollingPolicyToTest,
null,
0,
......
......@@ -23,13 +23,11 @@ import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.fs.local.LocalRecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
......@@ -42,7 +40,6 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
......@@ -53,8 +50,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy.build;
/**
* Utilities for the {@link StreamingFileSink} tests.
*/
......@@ -163,7 +158,7 @@ public class TestUtils {
.forBulkFormat(new Path(outDir.toURI()), writer)
.withBucketAssigner(bucketer)
.withBucketCheckInterval(bucketCheckInterval)
.withRollingPolicy(build())
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withBucketFactory(bucketFactory)
.withOutputFileConfig(outputFileConfig)
.build();
......@@ -204,7 +199,7 @@ public class TestUtils {
StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
.forBulkFormat(new Path(outDir.toURI()), writer)
.withNewBucketAssigner(bucketer)
.withRollingPolicy(build())
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withBucketCheckInterval(bucketCheckInterval)
.withBucketFactory(bucketFactory)
.withOutputFileConfig(outputFileConfig)
......@@ -419,50 +414,4 @@ public class TestUtils {
backingList.clear();
}
}
static class LocalRecoverableWriterForBucketStateMigrationTest extends LocalRecoverableWriter {
final String prefix = "src/test/resources/";
LocalRecoverableWriterForBucketStateMigrationTest() {
super(new LocalFileSystem());
}
public RecoverableFsDataOutputStream open(Path filePath) throws IOException {
RecoverableFsDataOutputStream recoverableFsDataOutputStream = super.open(filePath);
try {
final Field targetFileField = recoverableFsDataOutputStream.getClass().getDeclaredField("targetFile");
targetFileField.setAccessible(true);
final File targetFile = (File) targetFileField.get(recoverableFsDataOutputStream);
final int indexOfTargetFileRelativePath = targetFile.toString().indexOf(prefix);
final File relativeTargetFile = new FileForBucketSateMigrationTest(targetFile.toString().substring(indexOfTargetFileRelativePath));
targetFileField.set(recoverableFsDataOutputStream, relativeTargetFile);
final Field tempFileField = recoverableFsDataOutputStream.getClass().getDeclaredField("tempFile");
tempFileField.setAccessible(true);
final File tempFile = (File) tempFileField.get(recoverableFsDataOutputStream);
final int indexOfTempFileRelativePath = tempFile.toString().indexOf(prefix);
final File relativeTemptFile = new FileForBucketSateMigrationTest(tempFile.toString().substring(indexOfTempFileRelativePath));
tempFileField.set(recoverableFsDataOutputStream, relativeTemptFile);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
return null;
}
return recoverableFsDataOutputStream;
}
}
static class FileForBucketSateMigrationTest extends File {
FileForBucketSateMigrationTest(String pathname) {
super(pathname);
}
@Override
@Nonnull
public String getAbsolutePath() {
return getPath();
}
}
}
......@@ -50,7 +50,7 @@ public class NoOpRecoverableWriter implements RecoverableWriter {
@Override
public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
return false;
throw new UnsupportedOperationException();
}
@Override
......
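As the hunk above shows, cleanupRecoverableState once again throws UnsupportedOperationException for writers that do not require cleanup of recoverable state, so callers are expected to check requiresCleanupOfRecoverableState() before calling it. A minimal sketch of that calling pattern follows (the helper class and method names are illustrative, not part of the Flink API):
import org.apache.flink.core.fs.RecoverableWriter;
import java.io.IOException;
/** Hypothetical helper showing the guarded-cleanup calling pattern. */
final class RecoverableStateCleanup {
	/**
	 * Frees the resources behind a resumable, but only for writers that
	 * require cleanup; calling cleanup on other writers would throw.
	 */
	static boolean cleanupIfRequired(
			RecoverableWriter writer,
			RecoverableWriter.ResumeRecoverable resumable) throws IOException {
		if (writer.requiresCleanupOfRecoverableState()) {
			return writer.cleanupRecoverableState(resumable);
		}
		return false; // nothing to clean up for this writer
	}
}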