[FLINK-17593] Turn BucketStateSerializerTest into an upgrade test

We need this for future changes to the serialization format.
上级 c3a4a124
......@@ -269,7 +269,14 @@ public class Bucket<IN, BucketID> {
}
}
return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint);
// It's important to create a defensive copy here! Otherwise, later calls to
// onSuccessfulCompletionOfCheckpoint would change a BucketState that we already gave out.
final NavigableMap<Long, List<CommitRecoverable>> copiedPendingPartsPerCheckpoint = new TreeMap<>();
for (Map.Entry<Long, List<CommitRecoverable>> checkpoint : pendingPartsPerCheckpoint.entrySet()) {
copiedPendingPartsPerCheckpoint.put(checkpoint.getKey(), new ArrayList<>(checkpoint.getValue()));
}
return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressResumable, copiedPendingPartsPerCheckpoint);
}
private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
......
......@@ -1458,6 +1458,7 @@ under the License.
<exclude>**/src/test/resources/*-snapshot</exclude>
<exclude>**/src/test/resources/*.snapshot</exclude>
<exclude>**/src/test/resources/*-savepoint</exclude>
<exclude>**/src/test/resources/bucket-state-migration-test/**</exclude>
<exclude>flink-core/src/test/resources/serialized-kryo-serializer-1.3</exclude>
<exclude>flink-core/src/test/resources/type-without-avro-serialized-using-kryo</exclude>
<exclude>flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized</exclude>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册