未验证 提交 92bcf94e 编写于 作者: H He Wang 提交者: GitHub

fix parseRecord (#50)

* fix parseRecord

* set checkpoint after put successfully
上级 33f737e7
...@@ -309,25 +309,23 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { ...@@ -309,25 +309,23 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
int offset = 0; int offset = 0;
while (offset < bytes.length) { while (offset < bytes.length) {
int dataLength = Conversion.byteArrayToInt(bytes, offset + 4, 0, 0, 4); int dataLength = Conversion.byteArrayToInt(bytes, offset + 4, 0, 0, 4);
LogMessage logMessage;
try {
/* /*
* We must copy a byte array and call parse after then, * We must copy a byte array and call parse after then,
* or got a !!!RIDICULOUS EXCEPTION!!!, * or got a !!!RIDICULOUS EXCEPTION!!!,
* if we wrap an unpooled buffer with offset and call setByteBuf just as same as `parse` function do. * if we wrap an unpooled buffer with offset and call setByteBuf just as same as `parse` function do.
*/ */
logMessage = new LogMessage(false); LogMessage logMessage = new LogMessage(false);
byte[] data = new byte[dataLength + 8]; byte[] data = new byte[dataLength + 8];
System.arraycopy(bytes, offset, data, 0, data.length); System.arraycopy(bytes, offset, data, 0, data.length);
try {
logMessage.parse(data); logMessage.parse(data);
} catch (Exception e) {
if (config.isIgnoreUnknownRecordType()) { if (config.isIgnoreUnknownRecordType()) {
// unsupported type, ignore // unsupported type, ignore
logger.debug("Unsupported record type: {}", logMessage); logger.debug("Unsupported record type: {}", logMessage);
offset += (8 + dataLength); offset += (8 + dataLength);
continue; continue;
} }
} catch (Exception e) {
throw new LogProxyClientException(ErrorCode.E_PARSE, e); throw new LogProxyClientException(ErrorCode.E_PARSE, e);
} }
...@@ -338,14 +336,17 @@ public class ClientHandler extends ChannelInboundHandlerAdapter { ...@@ -338,14 +336,17 @@ public class ClientHandler extends ChannelInboundHandlerAdapter {
while (true) { while (true) {
try { try {
recordQueue.put(new StreamContext.TransferPacket(logMessage)); recordQueue.put(new StreamContext.TransferPacket(logMessage));
stream.setCheckpointString(logMessage.getSafeTimestamp());
break; break;
} catch (InterruptedException e) { } catch (InterruptedException e) {
// do nothing // do nothing
}
}
try {
stream.setCheckpointString(logMessage.getSafeTimestamp());
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
logger.error("Failed to update checkpoint for log message: " + logMessage, e); logger.error("Failed to update checkpoint for log message: " + logMessage, e);
} }
}
offset += (8 + dataLength); offset += (8 + dataLength);
} }
......
...@@ -297,7 +297,7 @@ public class ClientStream { ...@@ -297,7 +297,7 @@ public class ClientStream {
*/ */
public void setCheckpointString(String checkpointString) { public void setCheckpointString(String checkpointString) {
long timestamp = Long.parseLong(checkpointString); long timestamp = Long.parseLong(checkpointString);
if (timestamp < 0) { if (timestamp <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Update checkpoint with invalid value: " + timestamp); "Update checkpoint with invalid value: " + timestamp);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册