From 2ba5f8733694a4e52a20b12d949b8611026065d2 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Thu, 20 Jul 2017 11:03:18 +0200 Subject: [PATCH] [hotfix] Backwards compatible deserialization of RocksDB backend UUIDs --- .../savepoint/SavepointV2Serializer.java | 12 +++++++++++- .../runtime/testingUtils/TestingCluster.scala | 19 +++---------------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java index c8d695f380d..4cbbfcfba8b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java @@ -39,6 +39,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -391,8 +392,17 @@ class SavepointV2Serializer implements SavepointSerializer { Map sharedStates = deserializeStreamStateHandleMap(dis); Map privateStates = deserializeStreamStateHandleMap(dis); + UUID uuid; + + try { + uuid = UUID.fromString(backendId); + } catch (Exception ex) { + // compatibility with old format pre FLINK-6964: + uuid = UUID.nameUUIDFromBytes(backendId.getBytes(StandardCharsets.UTF_8)); + } + return new IncrementalKeyedStateHandle( - UUID.fromString(backendId), + uuid, keyGroupRange, checkpointId, sharedStates, diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 48c4d858c73..833cb61d6fe 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -378,12 +378,6 @@ class TestingCluster( def requestCheckpoint(jobId: JobID, options : CheckpointOptions): String = { val jobManagerGateway = getLeaderGateway(timeout) - // wait until the cluster is ready to take a checkpoint. - val allRunning = jobManagerGateway.ask( - TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), timeout) - - Await.ready(allRunning, timeout) - // trigger checkpoint val result = Await.result( jobManagerGateway.ask(CheckpointRequest(jobId, options), timeout), timeout) @@ -395,16 +389,9 @@ class TestingCluster( // failed because tasks were not ready.This would not be required if // TestingJobManagerMessages.WaitForAllVerticesToBeRunning(...) works // properly. - if (fail.cause != null) { - val innerCause = fail.cause.getCause - if (innerCause != null - && innerCause.getMessage.contains("tasks not ready")) { - // retry if the tasks are not ready yet. - Thread.sleep(50) - return requestCheckpoint(jobId, options) - } - } - throw new IOException(fail.cause) + LOG.info("Test checkpoint attempt failed. Retry ...", fail.cause) + Thread.sleep(50) + requestCheckpoint(jobId, options) } case _ => throw new IllegalStateException("Trigger checkpoint failed") } -- GitLab