From 1a92566f92171dcf06d4c03dc467912d82f2e69d Mon Sep 17 00:00:00 2001 From: He Wang Date: Thu, 19 May 2022 12:13:28 +0800 Subject: [PATCH] update checkpoint for record queue (#40) --- .../clogproxy/client/connection/ClientHandler.java | 1 + .../clogproxy/client/connection/ClientStream.java | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java index 35b3d05..c9be47e 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java @@ -334,6 +334,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { while (true) { try { recordQueue.put(new StreamContext.TransferPacket(logMessage)); + stream.setCheckpointString(logMessage.getTimestamp()); break; } catch (InterruptedException e) { // do nothing diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java index 20a7a37..e9fc042 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java @@ -302,6 +302,15 @@ public class ClientStream { } } + /** + * Set checkpoint string. + * + * @param checkpointString Checkpoint string. + */ + public void setCheckpointString(String checkpointString) { + this.checkpointString = checkpointString; + } + /** * Add a {@link RecordListener} to {@link #listeners}. * -- GitLab