未验证 提交 129d7e2b 编写于 作者: V von gosling 提交者: GitHub

Revert "[ISSUE #2865] Batch message send bug fix (#2866)" (#2912)

This reverts commit 16694485.
上级 16694485
...@@ -426,18 +426,17 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -426,18 +426,17 @@ public class DLedgerCommitLog extends CommitLog {
AppendFuture<AppendEntryResponse> dledgerFuture; AppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult; EncodeResult encodeResult;
encodeResult = this.messageSerializer.serialize(msg);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
}
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long elapsedTimeInLock; long elapsedTimeInLock;
long queueOffset; long queueOffset;
try { try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
encodeResult = this.messageSerializer.serialize(msg);
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset, false); encodeResult.setQueueOffsetKey(queueOffset);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
}
AppendEntryRequest request = new AppendEntryRequest(); AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup()); request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
...@@ -543,12 +542,6 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -543,12 +542,6 @@ public class DLedgerCommitLog extends CommitLog {
BatchAppendFuture<AppendEntryResponse> dledgerFuture; BatchAppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult; EncodeResult encodeResult;
encodeResult = this.messageSerializer.serialize(messageExtBatch);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
.status));
}
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
msgIdBuilder.setLength(0); msgIdBuilder.setLength(0);
long elapsedTimeInLock; long elapsedTimeInLock;
...@@ -556,8 +549,12 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -556,8 +549,12 @@ public class DLedgerCommitLog extends CommitLog {
long msgNum = 0; long msgNum = 0;
try { try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); encodeResult = this.messageSerializer.serialize(messageExtBatch);
encodeResult.setQueueOffsetKey(queueOffset, true); queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
.status));
}
BatchAppendEntryRequest request = new BatchAppendEntryRequest(); BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup()); request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
...@@ -667,7 +664,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -667,7 +664,7 @@ public class DLedgerCommitLog extends CommitLog {
try { try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset, false); encodeResult.setQueueOffsetKey(queueOffset);
AppendEntryRequest request = new AppendEntryRequest(); AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup()); request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
...@@ -782,8 +779,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -782,8 +779,7 @@ public class DLedgerCommitLog extends CommitLog {
long msgNum = 0; long msgNum = 0;
try { try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
encodeResult.setQueueOffsetKey(queueOffset, true);
BatchAppendEntryRequest request = new BatchAppendEntryRequest(); BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup()); request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
...@@ -961,15 +957,8 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -961,15 +957,8 @@ public class DLedgerCommitLog extends CommitLog {
this.queueOffsetKey = queueOffsetKey; this.queueOffsetKey = queueOffsetKey;
} }
public void setQueueOffsetKey(long offset, boolean isBatch) { public void setQueueOffsetKey(long offset) {
if (!isBatch) { data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset);
this.data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset);
return;
}
for (byte[] data : batchData) {
ByteBuffer.wrap(data).putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset++);
}
} }
public byte[] getData() { public byte[] getData() {
...@@ -988,6 +977,8 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -988,6 +977,8 @@ public class DLedgerCommitLog extends CommitLog {
// The maximum length of the message // The maximum length of the message
private final int maxMessageSize; private final int maxMessageSize;
// Build Message Key
private final StringBuilder keyBuilder = new StringBuilder();
MessageSerializer(final int size) { MessageSerializer(final int size) {
this.maxMessageSize = size; this.maxMessageSize = size;
...@@ -1088,7 +1079,17 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -1088,7 +1079,17 @@ public class DLedgerCommitLog extends CommitLog {
} }
public EncodeResult serialize(final MessageExtBatch messageExtBatch) { public EncodeResult serialize(final MessageExtBatch messageExtBatch) {
String key = messageExtBatch.getTopic() + "-" + messageExtBatch.getQueueId(); keyBuilder.setLength(0);
keyBuilder.append(messageExtBatch.getTopic());
keyBuilder.append('-');
keyBuilder.append(messageExtBatch.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
}
int totalMsgLen = 0; int totalMsgLen = 0;
ByteBuffer messagesByteBuff = messageExtBatch.wrap(); ByteBuffer messagesByteBuff = messageExtBatch.wrap();
...@@ -1153,7 +1154,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -1153,7 +1154,7 @@ public class DLedgerCommitLog extends CommitLog {
// 5 FLAG // 5 FLAG
msgStoreItemMemory.putInt(flag); msgStoreItemMemory.putInt(flag);
// 6 QUEUEOFFSET // 6 QUEUEOFFSET
msgStoreItemMemory.putLong(0L); msgStoreItemMemory.putLong(queueOffset++);
// 7 PHYSICALOFFSET // 7 PHYSICALOFFSET
msgStoreItemMemory.putLong(0); msgStoreItemMemory.putLong(0);
// 8 SYSFLAG // 8 SYSFLAG
...@@ -1209,7 +1210,6 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -1209,7 +1210,6 @@ public class DLedgerCommitLog extends CommitLog {
this.sbr = sbr; this.sbr = sbr;
} }
@Override
public synchronized void release() { public synchronized void release() {
super.release(); super.release();
if (sbr != null) { if (sbr != null) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册