未验证 提交 33f737e7 编写于 作者: H He Wang 提交者: GitHub

check value when update checkpoint (#49)

上级 1dc838eb
...@@ -342,6 +342,8 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { ...@@ -342,6 +342,8 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
break; break;
} catch (InterruptedException e) { } catch (InterruptedException e) {
// do nothing // do nothing
} catch (IllegalArgumentException e) {
logger.error("Failed to update checkpoint for log message: " + logMessage, e);
} }
} }
......
...@@ -296,8 +296,15 @@ public class ClientStream { ...@@ -296,8 +296,15 @@ public class ClientStream {
* @param checkpointString Checkpoint string. * @param checkpointString Checkpoint string.
*/ */
public void setCheckpointString(String checkpointString) { public void setCheckpointString(String checkpointString) {
long timestamp = Long.parseLong(checkpointString);
if (timestamp < 0) {
throw new IllegalArgumentException(
"Update checkpoint with invalid value: " + timestamp);
}
if (this.checkpointString == null || Long.parseLong(this.checkpointString) < timestamp) {
this.checkpointString = checkpointString; this.checkpointString = checkpointString;
} }
}
/** /**
* Add a {@link RecordListener} to {@link #listeners}. * Add a {@link RecordListener} to {@link #listeners}.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册