diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java index 66b0404d45180c1bb19deb1f18ea9db2b56eef9a..0b97ea6a6e26dce8c91bb267e309cb1521ae42dd 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java @@ -342,6 +342,8 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { break; } catch (InterruptedException e) { // do nothing + } catch (IllegalArgumentException e) { + logger.error("Failed to update checkpoint for log message: " + logMessage, e); } } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java index 15f3d53fd7ef9377c9d84d2ec99d25f804ebf8e9..a16cb90d8c31410d87c926c6780e860e97506f31 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java @@ -296,7 +296,14 @@ public class ClientStream { * @param checkpointString Checkpoint string. */ public void setCheckpointString(String checkpointString) { - this.checkpointString = checkpointString; + long timestamp = Long.parseLong(checkpointString); + if (timestamp < 0) { + throw new IllegalArgumentException( + "Update checkpoint with invalid value: " + timestamp); + } + if (this.checkpointString == null || Long.parseLong(this.checkpointString) < timestamp) { + this.checkpointString = checkpointString; + } } /**