diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java index 0b97ea6a6e26dce8c91bb267e309cb1521ae42dd..1433cc25cc56cf36d30d67be0bfa13e44257b72c 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientHandler.java @@ -309,25 +309,23 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { int offset = 0; while (offset < bytes.length) { int dataLength = Conversion.byteArrayToInt(bytes, offset + 4, 0, 0, 4); - LogMessage logMessage; + /* + * We must copy a byte array and call parse after then, + * or got a !!!RIDICULOUS EXCEPTION!!!, + * if we wrap an unpooled buffer with offset and call setByteBuf just as same as `parse` function do. + */ + LogMessage logMessage = new LogMessage(false); + byte[] data = new byte[dataLength + 8]; + System.arraycopy(bytes, offset, data, 0, data.length); try { - /* - * We must copy a byte array and call parse after then, - * or got a !!!RIDICULOUS EXCEPTION!!!, - * if we wrap an unpooled buffer with offset and call setByteBuf just as same as `parse` function do. - */ - logMessage = new LogMessage(false); - byte[] data = new byte[dataLength + 8]; - System.arraycopy(bytes, offset, data, 0, data.length); logMessage.parse(data); + } catch (Exception e) { if (config.isIgnoreUnknownRecordType()) { // unsupported type, ignore logger.debug("Unsupported record type: {}", logMessage); offset += (8 + dataLength); continue; } - - } catch (Exception e) { throw new LogProxyClientException(ErrorCode.E_PARSE, e); } @@ -338,15 +336,18 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { while (true) { try { recordQueue.put(new StreamContext.TransferPacket(logMessage)); - stream.setCheckpointString(logMessage.getSafeTimestamp()); break; } catch (InterruptedException e) { // do nothing - } catch (IllegalArgumentException e) { - logger.error("Failed to update checkpoint for log message: " + logMessage, e); } } + try { + stream.setCheckpointString(logMessage.getSafeTimestamp()); + } catch (IllegalArgumentException e) { + logger.error("Failed to update checkpoint for log message: " + logMessage, e); + } + offset += (8 + dataLength); } } diff --git a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java index a16cb90d8c31410d87c926c6780e860e97506f31..a51a0c40552fd521d6ca6564bd337797e70793fa 100644 --- a/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java +++ b/logproxy-client/src/main/java/com/oceanbase/clogproxy/client/connection/ClientStream.java @@ -297,7 +297,7 @@ public class ClientStream { */ public void setCheckpointString(String checkpointString) { long timestamp = Long.parseLong(checkpointString); - if (timestamp < 0) { + if (timestamp <= 0) { throw new IllegalArgumentException( "Update checkpoint with invalid value: " + timestamp); }