From 33f737e7a0dec46fc0235695d3a37d6d95ab8f4a Mon Sep 17 00:00:00 2001 From: He Wang Date: Thu, 23 Jun 2022 14:07:50 +0800 Subject: [PATCH] check value when update checkpoint (#49) --- .../clogproxy/client/connection/ClientHandler.java | 2 ++ .../clogproxy/client/connection/ClientStream.java | 9 ++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java index 66b0404..0b97ea6 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java @@ -342,6 +342,8 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { break; } catch (InterruptedException e) { // do nothing + } catch (IllegalArgumentException e) { + logger.error("Failed to update checkpoint for log message: " + logMessage, e); } } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java index 15f3d53..a16cb90 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java @@ -296,7 +296,14 @@ public class ClientStream { * @param checkpointString Checkpoint string. */ public void setCheckpointString(String checkpointString) { - this.checkpointString = checkpointString; + long timestamp = Long.parseLong(checkpointString); + if (timestamp < 0) { + throw new IllegalArgumentException( + "Update checkpoint with invalid value: " + timestamp); + } + if (this.checkpointString == null || Long.parseLong(this.checkpointString) < timestamp) { + this.checkpointString = checkpointString; + } } /** -- GitLab