未验证 提交 16694485 编写于 作者: G Git_Yang 提交者: GitHub

[ISSUE #2865] Batch message send bug fix (#2866)

* [DLedgerCommitLog] Batch message send bug fix
Signed-off-by: Nzhangyang <Git_Yang@163.com>

* [DLedgerCommitLog] Batch message send bug fix2
Signed-off-by: Nzhangyang <Git_Yang@163.com>
上级 b4240d5c
...@@ -426,17 +426,18 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -426,17 +426,18 @@ public class DLedgerCommitLog extends CommitLog {
AppendFuture<AppendEntryResponse> dledgerFuture; AppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult; EncodeResult encodeResult;
encodeResult = this.messageSerializer.serialize(msg);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
}
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long elapsedTimeInLock; long elapsedTimeInLock;
long queueOffset; long queueOffset;
try { try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
encodeResult = this.messageSerializer.serialize(msg);
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset); encodeResult.setQueueOffsetKey(queueOffset, false);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
}
AppendEntryRequest request = new AppendEntryRequest(); AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup()); request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
...@@ -542,6 +543,12 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -542,6 +543,12 @@ public class DLedgerCommitLog extends CommitLog {
BatchAppendFuture<AppendEntryResponse> dledgerFuture; BatchAppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult; EncodeResult encodeResult;
encodeResult = this.messageSerializer.serialize(messageExtBatch);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
.status));
}
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
msgIdBuilder.setLength(0); msgIdBuilder.setLength(0);
long elapsedTimeInLock; long elapsedTimeInLock;
...@@ -549,12 +556,8 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -549,12 +556,8 @@ public class DLedgerCommitLog extends CommitLog {
long msgNum = 0; long msgNum = 0;
try { try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
encodeResult = this.messageSerializer.serialize(messageExtBatch); queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); encodeResult.setQueueOffsetKey(queueOffset, true);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
.status));
}
BatchAppendEntryRequest request = new BatchAppendEntryRequest(); BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup()); request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
...@@ -664,7 +667,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -664,7 +667,7 @@ public class DLedgerCommitLog extends CommitLog {
try { try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset); encodeResult.setQueueOffsetKey(queueOffset, false);
AppendEntryRequest request = new AppendEntryRequest(); AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup()); request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
...@@ -779,7 +782,8 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -779,7 +782,8 @@ public class DLedgerCommitLog extends CommitLog {
long msgNum = 0; long msgNum = 0;
try { try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset, true);
BatchAppendEntryRequest request = new BatchAppendEntryRequest(); BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup()); request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
...@@ -957,8 +961,15 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -957,8 +961,15 @@ public class DLedgerCommitLog extends CommitLog {
this.queueOffsetKey = queueOffsetKey; this.queueOffsetKey = queueOffsetKey;
} }
public void setQueueOffsetKey(long offset) { public void setQueueOffsetKey(long offset, boolean isBatch) {
data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset); if (!isBatch) {
this.data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset);
return;
}
for (byte[] data : batchData) {
ByteBuffer.wrap(data).putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset++);
}
} }
public byte[] getData() { public byte[] getData() {
...@@ -977,8 +988,6 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -977,8 +988,6 @@ public class DLedgerCommitLog extends CommitLog {
// The maximum length of the message // The maximum length of the message
private final int maxMessageSize; private final int maxMessageSize;
// Build Message Key
private final StringBuilder keyBuilder = new StringBuilder();
MessageSerializer(final int size) { MessageSerializer(final int size) {
this.maxMessageSize = size; this.maxMessageSize = size;
...@@ -1079,17 +1088,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -1079,17 +1088,7 @@ public class DLedgerCommitLog extends CommitLog {
} }
public EncodeResult serialize(final MessageExtBatch messageExtBatch) { public EncodeResult serialize(final MessageExtBatch messageExtBatch) {
keyBuilder.setLength(0); String key = messageExtBatch.getTopic() + "-" + messageExtBatch.getQueueId();
keyBuilder.append(messageExtBatch.getTopic());
keyBuilder.append('-');
keyBuilder.append(messageExtBatch.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
}
int totalMsgLen = 0; int totalMsgLen = 0;
ByteBuffer messagesByteBuff = messageExtBatch.wrap(); ByteBuffer messagesByteBuff = messageExtBatch.wrap();
...@@ -1154,7 +1153,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -1154,7 +1153,7 @@ public class DLedgerCommitLog extends CommitLog {
// 5 FLAG // 5 FLAG
msgStoreItemMemory.putInt(flag); msgStoreItemMemory.putInt(flag);
// 6 QUEUEOFFSET // 6 QUEUEOFFSET
msgStoreItemMemory.putLong(queueOffset++); msgStoreItemMemory.putLong(0L);
// 7 PHYSICALOFFSET // 7 PHYSICALOFFSET
msgStoreItemMemory.putLong(0); msgStoreItemMemory.putLong(0);
// 8 SYSFLAG // 8 SYSFLAG
...@@ -1210,6 +1209,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -1210,6 +1209,7 @@ public class DLedgerCommitLog extends CommitLog {
this.sbr = sbr; this.sbr = sbr;
} }
@Override
public synchronized void release() { public synchronized void release() {
super.release(); super.release();
if (sbr != null) { if (sbr != null) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册