diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index c83d19633a7d6a362ecc808120abea4ee997d19f..3e8822690e2d2bafc056b32dea2ce21782bb3873 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -66,7 +66,7 @@ public class CommitLog {
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
- defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
+ defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
this.defaultMessageStore = defaultMessageStore;
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
@@ -129,10 +129,10 @@ public class CommitLog {
}
public int deleteExpiredFile(
- final long expiredTime,
- final int deleteFilesInterval,
- final long intervalForcibly,
- final boolean cleanImmediately
+ final long expiredTime,
+ final int deleteFilesInterval,
+ final long intervalForcibly,
+ final boolean cleanImmediately
) {
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
@@ -234,7 +234,7 @@ public class CommitLog {
* @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
*/
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
- final boolean readBody) {
+ final boolean readBody) {
try {
// 1 TOTAL SIZE
int totalSize = byteBuffer.getInt();
@@ -330,7 +330,7 @@ public class CommitLog {
if (delayLevel > 0) {
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
- storeTimestamp);
+ storeTimestamp);
}
}
}
@@ -344,24 +344,24 @@ public class CommitLog {
doNothingForDeadCode(byteBuffer1);
doNothingForDeadCode(byteBuffer2);
log.error(
- "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
- totalSize, readLength, bodyLen, topicLen, propertiesLength);
+ "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
+ totalSize, readLength, bodyLen, topicLen, propertiesLength);
return new DispatchRequest(totalSize, false/* success */);
}
return new DispatchRequest(
- topic,
- queueId,
- physicOffset,
- totalSize,
- tagsCode,
- storeTimestamp,
- queueOffset,
- keys,
- uniqKey,
- sysFlag,
- preparedTransactionOffset,
- propertiesMap
+ topic,
+ queueId,
+ physicOffset,
+ totalSize,
+ tagsCode,
+ storeTimestamp,
+ queueOffset,
+ keys,
+ uniqKey,
+ sysFlag,
+ preparedTransactionOffset,
+ propertiesMap
);
} catch (Exception e) {
}
@@ -371,23 +371,23 @@ public class CommitLog {
private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
final int msgLen = 4 //TOTALSIZE
- + 4 //MAGICCODE
- + 4 //BODYCRC
- + 4 //QUEUEID
- + 4 //FLAG
- + 8 //QUEUEOFFSET
- + 8 //PHYSICALOFFSET
- + 4 //SYSFLAG
- + 8 //BORNTIMESTAMP
- + 8 //BORNHOST
- + 8 //STORETIMESTAMP
- + 8 //STOREHOSTADDRESS
- + 4 //RECONSUMETIMES
- + 8 //Prepared Transaction Offset
- + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
- + 1 + topicLength //TOPIC
- + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
- + 0;
+ + 4 //MAGICCODE
+ + 4 //BODYCRC
+ + 4 //QUEUEID
+ + 4 //FLAG
+ + 8 //QUEUEOFFSET
+ + 8 //PHYSICALOFFSET
+ + 4 //SYSFLAG
+ + 8 //BORNTIMESTAMP
+ + 8 //BORNHOST
+ + 8 //STORETIMESTAMP
+ + 8 //STOREHOSTADDRESS
+ + 4 //RECONSUMETIMES
+ + 8 //Prepared Transaction Offset
+ + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
+ + 1 + topicLength //TOPIC
+ + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
+ + 0;
return msgLen;
}
@@ -497,18 +497,18 @@ public class CommitLog {
}
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
- && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
+ && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
log.info("find check timestamp, {} {}",
- storeTimestamp,
- UtilAll.timeMillisToHumanString(storeTimestamp));
+ storeTimestamp,
+ UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
} else {
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
log.info("find check timestamp, {} {}",
- storeTimestamp,
- UtilAll.timeMillisToHumanString(storeTimestamp));
+ storeTimestamp,
+ UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
}
@@ -544,7 +544,7 @@ public class CommitLog {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
- || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
+ || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
@@ -650,7 +650,7 @@ public class CommitLog {
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
- + " client address: " + messageExt.getBornHostString());
+ + " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
@@ -677,10 +677,10 @@ public class CommitLog {
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
boolean flushOK =
- request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+ request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
- + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
+ + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
@@ -917,7 +917,7 @@ public class CommitLog {
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
int commitDataThoroughInterval =
- CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
+ CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
long begin = System.currentTimeMillis();
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
@@ -966,7 +966,7 @@ public class CommitLog {
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
int flushPhysicQueueThoroughInterval =
- CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
+ CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
boolean printFlushProgress = false;
@@ -1187,7 +1187,7 @@ public class CommitLog {
}
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
- final MessageExtBrokerInner msgInner) {
+ final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
// PHY OFFSET
@@ -1227,7 +1227,7 @@ public class CommitLog {
* Serialize message
*/
final byte[] propertiesData =
- msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+ msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
@@ -1246,7 +1246,7 @@ public class CommitLog {
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
- + ", maxMessageSize: " + this.maxMessageSize);
+ + ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
@@ -1262,7 +1262,7 @@ public class CommitLog {
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
- queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+ queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// Initialization of storage space
@@ -1315,7 +1315,7 @@ public class CommitLog {
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
- msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+ msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
@@ -1333,7 +1333,7 @@ public class CommitLog {
}
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
- final MessageExtBatch messageExtBatch) {
+ final MessageExtBatch messageExtBatch) {
byteBuffer.mark();
//physical offset
long wroteOffset = fileFromOffset + byteBuffer.position();
@@ -1365,7 +1365,7 @@ public class CommitLog {
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
- + ", maxMessageSize: " + this.maxMessageSize);
+ + ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
totalMsgLen += msgLen;
@@ -1383,7 +1383,7 @@ public class CommitLog {
byteBuffer.reset(); //ignore the previous appended messages
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(),
- beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+ beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
//move to add queue offset and commitlog offset
messagesByteBuff.position(msgPos + 20);
@@ -1407,7 +1407,7 @@ public class CommitLog {
byteBuffer.put(messagesByteBuff);
messageExtBatch.setEncodedBuff(null);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(),
- messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+ messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
result.setMsgNum(msgNum);
CommitLog.this.topicQueueTable.put(key, queueOffset);
@@ -1466,7 +1466,7 @@ public class CommitLog {
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
- + ", maxMessageSize: " + this.maxMessageSize);
+ + ", maxMessageSize: " + this.maxMessageSize);
throw new RuntimeException("message size exceeded");
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 200f8ad51114ae4aaa740737eab79e84b6036837..e8ef4b664b279daf736d9d69c67997a247e1da3f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -94,7 +94,7 @@ public class DefaultMessageStore implements MessageStore {
private final SystemClock systemClock = new SystemClock();
private final ScheduledExecutorService scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
+ Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
private final BrokerStatsManager brokerStatsManager;
private final MessageArrivingListener messageArrivingListener;
private final BrokerConfig brokerConfig;
@@ -114,7 +114,7 @@ public class DefaultMessageStore implements MessageStore {
boolean shutDownNormal = false;
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
- final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
+ final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
@@ -185,7 +185,7 @@ public class DefaultMessageStore implements MessageStore {
if (result) {
this.storeCheckpoint =
- new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
+ new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
this.indexService.load(lastExitOK);
@@ -421,7 +421,7 @@ public class DefaultMessageStore implements MessageStore {
long diff = this.systemClock.now() - begin;
return diff < 10000000
- && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
+ && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}
@Override
@@ -438,8 +438,8 @@ public class DefaultMessageStore implements MessageStore {
}
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
- final int maxMsgNums,
- final MessageFilter messageFilter) {
+ final int maxMsgNums,
+ final MessageFilter messageFilter) {
if (this.shutdown) {
log.warn("message store has shutdown, so getMessage is forbidden");
return null;
@@ -510,7 +510,7 @@ public class DefaultMessageStore implements MessageStore {
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
- isInDisk)) {
+ isInDisk)) {
break;
}
@@ -522,13 +522,13 @@ public class DefaultMessageStore implements MessageStore {
} else {
// can't find ext content.Client will filter messages by tag also.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
- tagsCode, offsetPy, sizePy, topic, group);
+ tagsCode, offsetPy, sizePy, topic, group);
isTagsCodeLegal = false;
}
}
if (messageFilter != null
- && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
+ && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
@@ -547,7 +547,7 @@ public class DefaultMessageStore implements MessageStore {
}
if (messageFilter != null
- && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
+ && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
@@ -571,7 +571,7 @@ public class DefaultMessageStore implements MessageStore {
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
- * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
+ * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
@@ -581,7 +581,7 @@ public class DefaultMessageStore implements MessageStore {
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
- + maxOffset + ", but access logic queue failed.");
+ + maxOffset + ", but access logic queue failed.");
}
}
} else {
@@ -911,8 +911,8 @@ public class DefaultMessageStore implements MessageStore {
for (ConsumeQueue cq : queueTable.values()) {
cq.destroy();
log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned",
- cq.getTopic(),
- cq.getQueueId()
+ cq.getTopic(),
+ cq.getQueueId()
);
this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId());
@@ -942,20 +942,20 @@ public class DefaultMessageStore implements MessageStore {
if (maxCLOffsetInConsumeQueue == -1) {
log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.",
- nextQT.getValue().getTopic(),
- nextQT.getValue().getQueueId(),
- nextQT.getValue().getMaxPhysicOffset(),
- nextQT.getValue().getMinLogicOffset());
+ nextQT.getValue().getTopic(),
+ nextQT.getValue().getQueueId(),
+ nextQT.getValue().getMaxPhysicOffset(),
+ nextQT.getValue().getMinLogicOffset());
} else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
log.info(
- "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}",
- topic,
- nextQT.getKey(),
- minCommitLogOffset,
- maxCLOffsetInConsumeQueue);
+ "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}",
+ topic,
+ nextQT.getKey(),
+ minCommitLogOffset,
+ maxCLOffsetInConsumeQueue);
DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(),
- nextQT.getValue().getQueueId());
+ nextQT.getValue().getQueueId());
nextQT.getValue().destroy();
itQT.remove();
@@ -971,7 +971,7 @@ public class DefaultMessageStore implements MessageStore {
}
public Map getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset,
- SocketAddress storeHost) {
+ SocketAddress storeHost) {
Map messageIds = new HashMap();
if (this.shutdown) {
return messageIds;
@@ -996,7 +996,7 @@ public class DefaultMessageStore implements MessageStore {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
final ByteBuffer msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
String msgId =
- MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy);
+ MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy);
messageIds.put(msgId, nextOffset++);
if (nextOffset > maxOffset) {
return messageIds;
@@ -1092,11 +1092,11 @@ public class DefaultMessageStore implements MessageStore {
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(
- topic,
- queueId,
- StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
- this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
- this);
+ topic,
+ queueId,
+ StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+ this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
+ this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
@@ -1196,7 +1196,7 @@ public class DefaultMessageStore implements MessageStore {
String stack = UtilAll.jstack();
final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
- + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
+ + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
MixAll.string2FileNotSafe(stack, fileName);
}
}
@@ -1257,11 +1257,11 @@ public class DefaultMessageStore implements MessageStore {
continue;
}
ConsumeQueue logic = new ConsumeQueue(
- topic,
- queueId,
- StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
- this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
- this);
+ topic,
+ queueId,
+ StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+ this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
+ this);
this.putConsumeQueue(topic, queueId, logic);
if (!logic.load()) {
return false;
@@ -1445,10 +1445,10 @@ public class DefaultMessageStore implements MessageStore {
private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
private final double diskSpaceWarningLevelRatio =
- Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
+ Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
private final double diskSpaceCleanForciblyRatio =
- Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
+ Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
private long lastRedeleteTimestamp = 0;
private volatile int manualDeleteFileSeveralTimes = 0;
@@ -1488,16 +1488,16 @@ public class DefaultMessageStore implements MessageStore {
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
- fileReservedTime,
- timeup,
- spacefull,
- manualDeleteFileSeveralTimes,
- cleanAtOnce);
+ fileReservedTime,
+ timeup,
+ spacefull,
+ manualDeleteFileSeveralTimes,
+ cleanAtOnce);
fileReservedTime *= 60 * 60 * 1000;
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
- destroyMapedFileIntervalForcibly, cleanAtOnce);
+ destroyMapedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {
} else if (spacefull) {
log.warn("disk space will be full soon, but delete file failed.");
@@ -1511,7 +1511,7 @@ public class DefaultMessageStore implements MessageStore {
if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
this.lastRedeleteTimestamp = currentTimestamp;
int destroyMapedFileIntervalForcibly =
- DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
+ DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
}
}
@@ -1563,7 +1563,7 @@ public class DefaultMessageStore implements MessageStore {
{
String storePathLogics = StorePathConfigHelper
- .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
+ .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
if (logicsRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
@@ -1733,7 +1733,7 @@ public class DefaultMessageStore implements MessageStore {
if (this.isCommitLogAvailable()) {
log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
- DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
+ DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
}
super.shutdown();
@@ -1751,7 +1751,7 @@ public class DefaultMessageStore implements MessageStore {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
- && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
+ && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
@@ -1762,7 +1762,7 @@ public class DefaultMessageStore implements MessageStore {
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
DispatchRequest dispatchRequest =
- DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
+ DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
@@ -1770,21 +1770,21 @@ public class DefaultMessageStore implements MessageStore {
DefaultMessageStore.this.doDispatch(dispatchRequest);
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
- && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
+ && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
- dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
- dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
- dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
+ dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+ dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
+ dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
- .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
+ .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
- .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
- .addAndGet(dispatchRequest.getMsgSize());
+ .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+ .addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
@@ -1799,7 +1799,7 @@ public class DefaultMessageStore implements MessageStore {
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
- this.reputFromOffset);
+ this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 57b6999c43795781ae20761e221ea897d32cb6a0..fbaee0bc9e29cf4b2c8aa93749c206e539cfa4ba 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -292,7 +292,7 @@ public class DefaultMessageStoreTest {
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
- byte[] filterBitMap, Map properties) {
+ byte[] filterBitMap, Map properties) {
}
}
}