Commit 0bbc91eb authored by kkloudas, committed by Stephan Ewen

[FLINK-9750] [DataStream API] Add new StreamingFileSink on top of the ResumableWriter.

Parent 975fdbe5
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.serialization;
import org.apache.flink.annotation.PublicEvolving;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
/**
* An {@link Encoder} is used by the streaming file sink to perform the actual writing
* of the incoming elements to the files in a bucket.
*
* @param <IN> The type of the elements that are being written by the sink.
*/
@PublicEvolving
public interface Encoder<IN> extends Serializable {
/**
* Writes one element to the bucket file.
* @param element the element to be written.
* @param stream the stream to write the element to.
*/
void encode(IN element, OutputStream stream) throws IOException;
}
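For illustration, a minimal custom {@code Encoder} sketch (the {@code Event} POJO, its fields, and the {@code CsvEventEncoder} name are assumptions for this example, not part of this commit): it writes one CSV line per incoming element.

import org.apache.flink.api.common.serialization.Encoder;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical element type, used only for this example.
class Event {
	String user;
	long timestamp;

	String toCsvLine() {
		return user + "," + timestamp;
	}
}

// Writes each Event as one CSV line into the in-progress part file.
class CsvEventEncoder implements Encoder<Event> {

	private static final long serialVersionUID = 1L;

	@Override
	public void encode(Event element, OutputStream stream) throws IOException {
		stream.write(element.toCsvLine().getBytes(StandardCharsets.UTF_8));
		stream.write('\n');
	}
}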
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.serialization;
import org.apache.flink.annotation.PublicEvolving;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
/**
* A simple {@link Encoder} that uses {@code toString()} on the input elements and
* writes them to the output bucket file, separated by newlines.
*
* @param <IN> The type of the elements that are being written by the sink.
*/
@PublicEvolving
public class SimpleStringEncoder<IN> implements Encoder<IN> {
private static final long serialVersionUID = -6865107843734614452L;
private String charsetName;
private transient Charset charset;
/**
* Creates a new {@code SimpleStringEncoder} that uses the {@code "UTF-8"} charset to convert
* strings to bytes.
*/
public SimpleStringEncoder() {
this("UTF-8");
}
/**
* Creates a new {@code SimpleStringEncoder} that uses the given charset to convert
* strings to bytes.
*
* @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
*/
public SimpleStringEncoder(String charsetName) {
this.charsetName = charsetName;
}
@Override
public void encode(IN element, OutputStream stream) throws IOException {
if (charset == null) {
charset = Charset.forName(charsetName);
}
stream.write(element.toString().getBytes(charset));
stream.write('\n');
}
}
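A small, self-contained usage sketch, outside any sink, showing the newline-separated output this encoder produces (the class name and the {@code ByteArrayOutputStream} target are only for the example):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class SimpleStringEncoderExample {

	public static void main(String[] args) throws IOException {
		SimpleStringEncoder<Integer> encoder = new SimpleStringEncoder<>("UTF-8");

		// Encode a few elements the same way the sink would write them into a part file.
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		encoder.encode(1, out);
		encoder.encode(2, out);

		// Prints "1" and "2" on separate lines: elements are toString()-ed and newline-separated.
		System.out.print(out.toString("UTF-8"));
	}
}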
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.io;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
import java.util.Arrays;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Simple serialization / deserialization methods for the {@link SimpleVersionedSerializer}.
*/
@PublicEvolving
public class SimpleVersionedSerialization {
/**
* Serializes the version and datum into a stream.
*
* <p>Data serialized via this method can be deserialized via
* {@link #readVersionAndDeSerialize(SimpleVersionedSerializer, DataInputView)}.
*
* <p>The first four bytes will be occupied by the version, as returned by
* {@link SimpleVersionedSerializer#getVersion()}. The next four bytes will be the length of the
* serialized datum, followed by the datum itself, as produced by
* {@link SimpleVersionedSerializer#serialize(Object)}. The data written is hence eight bytes
* larger than the serialized datum.
*
* @param serializer The serializer to serialize the datum with.
* @param datum The datum to serialize.
* @param out The stream to serialize to.
*/
public static <T> void writeVersionAndSerialize(SimpleVersionedSerializer<T> serializer, T datum, DataOutputView out) throws IOException {
checkNotNull(serializer, "serializer");
checkNotNull(datum, "datum");
checkNotNull(out, "out");
final byte[] data = serializer.serialize(datum);
out.writeInt(serializer.getVersion());
out.writeInt(data.length);
out.write(data);
}
/**
* Deserializes the version and datum from a stream.
*
* <p>This method deserializes data serialized via
* {@link #writeVersionAndSerialize(SimpleVersionedSerializer, Object, DataOutputView)}.
*
* <p>The first four bytes will be interpreted as the version. The next four bytes will be
* interpreted as the length of the datum bytes, then length-many bytes will be read.
* Finally, the datum is deserialized via the {@link SimpleVersionedSerializer#deserialize(int, byte[])}
* method.
*
* @param serializer The serializer to deserialize the datum with.
* @param in The stream to deserialize from.
* @return The deserialized datum.
*/
public static <T> T readVersionAndDeSerialize(SimpleVersionedSerializer<T> serializer, DataInputView in) throws IOException {
checkNotNull(serializer, "serializer");
checkNotNull(in, "in");
final int version = in.readInt();
final int length = in.readInt();
final byte[] data = new byte[length];
in.readFully(data);
return serializer.deserialize(version, data);
}
/**
* Serializes the version and datum into a byte array. The first four bytes will be occupied by
* the version (as returned by {@link SimpleVersionedSerializer#getVersion()}),
* written in <i>big-endian</i> encoding. The remaining bytes will be the serialized
* datum, as produced by {@link SimpleVersionedSerializer#serialize(Object)}. The resulting array
* will hence be four bytes larger than the serialized datum.
*
* <p>Data serialized via this method can be deserialized via
* {@link #readVersionAndDeSerialize(SimpleVersionedSerializer, byte[])}.
*
* @param serializer The serializer to serialize the datum with.
* @param datum The datum to serialize.
*
* @return A byte array containing the serialized version and serialized datum.
*
* @throws IOException Exceptions from the {@link SimpleVersionedSerializer#serialize(Object)}
* method are forwarded.
*/
public static <T> byte[] writeVersionAndSerialize(SimpleVersionedSerializer<T> serializer, T datum) throws IOException {
checkNotNull(serializer, "serializer");
checkNotNull(datum, "datum");
final byte[] data = serializer.serialize(datum);
final byte[] versionAndData = new byte[data.length + 4];
final int version = serializer.getVersion();
versionAndData[0] = (byte) (version >> 24);
versionAndData[1] = (byte) (version >> 16);
versionAndData[2] = (byte) (version >> 8);
versionAndData[3] = (byte) version;
// move the data to the array
System.arraycopy(data, 0, versionAndData, 4, data.length);
return versionAndData;
}
/**
* Deserializes the version and datum from a byte array. The first four bytes will be read as
* the version, in <i>big-endian</i> encoding. The remaining bytes will be passed to the serializer
* for deserialization, via {@link SimpleVersionedSerializer#deserialize(int, byte[])}.
*
* @param serializer The serializer to deserialize the datum with.
* @param bytes The bytes to deserialize from.
*
* @return The deserialized datum.
*
* @throws IOException Exceptions from the {@link SimpleVersionedSerializer#deserialize(int, byte[])}
* method are forwarded.
*/
public static <T> T readVersionAndDeSerialize(SimpleVersionedSerializer<T> serializer, byte[] bytes) throws IOException {
checkNotNull(serializer, "serializer");
checkNotNull(bytes, "bytes");
checkArgument(bytes.length >= 4, "byte array below minimum length (4 bytes)");
final byte[] dataOnly = Arrays.copyOfRange(bytes, 4, bytes.length);
final int version =
((bytes[0] & 0xff) << 24) |
((bytes[1] & 0xff) << 16) |
((bytes[2] & 0xff) << 8) |
(bytes[3] & 0xff);
return serializer.deserialize(version, dataOnly);
}
// ------------------------------------------------------------------------
/** Utility class, not meant to be instantiated. */
private SimpleVersionedSerialization() {}
}
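A minimal round-trip sketch for the byte-array variants above, assuming a throwaway serializer that stores UTF-8 bytes under version 3; it only illustrates the four-byte, big-endian version prefix described in the javadoc.

import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class VersionedSerializationExample {

	public static void main(String[] args) throws IOException {
		SimpleVersionedSerializer<String> serializer = new SimpleVersionedSerializer<String>() {
			@Override
			public int getVersion() {
				return 3;
			}

			@Override
			public byte[] serialize(String obj) {
				return obj.getBytes(StandardCharsets.UTF_8);
			}

			@Override
			public String deserialize(int version, byte[] serialized) {
				return new String(serialized, StandardCharsets.UTF_8);
			}
		};

		byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, "hello");

		// The first four bytes hold the version (3) in big-endian order, so the array is
		// four bytes longer than the serialized datum:
		// bytes = [0, 0, 0, 3, 'h', 'e', 'l', 'l', 'o']
		String roundTripped = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
		System.out.println(roundTripped); // hello
	}
}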
@@ -24,11 +24,11 @@ import java.io.IOException;
/**
* A simple serializer interface for versioned serialization.
*
* <p>The serializer has a version (returned by {@link #getVersion()}) which can be attached
* to the serialized data. When the serializer evolves, the version can be used to identify
* with which prior version the data was serialized.
*
* <pre>{@code
* MyType someObject = ...;
* SimpleVersionedSerializer<MyType> serializer = ...;
@@ -37,13 +37,13 @@ import java.io.IOException;
* int version = serializer.getVersion();
*
* MyType deserialized = serializer.deserialize(version, serializedData);
*
* byte[] someOldData = ...;
* int oldVersion = ...;
* MyType deserializedOldObject = serializer.deserialize(oldVersion, someOldData);
*
* }</pre>
*
* @param <E> The data type serialized / deserialized by this serializer.
*/
@Internal
@@ -51,7 +51,7 @@ public interface SimpleVersionedSerializer<E> extends Versioned {
/**
* Gets the version with which this serializer serializes.
*
* @return The version of the serialization schema.
*/
@Override
@@ -61,10 +61,9 @@ public interface SimpleVersionedSerializer<E> extends Versioned {
* Serializes the given object. The serialization is assumed to correspond to the
* current serialization version (as returned by {@link #getVersion()}).
*
* @param obj The object to serialize.
* @return The serialized data (bytes).
*
* @throws IOException Thrown, if the serialization fails.
*/
byte[] serialize(E obj) throws IOException;
@@ -72,11 +71,11 @@ public interface SimpleVersionedSerializer<E> extends Versioned {
/**
* De-serializes the given data (bytes) which was serialized with the scheme of the
* indicated version.
*
* @param version The version in which the data was serialized
* @param serialized The serialized data
* @return The deserialized object
*
* @throws IOException Thrown, if the deserialization fails.
*/
E deserialize(int version, byte[] serialized) throws IOException;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.io;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* Tests for the {@link SimpleVersionedSerialization} class.
*/
public class SimpleVersionedSerializationTest {
@Test
public void testStreamSerializationRoundTrip() throws IOException {
final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() {
private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
@Override
public int getVersion() {
return VERSION;
}
@Override
public byte[] serialize(String str) throws IOException {
return str.getBytes(StandardCharsets.UTF_8);
}
@Override
public String deserialize(int version, byte[] serialized) throws IOException {
assertEquals(VERSION, version);
return new String(serialized, StandardCharsets.UTF_8);
}
};
final String testString = "dugfakgs";
final DataOutputSerializer out = new DataOutputSerializer(32);
SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString, out);
final DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, in);
assertEquals(testString, deserialized);
}
@Test
public void testStreamSerializeEmpty() throws IOException {
final String testString = "beeeep!";
SimpleVersionedSerializer<String> emptySerializer = new SimpleVersionedSerializer<String>() {
@Override
public int getVersion() {
return 42;
}
@Override
public byte[] serialize(String obj) throws IOException {
return new byte[0];
}
@Override
public String deserialize(int version, byte[] serialized) throws IOException {
assertEquals(42, version);
assertEquals(0, serialized.length);
return testString;
}
};
final DataOutputSerializer out = new DataOutputSerializer(32);
SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc", out);
final DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
assertEquals(testString, SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, in));
}
@Test
public void testSerializationRoundTrip() throws IOException {
final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() {
private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
@Override
public int getVersion() {
return VERSION;
}
@Override
public byte[] serialize(String str) throws IOException {
return str.getBytes(StandardCharsets.UTF_8);
}
@Override
public String deserialize(int version, byte[] serialized) throws IOException {
assertEquals(VERSION, version);
return new String(serialized, StandardCharsets.UTF_8);
}
};
final String testString = "dugfakgs";
byte[] serialized = SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString);
final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, serialized);
assertEquals(testString, deserialized);
}
@Test
public void testSerializeEmpty() throws IOException {
final String testString = "beeeep!";
SimpleVersionedSerializer<String> emptySerializer = new SimpleVersionedSerializer<String>() {
@Override
public int getVersion() {
return 42;
}
@Override
public byte[] serialize(String obj) throws IOException {
return new byte[0];
}
@Override
public String deserialize(int version, byte[] serialized) throws IOException {
assertEquals(42, version);
assertEquals(0, serialized.length);
return testString;
}
};
byte[] serialized = SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc");
assertNotNull(serialized);
assertEquals(testString, SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, serialized));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* A bucket is the directory organization of the output of the {@link StreamingFileSink}.
*
* <p>For each incoming element in the {@link StreamingFileSink}, the user-specified
* {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is
* queried to determine the bucket this element should be written to.
*/
@PublicEvolving
public class Bucket<IN> {
private static final String PART_PREFIX = "part";
private final String bucketId;
private final Path bucketPath;
private final int subtaskIndex;
private final Encoder<IN> encoder;
private final RecoverableWriter fsWriter;
private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>();
private long partCounter;
private PartFileHandler<IN> currentPart;
private List<RecoverableWriter.CommitRecoverable> pending;
/**
* Constructor to restore a bucket from checkpointed state.
*/
public Bucket(
RecoverableWriter fsWriter,
int subtaskIndex,
long initialPartCounter,
Encoder<IN> writer,
BucketState bucketstate) throws IOException {
this(fsWriter, subtaskIndex, bucketstate.getBucketId(), bucketstate.getBucketPath(), initialPartCounter, writer);
// the constructor must have already initialized the filesystem writer
Preconditions.checkState(fsWriter != null);
// we try to resume the previous in-progress file, if the filesystem
// supports such an operation. If not, we just commit the file and start fresh.
final RecoverableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress();
if (resumable != null) {
currentPart = PartFileHandler.resumeFrom(
bucketId, fsWriter, resumable, bucketstate.getCreationTime());
}
// we commit pending files for previous checkpoints, up to the last successful one
// (the checkpoint from which we are recovering)
for (List<RecoverableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) {
for (RecoverableWriter.CommitRecoverable commitable: commitables) {
fsWriter.recoverForCommit(commitable).commitAfterRecovery();
}
}
}
/**
* Constructor to create a new empty bucket.
*/
public Bucket(
RecoverableWriter fsWriter,
int subtaskIndex,
String bucketId,
Path bucketPath,
long initialPartCounter,
Encoder<IN> writer) {
this.fsWriter = Preconditions.checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex;
this.bucketId = Preconditions.checkNotNull(bucketId);
this.bucketPath = Preconditions.checkNotNull(bucketPath);
this.partCounter = initialPartCounter;
this.encoder = Preconditions.checkNotNull(writer);
this.pending = new ArrayList<>();
}
public PartFileInfo getInProgressPartInfo() {
return currentPart;
}
public String getBucketId() {
return bucketId;
}
public Path getBucketPath() {
return bucketPath;
}
public long getPartCounter() {
return partCounter;
}
public boolean isActive() {
return currentPart != null || !pending.isEmpty() || !pendingPerCheckpoint.isEmpty();
}
void write(IN element, long currentTime) throws IOException {
Preconditions.checkState(currentPart != null, "bucket has been closed");
currentPart.write(element, encoder, currentTime);
}
void rollPartFile(final long currentTime) throws IOException {
closePartFile();
currentPart = PartFileHandler.openNew(bucketId, fsWriter, getNewPartPath(), currentTime);
partCounter++;
}
void merge(final Bucket<IN> bucket) throws IOException {
Preconditions.checkNotNull(bucket);
Preconditions.checkState(bucket.getBucketPath().equals(getBucketPath()));
// there should be no pending files in the bucket to be merged.
Preconditions.checkState(bucket.pending.isEmpty());
Preconditions.checkState(bucket.pendingPerCheckpoint.isEmpty());
RecoverableWriter.CommitRecoverable commitable = bucket.closePartFile();
if (commitable != null) {
pending.add(commitable);
}
}
RecoverableWriter.CommitRecoverable closePartFile() throws IOException {
RecoverableWriter.CommitRecoverable commitable = null;
if (currentPart != null) {
commitable = currentPart.closeForCommit();
pending.add(commitable);
currentPart = null;
}
return commitable;
}
public void dispose() {
if (currentPart != null) {
currentPart.dispose();
}
}
public void commitUpToCheckpoint(long checkpointId) throws IOException {
Preconditions.checkNotNull(fsWriter);
Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
pendingPerCheckpoint.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry = it.next();
if (entry.getKey() <= checkpointId) {
for (RecoverableWriter.CommitRecoverable commitable : entry.getValue()) {
fsWriter.recoverForCommit(commitable).commit();
}
it.remove();
}
}
}
public BucketState snapshot(long checkpointId) throws IOException {
RecoverableWriter.ResumeRecoverable resumable = null;
long creationTime = Long.MAX_VALUE;
if (currentPart != null) {
resumable = currentPart.persist();
creationTime = currentPart.getCreationTime();
}
if (!pending.isEmpty()) {
pendingPerCheckpoint.put(checkpointId, pending);
pending = new ArrayList<>();
}
return new BucketState(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint);
}
private Path getNewPartPath() {
return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import java.io.IOException;
import java.io.Serializable;
/**
* A factory able to create {@link Bucket buckets} for the {@link StreamingFileSink}.
*/
@Internal
public interface BucketFactory<IN> extends Serializable {
Bucket<IN> getNewBucket(
RecoverableWriter fsWriter,
int subtaskIndex,
String bucketId,
Path bucketPath,
long initialPartCounter,
Encoder<IN> writer) throws IOException;
Bucket<IN> restoreBucket(
RecoverableWriter fsWriter,
int subtaskIndex,
long initialPartCounter,
Encoder<IN> writer,
BucketState bucketstate) throws IOException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
/**
* The state of the {@link Bucket} that is to be checkpointed.
*/
@Internal
public class BucketState {
private final String bucketId;
/**
* The base path for the bucket, i.e. the directory where all the part files are stored.
*/
private final Path bucketPath;
/**
* The creation time of the currently open part file, or {@code Long.MAX_VALUE} if there is no open part file.
*/
private final long creationTime;
/**
* A {@link RecoverableWriter.ResumeRecoverable} for the currently open part file, or null
* if there is no currently open part file.
*/
@Nullable
private final RecoverableWriter.ResumeRecoverable inProgress;
/**
* The {@link RecoverableWriter.CommitRecoverable files} pending to be committed, organized by checkpoint id.
*/
private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint;
public BucketState(
final String bucketId,
final Path bucketPath,
final long creationTime,
final @Nullable RecoverableWriter.ResumeRecoverable inProgress,
final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint
) {
this.bucketId = Preconditions.checkNotNull(bucketId);
this.bucketPath = Preconditions.checkNotNull(bucketPath);
this.creationTime = creationTime;
this.inProgress = inProgress;
this.pendingPerCheckpoint = Preconditions.checkNotNull(pendingPerCheckpoint);
}
public String getBucketId() {
return bucketId;
}
public Path getBucketPath() {
return bucketPath;
}
public long getCreationTime() {
return creationTime;
}
@Nullable
public RecoverableWriter.ResumeRecoverable getCurrentInProgress() {
return inProgress;
}
public Map<Long, List<RecoverableWriter.CommitRecoverable>> getPendingPerCheckpoint() {
return pendingPerCheckpoint;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
* A {@code SimpleVersionedSerializer} used to serialize the {@link BucketState BucketState}.
*/
@Internal
class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
private static final int MAGIC_NUMBER = 0x1e764b79;
private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer;
private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer;
public BucketStateSerializer(
final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer,
final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer) {
this.resumableSerializer = Preconditions.checkNotNull(resumableSerializer);
this.commitableSerializer = Preconditions.checkNotNull(commitableSerializer);
}
@Override
public int getVersion() {
return 1;
}
@Override
public byte[] serialize(BucketState state) throws IOException {
DataOutputSerializer out = new DataOutputSerializer(256);
out.writeInt(MAGIC_NUMBER);
serializeV1(state, out);
return out.getCopyOfBuffer();
}
@Override
public BucketState deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
DataInputDeserializer in = new DataInputDeserializer(serialized);
validateMagicNumber(in);
return deserializeV1(in);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}
@VisibleForTesting
void serializeV1(BucketState state, DataOutputView out) throws IOException {
out.writeUTF(state.getBucketId());
out.writeUTF(state.getBucketPath().toString());
out.writeLong(state.getCreationTime());
// put the current open part file
final RecoverableWriter.ResumeRecoverable currentPart = state.getCurrentInProgress();
if (currentPart != null) {
out.writeBoolean(true);
SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, currentPart, out);
}
else {
out.writeBoolean(false);
}
// put the map of pending files per checkpoint
final Map<Long, List<CommitRecoverable>> pendingCommitters = state.getPendingPerCheckpoint();
// manually keep the version here to save some bytes
out.writeInt(commitableSerializer.getVersion());
out.writeInt(pendingCommitters.size());
for (Entry<Long, List<CommitRecoverable>> resumablesForCheckpoint : pendingCommitters.entrySet()) {
List<CommitRecoverable> resumables = resumablesForCheckpoint.getValue();
out.writeLong(resumablesForCheckpoint.getKey());
out.writeInt(resumables.size());
for (CommitRecoverable resumable : resumables) {
byte[] serialized = commitableSerializer.serialize(resumable);
out.writeInt(serialized.length);
out.write(serialized);
}
}
}
@VisibleForTesting
BucketState deserializeV1(DataInputView in) throws IOException {
final String bucketId = in.readUTF();
final String bucketPathStr = in.readUTF();
final long creationTime = in.readLong();
// then get the current resumable stream
RecoverableWriter.ResumeRecoverable current = null;
if (in.readBoolean()) {
current = SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in);
}
final int committableVersion = in.readInt();
final int numCheckpoints = in.readInt();
final HashMap<Long, List<RecoverableWriter.CommitRecoverable>> resumablesPerCheckpoint = new HashMap<>(numCheckpoints);
for (int i = 0; i < numCheckpoints; i++) {
final long checkpointId = in.readLong();
final int noOfResumables = in.readInt();
final ArrayList<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables);
for (int j = 0; j < noOfResumables; j++) {
final byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
resumables.add(commitableSerializer.deserialize(committableVersion, bytes));
}
resumablesPerCheckpoint.put(checkpointId, resumables);
}
return new BucketState(
bucketId,
new Path(bucketPathStr),
creationTime,
current,
resumablesPerCheckpoint);
}
private static void validateMagicNumber(DataInputView in) throws IOException {
final int magicNumber = in.readInt();
if (magicNumber != MAGIC_NUMBER) {
throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import java.io.IOException;
/**
* A factory returning {@link Bucket buckets}.
*/
@Internal
public class DefaultBucketFactory<IN> implements BucketFactory<IN> {
private static final long serialVersionUID = 3372881359208513357L;
@Override
public Bucket<IN> getNewBucket(
RecoverableWriter fsWriter,
int subtaskIndex,
String bucketId,
Path bucketPath,
long initialPartCounter,
Encoder<IN> writer) throws IOException {
return new Bucket<>(
fsWriter,
subtaskIndex,
bucketId,
bucketPath,
initialPartCounter,
writer);
}
@Override
public Bucket<IN> restoreBucket(
RecoverableWriter fsWriter,
int subtaskIndex,
long initialPartCounter,
Encoder<IN> writer,
BucketState bucketState) throws IOException {
return new Bucket<>(
fsWriter,
subtaskIndex,
initialPartCounter,
writer,
bucketState);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
* The default implementation of the {@link RollingPolicy}.
*
* <p>This policy rolls a part file if:
* <ol>
* <li>there is no open part file,</li>
* <li>the current file has reached the maximum part size (by default 128MB),</li>
* <li>the current file is older than the rollover interval (by default 60 sec), or</li>
* <li>the current file has not been written to for longer than the allowed inactivity interval (by default 60 sec).</li>
* </ol>
*/
@PublicEvolving
public final class DefaultRollingPolicy implements RollingPolicy {
private static final long serialVersionUID = 1318929857047767030L;
private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;
private final long partSize;
private final long rolloverInterval;
private final long inactivityInterval;
/**
* Private constructor to avoid direct instantiation.
*/
private DefaultRollingPolicy(long partSize, long rolloverInterval, long inactivityInterval) {
Preconditions.checkArgument(partSize > 0L);
Preconditions.checkArgument(rolloverInterval > 0L);
Preconditions.checkArgument(inactivityInterval > 0L);
this.partSize = partSize;
this.rolloverInterval = rolloverInterval;
this.inactivityInterval = inactivityInterval;
}
@Override
public boolean shouldRoll(final PartFileInfo state, final long currentTime) throws IOException {
if (state == null) {
// this means that there is no currently open part file.
return true;
}
if (state.getSize() > partSize) {
return true;
}
if (currentTime - state.getCreationTime() > rolloverInterval) {
return true;
}
return currentTime - state.getLastUpdateTime() > inactivityInterval;
}
/**
* Initiates the instantiation of a {@link DefaultRollingPolicy}.
* To finalize it and have the actual policy, call {@code .build()}.
*/
public static PolicyBuilder create() {
return new PolicyBuilder();
}
/**
* A helper class that holds the configuration properties for the {@link DefaultRollingPolicy}.
*/
@PublicEvolving
public static class PolicyBuilder {
private long partSize = DEFAULT_MAX_PART_SIZE;
private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
/**
* Sets the part size above which a part file will have to roll.
* @param size the allowed part size.
*/
public PolicyBuilder withMaxPartSize(long size) {
Preconditions.checkState(size > 0L);
this.partSize = size;
return this;
}
/**
* Sets the interval of allowed inactivity after which a part file will have to roll.
* @param interval the allowed inactivity interval.
*/
public PolicyBuilder withInactivityInterval(long interval) {
Preconditions.checkState(interval > 0L);
this.inactivityInterval = interval;
return this;
}
/**
* Sets the max time a part file can stay open before having to roll.
* @param interval the desired rollover interval.
*/
public PolicyBuilder withRolloverInterval(long interval) {
Preconditions.checkState(interval > 0L);
this.rolloverInterval = interval;
return this;
}
/**
* Creates the actual policy.
*/
public DefaultRollingPolicy build() {
return new DefaultRollingPolicy(partSize, rolloverInterval, inactivityInterval);
}
}
}
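A short sketch of configuring the policy through its builder; the 64 MB / 15 min / 5 min values are illustrative, not the defaults. The resulting policy would then be handed to the sink via {@code StreamingFileSink#setRollingPolicy(RollingPolicy)}.

import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultRollingPolicy;

public class RollingPolicyExample {

	public static void main(String[] args) {
		// Roll part files at 64 MB, after 15 minutes, or after 5 minutes of inactivity.
		DefaultRollingPolicy policy = DefaultRollingPolicy.create()
			.withMaxPartSize(64L * 1024L * 1024L)
			.withRolloverInterval(15L * 60L * 1000L)
			.withInactivityInterval(5L * 60L * 1000L)
			.build();

		System.out.println("Built rolling policy: " + policy);
	}
}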
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
* A handler for the currently open part file in a specific {@link Bucket}.
* This also implements the {@link PartFileInfo}.
*/
@Internal
class PartFileHandler<IN> implements PartFileInfo {
private final String bucketId;
private final long creationTime;
private final RecoverableFsDataOutputStream currentPartStream;
private long lastUpdateTime;
private PartFileHandler(
final String bucketId,
final RecoverableFsDataOutputStream currentPartStream,
final long creationTime) {
Preconditions.checkArgument(creationTime >= 0L);
this.bucketId = Preconditions.checkNotNull(bucketId);
this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
this.creationTime = creationTime;
this.lastUpdateTime = creationTime;
}
public static <IN> PartFileHandler<IN> resumeFrom(
final String bucketId,
final RecoverableWriter fileSystemWriter,
final RecoverableWriter.ResumeRecoverable resumable,
final long creationTime) throws IOException {
Preconditions.checkNotNull(bucketId);
Preconditions.checkNotNull(fileSystemWriter);
Preconditions.checkNotNull(resumable);
final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
return new PartFileHandler<>(bucketId, stream, creationTime);
}
public static <IN> PartFileHandler<IN> openNew(
final String bucketId,
final RecoverableWriter fileSystemWriter,
final Path path,
final long creationTime) throws IOException {
Preconditions.checkNotNull(bucketId);
Preconditions.checkNotNull(fileSystemWriter);
Preconditions.checkNotNull(path);
final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
return new PartFileHandler<>(bucketId, stream, creationTime);
}
void write(IN element, Encoder<IN> encoder, long currentTime) throws IOException {
encoder.encode(element, currentPartStream);
this.lastUpdateTime = currentTime;
}
RecoverableWriter.ResumeRecoverable persist() throws IOException {
return currentPartStream.persist();
}
RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
return currentPartStream.closeForCommit().getRecoverable();
}
void dispose() {
// we can suppress exceptions here, because we do not rely on close() to
// flush or persist any data
IOUtils.closeQuietly(currentPartStream);
}
@Override
public String getBucketId() {
return bucketId;
}
@Override
public long getCreationTime() {
return creationTime;
}
@Override
public long getSize() throws IOException {
return currentPartStream.getPos();
}
@Override
public long getLastUpdateTime() {
return lastUpdateTime;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
import java.io.IOException;
/**
* An interface exposing the information about the currently open part file
* that the {@link RollingPolicy} needs in order to determine whether it
* should roll the part file or not.
*/
@PublicEvolving
public interface PartFileInfo {
/**
* @return The bucket identifier of the currently open part file, as returned by
* {@link Bucketer#getBucketId(Object, Bucketer.Context)}.
*/
String getBucketId();
/**
* @return The creation time (in ms) of the currently open part file.
*/
long getCreationTime();
/**
* @return The size of the currently open part file.
*/
long getSize() throws IOException;
/**
* @return The last time (in ms) the currently open part file was written to.
*/
long getLastUpdateTime();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.PublicEvolving;
import java.io.IOException;
import java.io.Serializable;
/**
* The policy based on which a {@link Bucket} in the {@link StreamingFileSink}
* rolls its currently open part file and opens a new one.
*/
@PublicEvolving
public interface RollingPolicy extends Serializable {
/**
* Determines if the in-progress part file for a bucket should roll.
* @param partFileState the state of the currently open part file of the bucket.
* @param currentTime the current processing time.
* @return {@code true} if the part file should roll, {@code false} otherwise.
*/
boolean shouldRoll(final PartFileInfo partFileState, final long currentTime) throws IOException;
}
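A hedged sketch of a custom policy against this interface (the class name and the size threshold are assumptions): it ignores time entirely and rolls purely on part-file size.

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

import java.io.IOException;

// Rolls when there is no open part file, or once the current part file
// exceeds a fixed size threshold.
public class SizeOnlyRollingPolicy implements RollingPolicy {

	private static final long serialVersionUID = 1L;

	private final long maxPartSizeBytes;

	public SizeOnlyRollingPolicy(long maxPartSizeBytes) {
		this.maxPartSizeBytes = maxPartSizeBytes;
	}

	@Override
	public boolean shouldRoll(PartFileInfo partFileState, long currentTime) throws IOException {
		return partFileState == null || partFileState.getSize() >= maxPartSizeBytes;
	}
}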
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* Sink that emits its input elements to {@link FileSystem} files within buckets. This is
* integrated with the checkpointing mechanism to provide exactly once semantics.
*
* <p>When creating the sink a {@code basePath} must be specified. The base directory contains
* one directory for every bucket. The bucket directories themselves contain several part files,
* with at least one for each parallel subtask of the sink which is writing data to that bucket.
* These part files contain the actual output data.
*
* <p>The sink uses a {@link Bucketer} to determine the bucket directory inside the base directory
* to which each element should be written. The {@code Bucketer} can, for example, use time or
* a property of the element to determine the bucket directory. The default {@code Bucketer} is a
* {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
* a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}.
*
* <p>The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink
* and a rolling counter. For example, the file {@code "part-1-17"} contains the data from
* {@code subtask 1} of the sink and is the {@code 17th} part file created by that subtask.
* Part files roll based on the user-specified {@link RollingPolicy}. By default, a {@link DefaultRollingPolicy}
* is used.
*
* <p>In some scenarios, the open buckets are required to change based on time. In these cases, the user
* can specify a {@code bucketCheckInterval} (by default 1m) and the sink will check periodically and roll
* the part file if the specified rolling policy says so.
*
* <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
* The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
* semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
* a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
* pending files will be moved to {@code finished}.
*
* <p>In case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it
* had when that last successful checkpoint occurred. To this end, when restoring, the restored files in {@code pending}
* state are transferred into the {@code finished} state while any {@code in-progress} files are rolled back, so that
* they do not contain data that arrived after the checkpoint from which we restore.
*
* <p><b>NOTE:</b>
* <ol>
* <li>
* If checkpointing is not enabled the pending files will never be moved to the finished state.
* </li>
* <li>
* The part files are written using an instance of {@link Encoder}. By default, a
* {@link SimpleStringEncoder} is used, which writes the result of {@code toString()} for
* every element, separated by newlines. You can configure the encoder using
* {@link #setEncoder(Encoder)}.
* </li>
* </ol>
*
* @param <IN> Type of the elements emitted by this sink
*/
@PublicEvolving
public class StreamingFileSink<IN>
extends RichSinkFunction<IN>
implements CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(StreamingFileSink.class);
// -------------------------- state descriptors ---------------------------
private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC =
new ListStateDescriptor<>("bucket-states", BytePrimitiveArraySerializer.INSTANCE);
private static final ListStateDescriptor<Long> MAX_PART_COUNTER_STATE_DESC =
new ListStateDescriptor<>("max-part-counter", LongSerializer.INSTANCE);
// ------------------------ configuration fields --------------------------
private final Path basePath;
private final BucketFactory<IN> bucketFactory;
private long bucketCheckInterval = 60L * 1000L;
private Bucketer<IN> bucketer;
private Encoder<IN> encoder;
private RollingPolicy rollingPolicy;
// --------------------------- runtime fields -----------------------------
private transient BucketerContext bucketerContext;
private transient RecoverableWriter fileSystemWriter;
private transient ProcessingTimeService processingTimeService;
private transient Map<String, Bucket<IN>> activeBuckets;
////////////////// State Related Fields /////////////////////
private transient BucketStateSerializer bucketStateSerializer;
private transient ListState<byte[]> restoredBucketStates;
private transient ListState<Long> restoredMaxCounters;
private transient long initMaxPartCounter;
private transient long maxPartCounterUsed;
/**
* Creates a new {@code StreamingFileSink} that writes files to the given base directory.
*
* <p>This uses a {@link DateTimeBucketer} as {@link Bucketer} and a {@link SimpleStringEncoder} as a writer.
*
* @param basePath The directory to which to write the bucket files.
*/
public StreamingFileSink(Path basePath) {
this(basePath, new DefaultBucketFactory<>());
}
@VisibleForTesting
StreamingFileSink(Path basePath, BucketFactory<IN> bucketFactory) {
this.basePath = Preconditions.checkNotNull(basePath);
this.bucketer = new DateTimeBucketer<>();
this.encoder = new SimpleStringEncoder<>();
this.rollingPolicy = DefaultRollingPolicy.create().build();
this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
}
public StreamingFileSink<IN> setEncoder(Encoder<IN> encoder) {
this.encoder = Preconditions.checkNotNull(encoder);
return this;
}
public StreamingFileSink<IN> setBucketer(Bucketer<IN> bucketer) {
this.bucketer = Preconditions.checkNotNull(bucketer);
return this;
}
public StreamingFileSink<IN> setBucketCheckInterval(long interval) {
this.bucketCheckInterval = interval;
return this;
}
public StreamingFileSink<IN> setRollingPolicy(RollingPolicy policy) {
this.rollingPolicy = Preconditions.checkNotNull(policy);
return this;
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
final Iterator<Map.Entry<String, Bucket<IN>>> activeBucketIt =
activeBuckets.entrySet().iterator();
while (activeBucketIt.hasNext()) {
Bucket<IN> bucket = activeBucketIt.next().getValue();
bucket.commitUpToCheckpoint(checkpointId);
if (!bucket.isActive()) {
// We've dealt with all the pending files and the writer for this bucket is not currently open.
// Therefore this bucket is currently inactive and we can remove it from our state.
activeBucketIt.remove();
}
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
Preconditions.checkState(
restoredBucketStates != null && fileSystemWriter != null && bucketStateSerializer != null,
"sink has not been initialized");
restoredBucketStates.clear();
for (Bucket<IN> bucket : activeBuckets.values()) {
final PartFileInfo info = bucket.getInProgressPartInfo();
final long checkpointTimestamp = context.getCheckpointTimestamp();
if (info != null && rollingPolicy.shouldRoll(info, checkpointTimestamp)) {
// we also check here so that we do not have to always
// wait for the "next" element to arrive.
bucket.closePartFile();
}
final BucketState bucketState = bucket.snapshot(context.getCheckpointId());
restoredBucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState));
}
restoredMaxCounters.clear();
restoredMaxCounters.add(maxPartCounterUsed);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
initFileSystemWriter();
this.activeBuckets = new HashMap<>();
// When resuming after a failure:
// 1) we get the max part counter used before in order to make sure that we do not overwrite valid data
// 2) we commit any pending files for previous checkpoints (previous to the last successful one)
// 3) we resume writing to the previous in-progress file of each bucket, and
// 4) if we receive multiple states for the same bucket, we merge them.
final OperatorStateStore stateStore = context.getOperatorStateStore();
restoredBucketStates = stateStore.getListState(BUCKET_STATE_DESC);
restoredMaxCounters = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
if (context.isRestored()) {
final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
long maxCounter = 0L;
for (long partCounter: restoredMaxCounters.get()) {
maxCounter = Math.max(partCounter, maxCounter);
}
initMaxPartCounter = maxCounter;
for (byte[] recoveredState : restoredBucketStates.get()) {
final BucketState bucketState = SimpleVersionedSerialization.readVersionAndDeSerialize(
bucketStateSerializer, recoveredState);
final String bucketId = bucketState.getBucketId();
LOG.info("Recovered bucket for {}", bucketId);
final Bucket<IN> restoredBucket = bucketFactory.restoreBucket(
fileSystemWriter,
subtaskIndex,
initMaxPartCounter,
encoder,
bucketState
);
final Bucket<IN> existingBucket = activeBuckets.get(bucketId);
if (existingBucket == null) {
activeBuckets.put(bucketId, restoredBucket);
} else {
existingBucket.merge(restoredBucket);
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} idx {} restored state for bucket {}", getClass().getSimpleName(),
subtaskIndex, assembleBucketPath(bucketId));
}
}
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
processingTimeService = ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
processingTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
this.bucketerContext = new BucketerContext();
}
@Override
public void onProcessingTime(long timestamp) throws Exception {
final long currentTime = processingTimeService.getCurrentProcessingTime();
for (Bucket<IN> bucket : activeBuckets.values()) {
final PartFileInfo info = bucket.getInProgressPartInfo();
if (info != null && rollingPolicy.shouldRoll(info, currentTime)) {
bucket.closePartFile();
}
}
processingTimeService.registerTimer(timestamp + bucketCheckInterval, this);
}
@Override
public void invoke(IN value, Context context) throws Exception {
final long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
// setting the values in the bucketer context
bucketerContext.update(context.timestamp(), context.currentWatermark(), currentProcessingTime);
final String bucketId = bucketer.getBucketId(value, bucketerContext);
Bucket<IN> bucket = activeBuckets.get(bucketId);
if (bucket == null) {
final Path bucketPath = assembleBucketPath(bucketId);
bucket = bucketFactory.getNewBucket(
fileSystemWriter,
subtaskIndex,
bucketId,
bucketPath,
initMaxPartCounter,
encoder);
activeBuckets.put(bucketId, bucket);
}
final PartFileInfo info = bucket.getInProgressPartInfo();
if (info == null || rollingPolicy.shouldRoll(info, currentProcessingTime)) {
bucket.rollPartFile(currentProcessingTime);
}
bucket.write(value, currentProcessingTime);
// we update the global max part counter here because inactive buckets are removed
// from activeBuckets in notifyCheckpointComplete(), so at snapshot time they may no
// longer be present to contribute their part counter.
updateMaxPartCounter(bucket.getPartCounter());
}
@Override
public void close() throws Exception {
if (activeBuckets != null) {
activeBuckets.values().forEach(Bucket::dispose);
}
}
private void initFileSystemWriter() throws IOException {
if (fileSystemWriter == null) {
fileSystemWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
bucketStateSerializer = new BucketStateSerializer(
fileSystemWriter.getResumeRecoverableSerializer(),
fileSystemWriter.getCommitRecoverableSerializer()
);
}
}
private void updateMaxPartCounter(long candidate) {
maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate);
}
private Path assembleBucketPath(String bucketId) {
return new Path(basePath, bucketId);
}
/**
 * The {@link Bucketer.Context} exposed to
 * {@link Bucketer#getBucketId(Object, Bucketer.Context)}
 * whenever a new element arrives.
*/
private static class BucketerContext implements Bucketer.Context {
@Nullable
private Long elementTimestamp;
private long currentWatermark;
private long currentProcessingTime;
void update(@Nullable Long elementTimestamp, long currentWatermark, long currentProcessingTime) {
this.elementTimestamp = elementTimestamp;
this.currentWatermark = currentWatermark;
this.currentProcessingTime = currentProcessingTime;
}
@Override
public long currentProcessingTime() {
return currentProcessingTime;
}
@Override
public long currentWatermark() {
return currentWatermark;
}
@Override
@Nullable
public Long timestamp() {
return elementTimestamp;
}
}
}
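For orientation, here is a minimal usage sketch of the sink defined above. It is not part of this commit: the DataStream variable, the output path, and the check interval are hypothetical placeholders, and only the constructor and setters shown in this file are used.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;

class StreamingFileSinkUsageSketch {

	// Attaches a StreamingFileSink to the given stream; all names here are illustrative.
	static void addFileSink(DataStream<String> input) {
		StreamingFileSink<String> sink = new StreamingFileSink<String>(new Path("/tmp/streaming-out"))
				.setBucketer(new DateTimeBucketer<>())      // hourly buckets by default ("yyyy-MM-dd--HH")
				.setEncoder(new SimpleStringEncoder<>())    // toString() of each element
				.setBucketCheckInterval(60_000L);           // processing-time timer for the rolling policy

		// Part files roll according to the rolling policy and pending files are
		// committed when checkpoints complete (see notifyCheckpointComplete above).
		input.addSink(sink);
	}
}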
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
import org.apache.flink.annotation.PublicEvolving;
/**
* A {@link Bucketer} that does not perform any
* bucketing of files. All files are written to the base path.
*/
@PublicEvolving
public class BasePathBucketer<T> implements Bucketer<T> {
private static final long serialVersionUID = -6033643155550226022L;
@Override
public String getBucketId(T element, Context context) {
return "";
}
@Override
public String toString() {
return "BasePathBucketer";
}
}
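A short sketch of how this bucketer might be plugged into the sink so that every part file is written directly under the base path; the class name and path below are illustrative and not part of this commit.

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.BasePathBucketer;

class BasePathBucketerSketch {

	// getBucketId(...) returns "", so no date/time sub-directories are created
	// and all part files end up under the base path.
	static StreamingFileSink<String> flatSink() {
		return new StreamingFileSink<String>(new Path("/tmp/flat-output"))
				.setBucketer(new BasePathBucketer<>());
	}
}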
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import javax.annotation.Nullable;
import java.io.Serializable;
/**
* A bucketer is used with a {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}
* to determine the {@link org.apache.flink.streaming.api.functions.sink.filesystem.Bucket} each incoming element
* should be put into.
*
 * <p>The {@code StreamingFileSink} can write to many buckets at a time, and it is responsible for managing
 * a set of active buckets. Whenever a new element arrives, it asks the {@code Bucketer} for the bucket the
 * element should fall into. The {@code Bucketer} can, for example, determine buckets based on system time.
*/
@PublicEvolving
public interface Bucketer<T> extends Serializable {
/**
* Returns the identifier of the bucket the provided element should be put into.
* @param element The current element being processed.
* @param context The {@link SinkFunction.Context context} used by the
* {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}.
*
* @return A string representing the identifier of the bucket the element should be put into.
 * The actual path to the bucket will result from the concatenation of the returned string
* and the {@code base path} provided during the initialization of the
* {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}.
*/
String getBucketId(T element, Context context);
/**
* Context that the {@link Bucketer} can use for getting additional data about
* an input record.
*
* <p>The context is only valid for the duration of a {@link Bucketer#getBucketId(Object, Context)} call.
* Do not store the context and use afterwards!
*/
@PublicEvolving
interface Context {
/**
* Returns the current processing time.
*/
long currentProcessingTime();
/**
* Returns the current event-time watermark.
*/
long currentWatermark();
/**
* Returns the timestamp of the current input record or
* {@code null} if the element does not have an assigned timestamp.
*/
@Nullable
Long timestamp();
}
}
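As an illustration of the contract above, a hypothetical Bucketer that derives the bucket id from the element itself rather than from time; the class is a sketch and not part of this commit.

import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;

class FirstCharBucketer implements Bucketer<String> {

	private static final long serialVersionUID = 1L;

	@Override
	public String getBucketId(String element, Context context) {
		// Bucket by the first character, e.g. "apple" ends up under <basePath>/a/.
		// Empty elements fall back to a catch-all bucket.
		return element.isEmpty() ? "other" : element.substring(0, 1);
	}
}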
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
import org.apache.flink.annotation.PublicEvolving;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* A {@link Bucketer} that assigns to buckets based on current system time.
 *
* <p>The {@code DateTimeBucketer} will create directories of the following form:
* {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
* that was specified as a base path when creating the
* {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}.
* The {@code dateTimePath} is determined based on the current system time and the
 * user-provided format string.
 *
* <p>{@link SimpleDateFormat} is used to derive a date string from the current system time and
* the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
* files will have a granularity of hours.
*
* <p>Example:
*
* <pre>{@code
* Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
* }</pre>
*
 * <p>This will create, for example, the following bucket path:
 * {@code /base/1976-12-31--14/}
*/
@PublicEvolving
public class DateTimeBucketer<T> implements Bucketer<T> {
private static final long serialVersionUID = 3284420879277893804L;
private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
private final String formatString;
private transient SimpleDateFormat dateFormatter;
/**
* Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
*/
public DateTimeBucketer() {
this(DEFAULT_FORMAT_STRING);
}
/**
* Creates a new {@code DateTimeBucketer} with the given date/time format string.
*
* @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
* the bucket path.
*/
public DateTimeBucketer(String formatString) {
this.formatString = formatString;
}
@Override
public String getBucketId(T element, Context context) {
if (dateFormatter == null) {
dateFormatter = new SimpleDateFormat(formatString);
}
return dateFormatter.format(new Date(context.currentProcessingTime()));
}
@Override
public String toString() {
return "DateTimeBucketer{" +
"formatString='" + formatString + '\'' +
'}';
}
}
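A tiny standalone sketch (not part of this commit) of the bucket-id computation above: the default format string yields one bucket directory per hour of processing time.

import java.text.SimpleDateFormat;
import java.util.Date;

class DateTimeBucketIdSketch {

	public static void main(String[] args) {
		// Same formatting logic as DateTimeBucketer#getBucketId, using the default format string.
		SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd--HH");
		long processingTime = System.currentTimeMillis();
		// Prints e.g. "2018-07-09--14"; the sink appends this to the base path as the bucket directory.
		System.out.println(formatter.format(new Date(processingTime)));
	}
}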
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Tests for the {@link BucketStateSerializer}.
*/
public class BucketStateSerializerTest {
private static final String IN_PROGRESS_CONTENT = "writing";
private static final String PENDING_CONTENT = "wrote";
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();
@Test
public void testSerializationEmpty() throws IOException {
final File testFolder = tempFolder.newFolder();
final FileSystem fs = FileSystem.get(testFolder.toURI());
final RecoverableWriter writer = fs.createRecoverableWriter();
final Path testBucket = new Path(testFolder.getPath(), "test");
final BucketState bucketState = new BucketState(
"test", testBucket, Long.MAX_VALUE, null, new HashMap<>());
final SimpleVersionedSerializer<BucketState> serializer =
new BucketStateSerializer(
writer.getResumeRecoverableSerializer(),
writer.getCommitRecoverableSerializer()
);
byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
Assert.assertEquals(testBucket, recoveredState.getBucketPath());
Assert.assertNull(recoveredState.getCurrentInProgress());
Assert.assertTrue(recoveredState.getPendingPerCheckpoint().isEmpty());
}
@Test
public void testSerializationOnlyInProgress() throws IOException {
final File testFolder = tempFolder.newFolder();
final FileSystem fs = FileSystem.get(testFolder.toURI());
final Path testBucket = new Path(testFolder.getPath(), "test");
final RecoverableWriter writer = fs.createRecoverableWriter();
final RecoverableFsDataOutputStream stream = writer.open(testBucket);
stream.write(IN_PROGRESS_CONTENT.getBytes(Charset.forName("UTF-8")));
final RecoverableWriter.ResumeRecoverable current = stream.persist();
final BucketState bucketState = new BucketState(
"test", testBucket, Long.MAX_VALUE, current, new HashMap<>());
final SimpleVersionedSerializer<BucketState> serializer =
new BucketStateSerializer(
writer.getResumeRecoverableSerializer(),
writer.getCommitRecoverableSerializer()
);
final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
// close the stream to simulate that writing to this file has finished.
stream.close();
final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
Assert.assertEquals(testBucket, recoveredState.getBucketPath());
FileStatus[] statuses = fs.listStatus(testBucket.getParent());
Assert.assertEquals(1L, statuses.length);
Assert.assertTrue(
statuses[0].getPath().getPath().startsWith(
(new Path(testBucket.getParent(), ".test.inprogress")).toString())
);
}
@Test
public void testSerializationFull() throws IOException {
final int noOfTasks = 5;
final File testFolder = tempFolder.newFolder();
final FileSystem fs = FileSystem.get(testFolder.toURI());
final RecoverableWriter writer = fs.createRecoverableWriter();
final Path bucketPath = new Path(testFolder.getPath());
// pending for checkpoints
final Map<Long, List<RecoverableWriter.CommitRecoverable>> commitRecoverables = new HashMap<>();
for (int i = 0; i < noOfTasks; i++) {
final List<RecoverableWriter.CommitRecoverable> recoverables = new ArrayList<>();
for (int j = 0; j < 2 + i; j++) {
final Path part = new Path(bucketPath, "part-" + i + '-' + j);
final RecoverableFsDataOutputStream stream = writer.open(part);
stream.write((PENDING_CONTENT + '-' + j).getBytes(Charset.forName("UTF-8")));
recoverables.add(stream.closeForCommit().getRecoverable());
}
commitRecoverables.put((long) i, recoverables);
}
// in-progress
final Path testBucket = new Path(bucketPath, "test-2");
final RecoverableFsDataOutputStream stream = writer.open(testBucket);
stream.write(IN_PROGRESS_CONTENT.getBytes(Charset.forName("UTF-8")));
final RecoverableWriter.ResumeRecoverable current = stream.persist();
final BucketState bucketState = new BucketState(
"test-2", bucketPath, Long.MAX_VALUE, current, commitRecoverables);
final SimpleVersionedSerializer<BucketState> serializer =
new BucketStateSerializer(
writer.getResumeRecoverableSerializer(),
writer.getCommitRecoverableSerializer()
);
stream.close();
byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getPendingPerCheckpoint();
Assert.assertEquals(5L, recoveredRecoverables.size());
// recover and commit
for (Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry: recoveredRecoverables.entrySet()) {
for (RecoverableWriter.CommitRecoverable recoverable: entry.getValue()) {
writer.recoverForCommit(recoverable).commit();
}
}
FileStatus[] filestatuses = fs.listStatus(bucketPath);
Set<String> paths = new HashSet<>(filestatuses.length);
for (FileStatus filestatus : filestatuses) {
paths.add(filestatus.getPath().getPath());
}
for (int i = 0; i < noOfTasks; i++) {
for (int j = 0; j < 2 + i; j++) {
final String part = new Path(bucketPath, "part-" + i + '-' + j).toString();
Assert.assertTrue(paths.contains(part));
paths.remove(part);
}
}
// only the in-progress must be left
Assert.assertEquals(1L, paths.size());
// verify that the in-progress file is still there
Assert.assertTrue(paths.iterator().next().startsWith(
(new Path(testBucket.getParent(), ".test-2.inprogress").toString())));
}
@Test
public void testSerializationNullInProgress() throws IOException {
final int noOfTasks = 5;
final File testFolder = tempFolder.newFolder();
final FileSystem fs = FileSystem.get(testFolder.toURI());
final RecoverableWriter writer = fs.createRecoverableWriter();
final Path bucketPath = new Path(testFolder.getPath());
// pending for checkpoints
final Map<Long, List<RecoverableWriter.CommitRecoverable>> commitRecoverables = new HashMap<>();
for (int i = 0; i < noOfTasks; i++) {
final List<RecoverableWriter.CommitRecoverable> recoverables = new ArrayList<>();
for (int j = 0; j < 2 + i; j++) {
final Path part = new Path(bucketPath, "test-" + i + '-' + j);
final RecoverableFsDataOutputStream stream = writer.open(part);
stream.write((PENDING_CONTENT + '-' + j).getBytes(Charset.forName("UTF-8")));
recoverables.add(stream.closeForCommit().getRecoverable());
}
commitRecoverables.put((long) i, recoverables);
}
final RecoverableWriter.ResumeRecoverable current = null;
final BucketState bucketState = new BucketState(
"", bucketPath, Long.MAX_VALUE, current, commitRecoverables);
final SimpleVersionedSerializer<BucketState> serializer = new BucketStateSerializer(
writer.getResumeRecoverableSerializer(),
writer.getCommitRecoverableSerializer()
);
byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
Assert.assertNull(recoveredState.getCurrentInProgress());
final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getPendingPerCheckpoint();
Assert.assertEquals(5L, recoveredRecoverables.size());
// recover and commit
for (Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry: recoveredRecoverables.entrySet()) {
for (RecoverableWriter.CommitRecoverable recoverable: entry.getValue()) {
writer.recoverForCommit(recoverable).commit();
}
}
FileStatus[] filestatuses = fs.listStatus(bucketPath);
Set<String> paths = new HashSet<>(filestatuses.length);
for (FileStatus filestatus : filestatuses) {
paths.add(filestatus.getPath().getPath());
}
for (int i = 0; i < noOfTasks; i++) {
for (int j = 0; j < 2 + i; j++) {
final String part = new Path(bucketPath, "test-" + i + '-' + j).toString();
Assert.assertTrue(paths.contains(part));
paths.remove(part);
}
}
// only the in-progress must be left
Assert.assertTrue(paths.isEmpty());
}
}