未验证 提交 03257725 编写于 作者: H huangli 提交者: GitHub

[ISSUE #2883] [Part E] Improve produce performance in M/S mode. (#2889)

* Remove putMessage/putMessages method in CommitLog which has too many duplicated code.

* Optimise performance of asyncPutMessage (extract some code out of putMessage lock)

* extract generation of msgId out of lock in CommitLog (now only for single message processor)

* extract generation of topicQueueTable key out of sync code

* extract generation of msgId out of lock in CommitLog (for batch)

* fix ipv6 problem introduced in commit "Optimise performance of asyncPutMessage (extract some code out of putMessage lock)"
上级 360b616b
...@@ -18,6 +18,7 @@ package org.apache.rocketmq.store; ...@@ -18,6 +18,7 @@ package org.apache.rocketmq.store;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.CommitLog.PutMessageContext;
/** /**
* Write messages callback interface * Write messages callback interface
...@@ -30,7 +31,7 @@ public interface AppendMessageCallback { ...@@ -30,7 +31,7 @@ public interface AppendMessageCallback {
* @return How many bytes to write * @return How many bytes to write
*/ */
AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
final int maxBlank, final MessageExtBrokerInner msg); final int maxBlank, final MessageExtBrokerInner msg, PutMessageContext putMessageContext);
/** /**
* After batched message serialization, write MapedByteBuffer * After batched message serialization, write MapedByteBuffer
...@@ -39,5 +40,5 @@ public interface AppendMessageCallback { ...@@ -39,5 +40,5 @@ public interface AppendMessageCallback {
* @return How many bytes to write * @return How many bytes to write
*/ */
AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
final int maxBlank, final MessageExtBatch messageExtBatch); final int maxBlank, final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext);
} }
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
*/ */
package org.apache.rocketmq.store; package org.apache.rocketmq.store;
import java.util.function.Supplier;
/** /**
* When write a message to the commit log, returns results * When write a message to the commit log, returns results
*/ */
...@@ -28,6 +30,7 @@ public class AppendMessageResult { ...@@ -28,6 +30,7 @@ public class AppendMessageResult {
private int wroteBytes; private int wroteBytes;
// Message ID // Message ID
private String msgId; private String msgId;
private Supplier<String> msgIdSupplier;
// Message storage timestamp // Message storage timestamp
private long storeTimestamp; private long storeTimestamp;
// Consume queue's offset(step by one) // Consume queue's offset(step by one)
...@@ -51,6 +54,17 @@ public class AppendMessageResult { ...@@ -51,6 +54,17 @@ public class AppendMessageResult {
this.pagecacheRT = pagecacheRT; this.pagecacheRT = pagecacheRT;
} }
public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, Supplier<String> msgIdSupplier,
long storeTimestamp, long logicsOffset, long pagecacheRT) {
this.status = status;
this.wroteOffset = wroteOffset;
this.wroteBytes = wroteBytes;
this.msgIdSupplier = msgIdSupplier;
this.storeTimestamp = storeTimestamp;
this.logicsOffset = logicsOffset;
this.pagecacheRT = pagecacheRT;
}
public long getPagecacheRT() { public long getPagecacheRT() {
return pagecacheRT; return pagecacheRT;
} }
...@@ -88,6 +102,9 @@ public class AppendMessageResult { ...@@ -88,6 +102,9 @@ public class AppendMessageResult {
} }
public String getMsgId() { public String getMsgId() {
if (msgId == null && msgIdSupplier != null) {
msgId = msgIdSupplier.get();
}
return msgId; return msgId;
} }
......
...@@ -34,6 +34,7 @@ import java.util.Set; ...@@ -34,6 +34,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -476,58 +477,20 @@ public class DefaultMessageStore implements MessageStore { ...@@ -476,58 +477,20 @@ public class DefaultMessageStore implements MessageStore {
@Override @Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) { public PutMessageResult putMessage(MessageExtBrokerInner msg) {
PutMessageStatus checkStoreStatus = this.checkStoreStatus(); try {
if (checkStoreStatus != PutMessageStatus.PUT_OK) { return asyncPutMessage(msg).get();
return new PutMessageResult(checkStoreStatus, null); } catch (InterruptedException | ExecutionException e) {
} return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return new PutMessageResult(msgCheckStatus, null);
}
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessage(msg);
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
} }
return result;
} }
@Override @Override
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
PutMessageStatus checkStoreStatus = this.checkStoreStatus(); try {
if (checkStoreStatus != PutMessageStatus.PUT_OK) { return asyncPutMessages(messageExtBatch).get();
return new PutMessageResult(checkStoreStatus, null); } catch (InterruptedException | ExecutionException e) {
} return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return new PutMessageResult(msgCheckStatus, null);
}
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
} }
return result;
} }
@Override @Override
......
...@@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.CommitLog.PutMessageContext;
import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.util.LibC; import org.apache.rocketmq.store.util.LibC;
import sun.nio.ch.DirectBuffer; import sun.nio.ch.DirectBuffer;
...@@ -188,15 +189,18 @@ public class MappedFile extends ReferenceResource { ...@@ -188,15 +189,18 @@ public class MappedFile extends ReferenceResource {
return fileChannel; return fileChannel;
} }
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) { public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,
return appendMessagesInner(msg, cb); PutMessageContext putMessageContext) {
return appendMessagesInner(msg, cb, putMessageContext);
} }
public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) { public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb,
return appendMessagesInner(messageExtBatch, cb); PutMessageContext putMessageContext) {
return appendMessagesInner(messageExtBatch, cb, putMessageContext);
} }
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
assert messageExt != null; assert messageExt != null;
assert cb != null; assert cb != null;
...@@ -207,9 +211,11 @@ public class MappedFile extends ReferenceResource { ...@@ -207,9 +211,11 @@ public class MappedFile extends ReferenceResource {
byteBuffer.position(currentPos); byteBuffer.position(currentPos);
AppendMessageResult result; AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) { if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBatch) { } else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
} else { } else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
} }
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
*/ */
package org.apache.rocketmq.store; package org.apache.rocketmq.store;
import java.nio.ByteBuffer;
import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
...@@ -24,6 +26,16 @@ public class MessageExtBrokerInner extends MessageExt { ...@@ -24,6 +26,16 @@ public class MessageExtBrokerInner extends MessageExt {
private String propertiesString; private String propertiesString;
private long tagsCode; private long tagsCode;
private ByteBuffer encodedBuff;
public ByteBuffer getEncodedBuff() {
return encodedBuff;
}
public void setEncodedBuff(ByteBuffer encodedBuff) {
this.encodedBuff = encodedBuff;
}
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) { public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
if (null == tags || tags.length() == 0) { return 0; } if (null == tags || tags.length() == 0) { return 0; }
......
...@@ -37,7 +37,6 @@ import java.util.HashMap; ...@@ -37,7 +37,6 @@ import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
...@@ -413,237 +412,6 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -413,237 +412,6 @@ public class DLedgerCommitLog extends CommitLog {
} }
} }
@Override
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
String topic = msg.getTopic();
setMessageInfo(msg,tranType);
// Back to Results
AppendMessageResult appendResult;
AppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult;
encodeResult = this.messageSerializer.serialize(msg);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
}
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long elapsedTimeInLock;
long queueOffset;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset, false);
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.getData());
dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
}
long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;
int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
break;
default:
break;
}
} catch (Exception e) {
log.error("Put message error", e);
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
} finally {
beginTimeInDledgerLock = 0;
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult);
}
PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
try {
AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS);
switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
case SUCCESS:
putMessageStatus = PutMessageStatus.PUT_OK;
break;
case INCONSISTENT_LEADER:
case NOT_LEADER:
case LEADER_NOT_READY:
case DISK_FULL:
putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
break;
case WAIT_QUORUM_ACK_TIMEOUT:
//Do not return flush_slave_timeout to the client, for the ons client will ignore it.
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
case LEADER_PENDING_FULL:
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
}
} catch (Throwable t) {
log.error("Failed to get dledger append result", t);
}
PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
if (putMessageStatus == PutMessageStatus.PUT_OK) {
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(appendResult.getWroteBytes());
}
return putMessageResult;
}
@Override
public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (messageExtBatch.getDelayTimeLevel() > 0) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
// Set the storage time
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
messageExtBatch.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
messageExtBatch.setStoreHostAddressV6Flag();
}
// Back to Results
AppendMessageResult appendResult;
BatchAppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult;
encodeResult = this.messageSerializer.serialize(messageExtBatch);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
.status));
}
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
msgIdBuilder.setLength(0);
long elapsedTimeInLock;
long queueOffset;
int msgNum = 0;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset, true);
BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBatchMsgs(encodeResult.batchData);
AppendFuture<AppendEntryResponse> appendFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (appendFuture.getPos() == -1) {
log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode());
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
}
dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) appendFuture;
long wroteOffset = 0;
int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
boolean isFirstOffset = true;
long firstWroteOffset = 0;
for (long pos : dledgerFuture.getPositions()) {
wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
if (isFirstOffset) {
firstWroteOffset = wroteOffset;
isFirstOffset = false;
}
String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
if (msgIdBuilder.length() > 0) {
msgIdBuilder.append(',').append(msgId);
} else {
msgIdBuilder.append(msgId);
}
msgNum++;
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen,
msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
appendResult.setMsgNum(msgNum);
DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum);
} catch (Exception e) {
log.error("Put message error", e);
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus
.UNKNOWN_ERROR));
} finally {
beginTimeInDledgerLock = 0;
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
elapsedTimeInLock, messageExtBatch.getBody().length, appendResult);
}
PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
try {
AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS);
switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
case SUCCESS:
putMessageStatus = PutMessageStatus.PUT_OK;
break;
case INCONSISTENT_LEADER:
case NOT_LEADER:
case LEADER_NOT_READY:
case DISK_FULL:
putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
break;
case WAIT_QUORUM_ACK_TIMEOUT:
//Do not return flush_slave_timeout to the client, for the ons client will ignore it.
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
case LEADER_PENDING_FULL:
putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
break;
}
} catch (Throwable t) {
log.error("Failed to get dledger append result", t);
}
PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
if (putMessageStatus == PutMessageStatus.PUT_OK) {
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(msgNum);
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(encodeResult.totalMsgLen);
}
return putMessageResult;
}
@Override @Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) { public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
......
...@@ -30,6 +30,8 @@ import org.apache.rocketmq.common.message.Message; ...@@ -30,6 +30,8 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.CommitLog.MessageExtEncoder;
import org.apache.rocketmq.store.CommitLog.PutMessageContext;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
...@@ -42,7 +44,7 @@ public class AppendCallbackTest { ...@@ -42,7 +44,7 @@ public class AppendCallbackTest {
AppendMessageCallback callback; AppendMessageCallback callback;
CommitLog.MessageExtBatchEncoder batchEncoder = new CommitLog.MessageExtBatchEncoder(10 * 1024 * 1024); MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024);
@Before @Before
public void init() throws Exception { public void init() throws Exception {
...@@ -84,10 +86,12 @@ public class AppendCallbackTest { ...@@ -84,10 +86,12 @@ public class AppendCallbackTest {
messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
ByteBuffer buff = ByteBuffer.allocate(1024 * 10); ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
//encounter end of file when append half of the data //encounter end of file when append half of the data
AppendMessageResult result = callback.doAppend(0, buff, 1000, messageExtBatch); AppendMessageResult result =
callback.doAppend(0, buff, 1000, messageExtBatch, putMessageContext);
assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus()); assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus());
assertEquals(0, result.getWroteOffset()); assertEquals(0, result.getWroteOffset());
assertEquals(0, result.getLogicsOffset()); assertEquals(0, result.getLogicsOffset());
...@@ -121,10 +125,12 @@ public class AppendCallbackTest { ...@@ -121,10 +125,12 @@ public class AppendCallbackTest {
messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124)); messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124));
messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
ByteBuffer buff = ByteBuffer.allocate(1024 * 10); ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
//encounter end of file when append half of the data //encounter end of file when append half of the data
AppendMessageResult result = callback.doAppend(0, buff, 1000, messageExtBatch); AppendMessageResult result =
callback.doAppend(0, buff, 1000, messageExtBatch, putMessageContext);
assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus()); assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus());
assertEquals(0, result.getWroteOffset()); assertEquals(0, result.getWroteOffset());
assertEquals(0, result.getLogicsOffset()); assertEquals(0, result.getLogicsOffset());
...@@ -154,9 +160,11 @@ public class AppendCallbackTest { ...@@ -154,9 +160,11 @@ public class AppendCallbackTest {
messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
ByteBuffer buff = ByteBuffer.allocate(1024 * 10); ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBatch); AppendMessageResult allresult =
callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext);
assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
assertEquals(0, allresult.getWroteOffset()); assertEquals(0, allresult.getWroteOffset());
...@@ -214,9 +222,11 @@ public class AppendCallbackTest { ...@@ -214,9 +222,11 @@ public class AppendCallbackTest {
messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124)); messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124));
messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
ByteBuffer buff = ByteBuffer.allocate(1024 * 10); ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBatch); AppendMessageResult allresult =
callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext);
assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
assertEquals(0, allresult.getWroteOffset()); assertEquals(0, allresult.getWroteOffset());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册