From 43b8e57b82b4fb086a8b307c6ca3b4c2b8086532 Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Sun, 8 Nov 2015 23:45:32 +0100 Subject: [PATCH] [FLINK-2924] [streaming] Improve compacting logic --- .../streaming/state/LazyDbKvState.java | 25 ++++++++++++------- .../contrib/streaming/state/MySqlAdapter.java | 2 ++ .../contrib/streaming/state/DerbyAdapter.java | 3 ++- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java index e7f90f54921..12a3332dca3 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java @@ -96,6 +96,8 @@ public class LazyDbKvState implements KvState, Check private long nextTs; private Map completedCheckpoints = new HashMap<>(); + private long lastCompactedTs; + // ------------------------------------------------------ /** @@ -104,14 +106,15 @@ public class LazyDbKvState implements KvState, Check */ public LazyDbKvState(String kvStateId, boolean compact, ShardedConnection cons, DbBackendConfig conf, TypeSerializer keySerializer, TypeSerializer valueSerializer, V defaultValue) throws IOException { - this(kvStateId, compact, cons, conf, keySerializer, valueSerializer, defaultValue, 1); + this(kvStateId, compact, cons, conf, keySerializer, valueSerializer, defaultValue, 1, 0); } /** * Initialize the {@link LazyDbKvState} from a snapshot. */ public LazyDbKvState(String kvStateId, boolean compact, ShardedConnection cons, final DbBackendConfig conf, - TypeSerializer keySerializer, TypeSerializer valueSerializer, V defaultValue, long nextTs) + TypeSerializer keySerializer, TypeSerializer valueSerializer, V defaultValue, long nextTs, + long lastCompactedTs) throws IOException { this.kvStateId = kvStateId; @@ -130,6 +133,7 @@ public class LazyDbKvState implements KvState, Check this.sqlRetrySleep = conf.getSleepBetweenSqlRetries(); this.nextTs = nextTs; + this.lastCompactedTs = lastCompactedTs; this.cache = new StateCache(conf.getKvCacheSize(), conf.getNumElementsToEvict()); @@ -192,7 +196,7 @@ public class LazyDbKvState implements KvState, Check nextTs = timestamp + 1; completedCheckpoints.put(checkpointId, timestamp); - return new DbKvStateSnapshot(kvStateId, timestamp); + return new DbKvStateSnapshot(kvStateId, timestamp, lastCompactedTs); } /** @@ -245,8 +249,9 @@ public class LazyDbKvState implements KvState, Check if (compactEvery > 0 && compact && checkpointId % compactEvery == 0) { try { for (Connection c : connections.connections()) { - dbAdapter.compactKvStates(kvStateId, c, 0, ts); + dbAdapter.compactKvStates(kvStateId, c, lastCompactedTs, ts); } + lastCompactedTs = ts; if (LOG.isDebugEnabled()) { LOG.debug("State succesfully compacted for {}.", kvStateId); } @@ -294,10 +299,10 @@ public class LazyDbKvState implements KvState, Check } /** - * Snapshot that stores a specific checkpoint id and state id, and also + * Snapshot that stores a specific checkpoint timestamp and state id, and also * rolls back the database to that point upon restore. The rollback is done - * by removing all state checkpoints that have ids between the checkpoint - * and recovery id. + * by removing all state checkpoints that have timestamps between the checkpoint + * and recovery timestamp. * */ private static class DbKvStateSnapshot implements KvStateSnapshot { @@ -306,10 +311,12 @@ public class LazyDbKvState implements KvState, Check private final String kvStateId; private final long checkpointTimestamp; + private final long lastCompactedTimestamp; - public DbKvStateSnapshot(String kvStateId, long checkpointTimestamp) { + public DbKvStateSnapshot(String kvStateId, long checkpointTimestamp, long lastCompactedTs) { this.checkpointTimestamp = checkpointTimestamp; this.kvStateId = kvStateId; + this.lastCompactedTimestamp = lastCompactedTs; } @Override @@ -346,7 +353,7 @@ public class LazyDbKvState implements KvState, Check // Restore the KvState LazyDbKvState restored = new LazyDbKvState(kvStateId, cleanup, stateBackend.getConnections(), stateBackend.getConfiguration(), keySerializer, valueSerializer, - defaultValue, recoveryTimestamp); + defaultValue, recoveryTimestamp, lastCompactedTimestamp); if (LOG.isDebugEnabled()) { LOG.debug("KV state({},{}) restored.", kvStateId, recoveryTimestamp); diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java index 760ee4f0307..c47d6f49884 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java @@ -141,6 +141,7 @@ public class MySqlAdapter implements DbAdapter { return "SELECT v" + " FROM kvstate_" + stateId + " WHERE k = ?" + + " AND timestamp <= ?" + " ORDER BY timestamp DESC LIMIT 1"; } @@ -148,6 +149,7 @@ public class MySqlAdapter implements DbAdapter { public byte[] lookupKey(String stateId, PreparedStatement lookupStatement, byte[] key, long lookupTs) throws SQLException { lookupStatement.setBytes(1, key); + lookupStatement.setLong(2, lookupTs); ResultSet res = lookupStatement.executeQuery(); diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java index 2b0b2321fe2..d4fc838c83a 100644 --- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java @@ -98,7 +98,8 @@ public class DerbyAdapter extends MySqlAdapter { validateStateId(stateId); return "SELECT v " + "FROM kvstate_" + stateId + " WHERE k = ? " - + "ORDER BY timestamp DESC"; + + " AND timestamp <= ?" + + " ORDER BY timestamp DESC"; } @Override -- GitLab