diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java index 35b3d05990921846de2a2298c9de7248046e5e4e..c9be47e2a5b065d7162df1cc38933f0b6da693a4 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java @@ -334,6 +334,7 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { while (true) { try { recordQueue.put(new StreamContext.TransferPacket(logMessage)); + stream.setCheckpointString(logMessage.getTimestamp()); break; } catch (InterruptedException e) { // do nothing diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java index 20a7a3764ae04a3a7fab687ed203b9942d775f2a..e9fc0423e6d72a48887c543cb389121acfda6071 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java @@ -302,6 +302,15 @@ public class ClientStream { } } + /** + * Set checkpoint string. + * + * @param checkpointString Checkpoint string. + */ + public void setCheckpointString(String checkpointString) { + this.checkpointString = checkpointString; + } + /** * Add a {@link RecordListener} to {@link #listeners}. *