提交 d30d0e85 编写于 作者: D dongeforever

Polish DLedgerCommitlog, and rename dleger to dledger

上级 3e9799ba
...@@ -39,7 +39,7 @@ import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener; ...@@ -39,7 +39,7 @@ import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client; import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager; import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.dleger.DLegerRoleChangeHandler; import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap; import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager; import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager; import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
...@@ -99,7 +99,7 @@ import org.apache.rocketmq.store.MessageArrivingListener; ...@@ -99,7 +99,7 @@ import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dleger.DLegerCommitLog; import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.stats.BrokerStats; import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
...@@ -236,8 +236,8 @@ public class BrokerController { ...@@ -236,8 +236,8 @@ public class BrokerController {
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig); this.brokerConfig);
if (messageStoreConfig.isEnableDLegerCommitLog()) { if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLegerRoleChangeHandler roleChangeHandler = new DLegerRoleChangeHandler(this, (DefaultMessageStore) messageStore); DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLegerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLegerServer().getdLegerLeaderElector().addRoleChangeHandler(roleChangeHandler); ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLegerServer().getdLegerLeaderElector().addRoleChangeHandler(roleChangeHandler);
} }
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin //load plugin
...@@ -784,7 +784,7 @@ public class BrokerController { ...@@ -784,7 +784,7 @@ public class BrokerController {
} }
if (messageStoreConfig.isEnableDLegerCommitLog()) { if (messageStoreConfig.isEnableDLegerCommitLog()) {
changeToSlave(((DLegerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getId()); changeToSlave(((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getId());
} else { } else {
startProcessorByHa(messageStoreConfig.getBrokerRole()); startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole()); handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.broker.dleger; package org.apache.rocketmq.broker.dledger;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
...@@ -29,21 +29,21 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -29,21 +29,21 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.dleger.DLegerCommitLog; import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHandler { public class DLedgerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_")); private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_"));
private BrokerController brokerController; private BrokerController brokerController;
private DefaultMessageStore messageStore; private DefaultMessageStore messageStore;
private DLegerCommitLog dLegerCommitLog; private DLedgerCommitLog dLedgerCommitLog;
private DLegerServer dLegerServer; private DLegerServer dLegerServer;
public DLegerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) { public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) {
this.brokerController = brokerController; this.brokerController = brokerController;
this.messageStore = messageStore; this.messageStore = messageStore;
this.dLegerCommitLog = (DLegerCommitLog) messageStore.getCommitLog(); this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
this.dLegerServer = dLegerCommitLog.getdLegerServer(); this.dLegerServer = dLedgerCommitLog.getdLegerServer();
} }
@Override public void handle(long term, MemberState.Role role) { @Override public void handle(long term, MemberState.Role role) {
...@@ -55,11 +55,11 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa ...@@ -55,11 +55,11 @@ public class DLegerRoleChangeHandler implements DLegerLeaderElector.RoleChangeHa
switch (role) { switch (role) {
case CANDIDATE: case CANDIDATE:
if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
brokerController.changeToSlave(dLegerCommitLog.getId()); brokerController.changeToSlave(dLedgerCommitLog.getId());
} }
break; break;
case FOLLOWER: case FOLLOWER:
brokerController.changeToSlave(dLegerCommitLog.getId()); brokerController.changeToSlave(dLedgerCommitLog.getId());
break; break;
case LEADER: case LEADER:
while (dLegerServer.getMemberState().isLeader() while (dLegerServer.getMemberState().isLeader()
......
...@@ -474,6 +474,7 @@ public class CommitLog { ...@@ -474,6 +474,7 @@ public class CommitLog {
} }
// Commitlog case files are deleted // Commitlog case files are deleted
else { else {
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0); this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0); this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics(); this.defaultMessageStore.destroyLogics();
......
...@@ -52,7 +52,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; ...@@ -52,7 +52,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.dleger.DLegerCommitLog; import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.index.IndexService; import org.apache.rocketmq.store.index.IndexService;
import org.apache.rocketmq.store.index.QueryOffsetResult; import org.apache.rocketmq.store.index.QueryOffsetResult;
...@@ -119,7 +119,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -119,7 +119,7 @@ public class DefaultMessageStore implements MessageStore {
this.brokerStatsManager = brokerStatsManager; this.brokerStatsManager = brokerStatsManager;
this.allocateMappedFileService = new AllocateMappedFileService(this); this.allocateMappedFileService = new AllocateMappedFileService(this);
if (messageStoreConfig.isEnableDLegerCommitLog()) { if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLegerCommitLog(this); this.commitLog = new DLedgerCommitLog(this);
} else { } else {
this.commitLog = new CommitLog(this); this.commitLog = new CommitLog(this);
} }
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.store.dleger; package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dleger.DLegerConfig; import io.openmessaging.storage.dleger.DLegerConfig;
import io.openmessaging.storage.dleger.DLegerServer; import io.openmessaging.storage.dleger.DLegerServer;
...@@ -34,7 +34,6 @@ import org.apache.rocketmq.common.UtilAll; ...@@ -34,7 +34,6 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.AppendMessageResult;
...@@ -52,7 +51,7 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService; ...@@ -52,7 +51,7 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
/** /**
* Store all metadata downtime for recovery, data protection reliability * Store all metadata downtime for recovery, data protection reliability
*/ */
public class DLegerCommitLog extends CommitLog { public class DLedgerCommitLog extends CommitLog {
private final DLegerServer dLegerServer; private final DLegerServer dLegerServer;
private final DLegerConfig dLegerConfig; private final DLegerConfig dLegerConfig;
private final DLegerMmapFileStore dLegerFileStore; private final DLegerMmapFileStore dLegerFileStore;
...@@ -66,7 +65,7 @@ public class DLegerCommitLog extends CommitLog { ...@@ -66,7 +65,7 @@ public class DLegerCommitLog extends CommitLog {
private volatile long beginTimeInDlegerLock = 0; private volatile long beginTimeInDlegerLock = 0;
public DLegerCommitLog(final DefaultMessageStore defaultMessageStore) { public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
super(defaultMessageStore); super(defaultMessageStore);
dLegerConfig = new DLegerConfig(); dLegerConfig = new DLegerConfig();
dLegerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId()); dLegerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
...@@ -87,6 +86,7 @@ public class DLegerCommitLog extends CommitLog { ...@@ -87,6 +86,7 @@ public class DLegerCommitLog extends CommitLog {
} }
@Override
public boolean load() { public boolean load() {
/*boolean result = this.mappedFileQueue.load(); /*boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed")); log.info("load commit log " + (result ? "OK" : "Failed"));
...@@ -94,6 +94,7 @@ public class DLegerCommitLog extends CommitLog { ...@@ -94,6 +94,7 @@ public class DLegerCommitLog extends CommitLog {
return true; return true;
} }
@Override
public void start() { public void start() {
dLegerServer.startup(); dLegerServer.startup();
/* this.flushCommitLogService.start(); /* this.flushCommitLogService.start();
...@@ -103,6 +104,7 @@ public class DLegerCommitLog extends CommitLog { ...@@ -103,6 +104,7 @@ public class DLegerCommitLog extends CommitLog {
}*/ }*/
} }
@Override
public void shutdown() { public void shutdown() {
/* if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { /* if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.shutdown(); this.commitLogService.shutdown();
...@@ -111,11 +113,13 @@ public class DLegerCommitLog extends CommitLog { ...@@ -111,11 +113,13 @@ public class DLegerCommitLog extends CommitLog {
dLegerServer.shutdown(); dLegerServer.shutdown();
} }
@Override
public long flush() { public long flush() {
dLegerFileStore.flush(); dLegerFileStore.flush();
return dLegerFileList.getFlushedWhere(); return dLegerFileList.getFlushedWhere();
} }
@Override
public long getMaxOffset() { public long getMaxOffset() {
if (this.dLegerFileStore.getCommittedPos() != -1) { if (this.dLegerFileStore.getCommittedPos() != -1) {
return dLegerFileStore.getCommittedPos(); return dLegerFileStore.getCommittedPos();
...@@ -124,14 +128,17 @@ public class DLegerCommitLog extends CommitLog { ...@@ -124,14 +128,17 @@ public class DLegerCommitLog extends CommitLog {
} }
} }
@Override
public long remainHowManyDataToCommit() { public long remainHowManyDataToCommit() {
return dLegerFileList.remainHowManyDataToCommit(); return dLegerFileList.remainHowManyDataToCommit();
} }
@Override
public long remainHowManyDataToFlush() { public long remainHowManyDataToFlush() {
return dLegerFileList.remainHowManyDataToFlush(); return dLegerFileList.remainHowManyDataToFlush();
} }
@Override
public int deleteExpiredFile( public int deleteExpiredFile(
final long expiredTime, final long expiredTime,
final int deleteFilesInterval, final int deleteFilesInterval,
...@@ -142,7 +149,7 @@ public class DLegerCommitLog extends CommitLog { ...@@ -142,7 +149,7 @@ public class DLegerCommitLog extends CommitLog {
} }
private static class DLegerSelectMappedBufferResult extends SelectMappedBufferResult { static class DLegerSelectMappedBufferResult extends SelectMappedBufferResult {
private SelectMmapBufferResult sbr; private SelectMmapBufferResult sbr;
public DLegerSelectMappedBufferResult(SelectMmapBufferResult sbr) { public DLegerSelectMappedBufferResult(SelectMmapBufferResult sbr) {
...@@ -182,11 +189,13 @@ public class DLegerCommitLog extends CommitLog { ...@@ -182,11 +189,13 @@ public class DLegerCommitLog extends CommitLog {
/** /**
* Read CommitLog data, use data replication * Read CommitLog data, use data replication
*/ */
@Override
public SelectMappedBufferResult getData(final long offset) { public SelectMappedBufferResult getData(final long offset) {
return this.getData(offset, offset == 0); return this.getData(offset, offset == 0);
} }
@Override
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) { public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
if (offset >= dLegerFileStore.getCommittedPos()) { if (offset >= dLegerFileStore.getCommittedPos()) {
return null; return null;
...@@ -205,28 +214,27 @@ public class DLegerCommitLog extends CommitLog { ...@@ -205,28 +214,27 @@ public class DLegerCommitLog extends CommitLog {
/** /**
* When the normal exit, data recovery, all memory data have been flush * When the normal exit, data recovery, all memory data have been flush
*/ */
public void recoverNormally() { @Override
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
} }
public void recoverAbnormally() {
@Override
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
} }
@Override
public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC) { public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC) {
return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true); return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true);
} }
private void doNothingForDeadCode(final Object obj) {
if (obj != null) {
log.debug(String.valueOf(obj.hashCode()));
}
}
/** /**
* check the message and returns the message size * check the message and returns the message size
* *
* @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
*/ */
@Override
public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC,
final boolean readBody) { final boolean readBody) {
try { try {
...@@ -251,22 +259,19 @@ public class DLegerCommitLog extends CommitLog { ...@@ -251,22 +259,19 @@ public class DLegerCommitLog extends CommitLog {
return new DispatchRequest(-1, false /* success */); return new DispatchRequest(-1, false /* success */);
} }
@Override
public long getConfirmOffset() { public long getConfirmOffset() {
return this.dLegerFileStore.getCommittedPos() == -1 ? getMaxOffset() return this.dLegerFileStore.getCommittedPos() == -1 ? getMaxOffset()
: this.dLegerFileStore.getCommittedPos(); : this.dLegerFileStore.getCommittedPos();
} }
@Override
public void setConfirmOffset(long phyOffset) { public void setConfirmOffset(long phyOffset) {
log.warn("Should not set confirm offset {} for dleger commitlog", phyOffset); log.warn("Should not set confirm offset {} for dleger commitlog", phyOffset);
} }
@Override
private void notifyMessageArriving() {
}
public boolean resetOffset(long offset) { public boolean resetOffset(long offset) {
//return this.mappedFileQueue.resetOffset(offset);
return false; return false;
} }
...@@ -349,7 +354,7 @@ public class DLegerCommitLog extends CommitLog { ...@@ -349,7 +354,7 @@ public class DLegerCommitLog extends CommitLog {
case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information // The next update ConsumeQueue information
DLegerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1); DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
break; break;
default: default:
break; break;
...@@ -408,43 +413,20 @@ public class DLegerCommitLog extends CommitLog { ...@@ -408,43 +413,20 @@ public class DLegerCommitLog extends CommitLog {
return putMessageResult; return putMessageResult;
} }
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
}
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
}
@Override @Override
public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
} }
/**
* According to receive certain message or offset storage time if an error occurs, it returns -1
*/
public long pickupStoreTimestamp(final long offset, final int size) {
if (offset >= this.getMinOffset()) {
SelectMappedBufferResult result = this.getMessage(offset, size);
if (null != result) {
try {
return result.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
} finally {
result.release();
}
}
}
return -1;
}
@Override
public long getMinOffset() { public long getMinOffset() {
return dLegerFileList.getMinOffset(); return dLegerFileList.getMinOffset();
} }
@Override
public SelectMappedBufferResult getMessage(final long offset, final int size) { public SelectMappedBufferResult getMessage(final long offset, final int size) {
int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData(); int mappedFileSize = this.dLegerServer.getdLegerConfig().getMappedFileSizeForEntryData();
MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, offset == 0); MmapFile mappedFile = this.dLegerFileList.findMappedFileByOffset(offset, offset == 0);
...@@ -455,42 +437,40 @@ public class DLegerCommitLog extends CommitLog { ...@@ -455,42 +437,40 @@ public class DLegerCommitLog extends CommitLog {
return null; return null;
} }
@Override
public long rollNextFile(final long offset) { public long rollNextFile(final long offset) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
return offset + mappedFileSize - offset % mappedFileSize; return offset + mappedFileSize - offset % mappedFileSize;
} }
@Override
public HashMap<String, Long> getTopicQueueTable() { public HashMap<String, Long> getTopicQueueTable() {
return topicQueueTable; return topicQueueTable;
} }
@Override
public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) { public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
this.topicQueueTable = topicQueueTable; this.topicQueueTable = topicQueueTable;
} }
@Override
public void destroy() { public void destroy() {
//TO DO //TO DO
} }
@Override
public boolean appendData(long startOffset, byte[] data) { public boolean appendData(long startOffset, byte[] data) {
//TO DO //TO DO
return false; return false;
} }
@Override
public boolean retryDeleteFirstFile(final long intervalForcibly) { public boolean retryDeleteFirstFile(final long intervalForcibly) {
//TO DO
return false; return false;
} }
public void removeQueueFromTopicQueueTable(final String topic, final int queueId) {
String key = topic + "-" + queueId;
synchronized (this) {
this.topicQueueTable.remove(key);
}
log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
}
@Override
public void checkSelf() { public void checkSelf() {
dLegerFileList.checkSelf(); dLegerFileList.checkSelf();
} }
...@@ -560,10 +540,10 @@ public class DLegerCommitLog extends CommitLog { ...@@ -560,10 +540,10 @@ public class DLegerCommitLog extends CommitLog {
keyBuilder.append(msgInner.getQueueId()); keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString(); String key = keyBuilder.toString();
Long queueOffset = DLegerCommitLog.this.topicQueueTable.get(key); Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) { if (null == queueOffset) {
queueOffset = 0L; queueOffset = 0L;
DLegerCommitLog.this.topicQueueTable.put(key, queueOffset); DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
} }
// Transaction messages that require special handling // Transaction messages that require special handling
...@@ -603,7 +583,7 @@ public class DLegerCommitLog extends CommitLog { ...@@ -603,7 +583,7 @@ public class DLegerCommitLog extends CommitLog {
// Exceeds the maximum message // Exceeds the maximum message
if (msgLen > this.maxMessageSize) { if (msgLen > this.maxMessageSize) {
DLegerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength DLedgerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize); + ", maxMessageSize: " + this.maxMessageSize);
return new EncodeResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED, null, key); return new EncodeResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED, null, key);
} }
...@@ -612,7 +592,7 @@ public class DLegerCommitLog extends CommitLog { ...@@ -612,7 +592,7 @@ public class DLegerCommitLog extends CommitLog {
// 1 TOTALSIZE // 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen); this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE // 2 MAGICCODE
this.msgStoreItemMemory.putInt(DLegerCommitLog.MESSAGE_MAGIC_CODE); this.msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC // 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID // 4 QUEUEID
...@@ -642,18 +622,17 @@ public class DLegerCommitLog extends CommitLog { ...@@ -642,18 +622,17 @@ public class DLegerCommitLog extends CommitLog {
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY // 15 BODY
this.msgStoreItemMemory.putInt(bodyLength); this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0) if (bodyLength > 0) {
this.msgStoreItemMemory.put(msgInner.getBody()); this.msgStoreItemMemory.put(msgInner.getBody());
}
// 16 TOPIC // 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData); this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES // 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength); this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0) if (propertiesLength > 0) {
this.msgStoreItemMemory.put(propertiesData); this.msgStoreItemMemory.put(propertiesData);
}
final long beginTimeMills = DLegerCommitLog.this.defaultMessageStore.now();
byte[] data = new byte[msgLen]; byte[] data = new byte[msgLen];
this.msgStoreItemMemory.clear(); this.msgStoreItemMemory.clear();
this.msgStoreItemMemory.get(data); this.msgStoreItemMemory.get(data);
......
...@@ -15,15 +15,15 @@ ...@@ -15,15 +15,15 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.store.dleger; package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dleger.store.file.SelectMmapBufferResult; import io.openmessaging.storage.dleger.store.file.SelectMmapBufferResult;
import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.SelectMappedBufferResult;
public class DLegerSelectMappedBufferResult extends SelectMappedBufferResult { public class DLedgerSelectMappedBufferResult extends SelectMappedBufferResult {
private SelectMmapBufferResult sbr; private SelectMmapBufferResult sbr;
public DLegerSelectMappedBufferResult(SelectMmapBufferResult sbr) { public DLedgerSelectMappedBufferResult(SelectMmapBufferResult sbr) {
super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null); super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null);
this.sbr = sbr; this.sbr = sbr;
} }
......
package org.apache.rocketmq.store.dleger; package org.apache.rocketmq.store.dledger;
import java.io.File; import java.io.File;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
...@@ -22,7 +22,7 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; ...@@ -22,7 +22,7 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class DLegerCommitlogTest extends StoreTestBase { public class DLedgerCommitlogTest extends StoreTestBase {
private DefaultMessageStore createMessageStore(String base, String group, String selfId, String peers, String leaderId) throws Exception { private DefaultMessageStore createMessageStore(String base, String group, String selfId, String peers, String leaderId) throws Exception {
baseDirs.add(base); baseDirs.add(base);
...@@ -43,7 +43,7 @@ public class DLegerCommitlogTest extends StoreTestBase { ...@@ -43,7 +43,7 @@ public class DLegerCommitlogTest extends StoreTestBase {
}, new BrokerConfig()); }, new BrokerConfig());
if (leaderId != null) { if (leaderId != null) {
DLegerServer dLegerServer = ((DLegerCommitLog) defaultMessageStore.getCommitLog()).getdLegerServer(); DLegerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLegerServer();
dLegerServer.getdLegerConfig().setEnableLeaderElector(false); dLegerServer.getdLegerConfig().setEnableLeaderElector(false);
if (selfId.equals(leaderId)) { if (selfId.equals(leaderId)) {
dLegerServer.getMemberState().changeToLeader(-1); dLegerServer.getMemberState().changeToLeader(-1);
......
package org.apache.rocketmq.test.base.dleger; package org.apache.rocketmq.test.base.dledger;
import java.util.UUID; import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册