diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java index c8d695f380d1198f0b8cf8d33d2f844ff4337fb8..4cbbfcfba8b5b17490cfa9560172fe4ebd0f1d77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java @@ -39,6 +39,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -391,8 +392,17 @@ class SavepointV2Serializer implements SavepointSerializer { Map sharedStates = deserializeStreamStateHandleMap(dis); Map privateStates = deserializeStreamStateHandleMap(dis); + UUID uuid; + + try { + uuid = UUID.fromString(backendId); + } catch (Exception ex) { + // compatibility with old format pre FLINK-6964: + uuid = UUID.nameUUIDFromBytes(backendId.getBytes(StandardCharsets.UTF_8)); + } + return new IncrementalKeyedStateHandle( - UUID.fromString(backendId), + uuid, keyGroupRange, checkpointId, sharedStates, diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 48c4d858c73a1d00d0ce51238a32bd398938590a..833cb61d6fe8e2ebedf8dd393d78b2f7bb5d4791 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -378,12 +378,6 @@ class TestingCluster( def requestCheckpoint(jobId: JobID, options : CheckpointOptions): String = { val jobManagerGateway = getLeaderGateway(timeout) - // wait until the cluster is ready to take a checkpoint. - val allRunning = jobManagerGateway.ask( - TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), timeout) - - Await.ready(allRunning, timeout) - // trigger checkpoint val result = Await.result( jobManagerGateway.ask(CheckpointRequest(jobId, options), timeout), timeout) @@ -395,16 +389,9 @@ class TestingCluster( // failed because tasks were not ready.This would not be required if // TestingJobManagerMessages.WaitForAllVerticesToBeRunning(...) works // properly. - if (fail.cause != null) { - val innerCause = fail.cause.getCause - if (innerCause != null - && innerCause.getMessage.contains("tasks not ready")) { - // retry if the tasks are not ready yet. - Thread.sleep(50) - return requestCheckpoint(jobId, options) - } - } - throw new IOException(fail.cause) + LOG.info("Test checkpoint attempt failed. Retry ...", fail.cause) + Thread.sleep(50) + requestCheckpoint(jobId, options) } case _ => throw new IllegalStateException("Trigger checkpoint failed") }