/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.store; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.net.Inet6Address; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileLock; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.running.RunningStats; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.dledger.DLedgerCommitLog; import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.index.IndexService; import org.apache.rocketmq.store.index.QueryOffsetResult; import org.apache.rocketmq.store.schedule.ScheduleMessageService; import org.apache.rocketmq.store.stats.BrokerStatsManager; public class DefaultMessageStore implements MessageStore { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private final MessageStoreConfig messageStoreConfig; // CommitLog private final CommitLog commitLog; private final ConcurrentMap> consumeQueueTable; private final FlushConsumeQueueService flushConsumeQueueService; private final CleanCommitLogService cleanCommitLogService; private final CleanConsumeQueueService cleanConsumeQueueService; private final IndexService indexService; private final AllocateMappedFileService allocateMappedFileService; private final ReputMessageService reputMessageService; private final HAService haService; private final ScheduleMessageService scheduleMessageService; private final StoreStatsService storeStatsService; private final TransientStorePool transientStorePool; private final RunningFlags runningFlags = new RunningFlags(); private final SystemClock systemClock = new SystemClock(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread")); private final BrokerStatsManager brokerStatsManager; private final MessageArrivingListener messageArrivingListener; private final BrokerConfig brokerConfig; private volatile boolean shutdown = true; private StoreCheckpoint storeCheckpoint; private AtomicLong printTimes = new AtomicLong(0); private final LinkedList dispatcherList; private RandomAccessFile lockFile; private FileLock lock; boolean shutDownNormal = false; private final ScheduledExecutorService diskCheckScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DiskCheckScheduledThread")); public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { this.messageArrivingListener = messageArrivingListener; this.brokerConfig = brokerConfig; this.messageStoreConfig = messageStoreConfig; this.brokerStatsManager = brokerStatsManager; this.allocateMappedFileService = new AllocateMappedFileService(this); if (messageStoreConfig.isEnableDLegerCommitLog()) { this.commitLog = new DLedgerCommitLog(this); } else { this.commitLog = new CommitLog(this); } this.consumeQueueTable = new ConcurrentHashMap<>(32); this.flushConsumeQueueService = new FlushConsumeQueueService(); this.cleanCommitLogService = new CleanCommitLogService(); this.cleanConsumeQueueService = new CleanConsumeQueueService(); this.storeStatsService = new StoreStatsService(); this.indexService = new IndexService(this); if (!messageStoreConfig.isEnableDLegerCommitLog()) { this.haService = new HAService(this); } else { this.haService = null; } this.reputMessageService = new ReputMessageService(); this.scheduleMessageService = new ScheduleMessageService(this); this.transientStorePool = new TransientStorePool(messageStoreConfig); if (messageStoreConfig.isTransientStorePoolEnable()) { this.transientStorePool.init(); } this.allocateMappedFileService.start(); this.indexService.start(); this.dispatcherList = new LinkedList<>(); this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir())); MappedFile.ensureDirOK(file.getParent()); lockFile = new RandomAccessFile(file, "rw"); } public void truncateDirtyLogicFiles(long phyOffset) { ConcurrentMap> tables = DefaultMessageStore.this.consumeQueueTable; for (ConcurrentMap maps : tables.values()) { for (ConsumeQueue logic : maps.values()) { logic.truncateDirtyLogicFiles(phyOffset); } } } /** * @throws IOException */ public boolean load() { boolean result = true; try { boolean lastExitOK = !this.isTempFileExist(); log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally"); if (null != scheduleMessageService) { result = result && this.scheduleMessageService.load(); } // load Commit Log result = result && this.commitLog.load(); // load Consume Queue result = result && this.loadConsumeQueue(); if (result) { this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); this.indexService.load(lastExitOK); this.recover(lastExitOK); log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); } } catch (Exception e) { log.error("load exception", e); result = false; } if (!result) { this.allocateMappedFileService.shutdown(); } return result; } /** * @throws Exception */ public void start() throws Exception { lock = lockFile.getChannel().tryLock(0, 1, false); if (lock == null || lock.isShared() || !lock.isValid()) { throw new RuntimeException("Lock failed,MQ already started"); } lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes())); lockFile.getChannel().force(true); { /** * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog; * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go; * 3. Calculate the reput offset according to the consume queue; * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed. */ long maxPhysicalPosInLogicQueue = commitLog.getMinOffset(); for (ConcurrentMap maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) { maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset(); } } } if (maxPhysicalPosInLogicQueue < 0) { maxPhysicalPosInLogicQueue = 0; } if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) { maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset(); /** * This happens in following conditions: * 1. If someone removes all the consumequeue files or the disk get damaged. * 2. Launch a new broker, and copy the commitlog from other brokers. * * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0. * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong. */ log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset()); } log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset()); this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue); this.reputMessageService.start(); /** * 1. Finish dispatching the messages fall behind, then to start other services. * 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0 */ while (true) { if (dispatchBehindBytes() <= 0) { break; } Thread.sleep(1000); log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes()); } this.recoverTopicQueueTable(); } if (!messageStoreConfig.isEnableDLegerCommitLog()) { this.haService.start(); this.handleScheduleMessageService(messageStoreConfig.getBrokerRole()); } this.flushConsumeQueueService.start(); this.commitLog.start(); this.storeStatsService.start(); this.createTempFile(); this.addScheduleTask(); this.shutdown = false; } public void shutdown() { if (!this.shutdown) { this.shutdown = true; this.scheduledExecutorService.shutdown(); this.diskCheckScheduledExecutorService.shutdown(); try { Thread.sleep(1000); } catch (InterruptedException e) { log.error("shutdown Exception, ", e); } if (this.scheduleMessageService != null) { this.scheduleMessageService.shutdown(); } if (this.haService != null) { this.haService.shutdown(); } this.storeStatsService.shutdown(); this.indexService.shutdown(); this.commitLog.shutdown(); this.reputMessageService.shutdown(); this.flushConsumeQueueService.shutdown(); this.allocateMappedFileService.shutdown(); this.storeCheckpoint.flush(); this.storeCheckpoint.shutdown(); if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) { this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir())); shutDownNormal = true; } else { log.warn("the store may be wrong, so shutdown abnormally, and keep abort file."); } } this.transientStorePool.destroy(); if (lockFile != null && lock != null) { try { lock.release(); lockFile.close(); } catch (IOException e) { } } } public void destroy() { this.destroyLogics(); this.commitLog.destroy(); this.indexService.destroy(); this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir())); this.deleteFile(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); } public void destroyLogics() { for (ConcurrentMap maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { logic.destroy(); } } } private PutMessageStatus checkMessage(MessageExtBrokerInner msg) { if (msg.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + msg.getTopic().length()); return PutMessageStatus.MESSAGE_ILLEGAL; } if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); return PutMessageStatus.MESSAGE_ILLEGAL; } return PutMessageStatus.PUT_OK; } private PutMessageStatus checkMessages(MessageExtBatch messageExtBatch) { if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length()); return PutMessageStatus.MESSAGE_ILLEGAL; } if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) { log.warn("PutMessages body length too long " + messageExtBatch.getBody().length); return PutMessageStatus.MESSAGE_ILLEGAL; } return PutMessageStatus.PUT_OK; } private PutMessageStatus checkStoreStatus() { if (this.shutdown) { log.warn("message store has shutdown, so putMessage is forbidden"); return PutMessageStatus.SERVICE_NOT_AVAILABLE; } if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("broke role is slave, so putMessage is forbidden"); } return PutMessageStatus.SERVICE_NOT_AVAILABLE; } if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("the message store is not writable. It may be caused by one of the following reasons: " + "the broker's disk is full, write to logic queue error, write to index file error, etc"); } return PutMessageStatus.SERVICE_NOT_AVAILABLE; } else { this.printTimes.set(0); } if (this.isOSPageCacheBusy()) { return PutMessageStatus.OS_PAGECACHE_BUSY; } return PutMessageStatus.PUT_OK; } @Override public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { PutMessageStatus checkStoreStatus = this.checkStoreStatus(); if (checkStoreStatus != PutMessageStatus.PUT_OK) { return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null)); } PutMessageStatus msgCheckStatus = this.checkMessage(msg); if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); } long beginTime = this.getSystemClock().now(); CompletableFuture putResultFuture = this.commitLog.asyncPutMessage(msg); putResultFuture.thenAccept((result) -> { long elapsedTime = this.getSystemClock().now() - beginTime; if (elapsedTime > 500) { log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().add(1); } }); return putResultFuture; } public CompletableFuture asyncPutMessages(MessageExtBatch messageExtBatch) { PutMessageStatus checkStoreStatus = this.checkStoreStatus(); if (checkStoreStatus != PutMessageStatus.PUT_OK) { return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null)); } PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch); if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); } long beginTime = this.getSystemClock().now(); CompletableFuture resultFuture = this.commitLog.asyncPutMessages(messageExtBatch); resultFuture.thenAccept((result) -> { long elapsedTime = this.getSystemClock().now() - beginTime; if (elapsedTime > 500) { log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().add(1); } }); return resultFuture; } @Override public PutMessageResult putMessage(MessageExtBrokerInner msg) { try { return asyncPutMessage(msg).get(); } catch (InterruptedException | ExecutionException e) { return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null); } } @Override public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { try { return asyncPutMessages(messageExtBatch).get(); } catch (InterruptedException | ExecutionException e) { return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null); } } @Override public boolean isOSPageCacheBusy() { long begin = this.getCommitLog().getBeginTimeInLock(); long diff = this.systemClock.now() - begin; return diff < 10000000 && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills(); } @Override public long lockTimeMills() { return this.commitLog.lockTimeMills(); } public SystemClock getSystemClock() { return systemClock; } public CommitLog getCommitLog() { return commitLog; } public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final MessageFilter messageFilter) { if (this.shutdown) { log.warn("message store has shutdown, so getMessage is forbidden"); return null; } if (!this.runningFlags.isReadable()) { log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits()); return null; } long beginTime = this.getSystemClock().now(); GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; long nextBeginOffset = offset; long minOffset = 0; long maxOffset = 0; GetMessageResult getResult = new GetMessageResult(); final long maxOffsetPy = this.commitLog.getMaxOffset(); ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { minOffset = consumeQueue.getMinOffsetInQueue(); maxOffset = consumeQueue.getMaxOffsetInQueue(); if (maxOffset == 0) { status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } else if (offset < minOffset) { status = GetMessageStatus.OFFSET_TOO_SMALL; nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else if (offset == maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_ONE; nextBeginOffset = nextOffsetCorrection(offset, offset); } else if (offset > maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; if (0 == minOffset) { nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else { nextBeginOffset = nextOffsetCorrection(offset, maxOffset); } } else { SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); if (bufferConsumeQueue != null) { try { status = GetMessageStatus.NO_MATCHED_MESSAGE; long nextPhyFileStartOffset = Long.MIN_VALUE; long maxPhyOffsetPulling = 0; int i = 0; final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); maxPhyOffsetPulling = offsetPy; if (nextPhyFileStartOffset != Long.MIN_VALUE) { if (offsetPy < nextPhyFileStartOffset) continue; } boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy); if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) { break; } boolean extRet = false, isTagsCodeLegal = true; if (consumeQueue.isExtAddr(tagsCode)) { extRet = consumeQueue.getExt(tagsCode, cqExtUnit); if (extRet) { tagsCode = cqExtUnit.getTagsCode(); } else { // can't find ext content.Client will filter messages by tag also. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", tagsCode, offsetPy, sizePy, topic, group); isTagsCodeLegal = false; } } if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } continue; } SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); continue; } if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } // release... selectResult.release(); continue; } this.storeStatsService.getGetMessageTransferedMsgCount().add(1); getResult.addMessage(selectResult); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; } if (diskFallRecorded) { long fallBehind = maxOffsetPy - maxPhyOffsetPulling; brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind); } nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long diff = maxOffsetPy - maxPhyOffsetPulling; long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory); } finally { bufferConsumeQueue.release(); } } else { status = GetMessageStatus.OFFSET_FOUND_NULL; nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset)); log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: " + maxOffset + ", but access logic queue failed."); } } } else { status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } if (GetMessageStatus.FOUND == status) { this.storeStatsService.getGetMessageTimesTotalFound().add(1); } else { this.storeStatsService.getGetMessageTimesTotalMiss().add(1); } long elapsedTime = this.getSystemClock().now() - beginTime; this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime); getResult.setStatus(status); getResult.setNextBeginOffset(nextBeginOffset); getResult.setMaxOffset(maxOffset); getResult.setMinOffset(minOffset); return getResult; } public long getMaxOffsetInQueue(String topic, int queueId) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { long offset = logic.getMaxOffsetInQueue(); return offset; } return 0; } public long getMinOffsetInQueue(String topic, int queueId) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { return logic.getMinOffsetInQueue(); } return -1; } @Override public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) { ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeQueueOffset); if (bufferConsumeQueue != null) { try { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); return offsetPy; } finally { bufferConsumeQueue.release(); } } } return 0; } public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { return logic.getOffsetInQueueByTime(timestamp); } return 0; } public MessageExt lookMessageByOffset(long commitLogOffset) { SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4); if (null != sbr) { try { // 1 TOTALSIZE int size = sbr.getByteBuffer().getInt(); return lookMessageByOffset(commitLogOffset, size); } finally { sbr.release(); } } return null; } @Override public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) { SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4); if (null != sbr) { try { // 1 TOTALSIZE int size = sbr.getByteBuffer().getInt(); return this.commitLog.getMessage(commitLogOffset, size); } finally { sbr.release(); } } return null; } @Override public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) { return this.commitLog.getMessage(commitLogOffset, msgSize); } public String getRunningDataInfo() { return this.storeStatsService.toString(); } private String getStorePathPhysic() { String storePathPhysic = ""; if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog()) { storePathPhysic = ((DLedgerCommitLog)DefaultMessageStore.this.getCommitLog()).getdLedgerServer().getdLedgerConfig().getDataStorePath(); } else { storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); } return storePathPhysic; } @Override public HashMap getRuntimeInfo() { HashMap result = this.storeStatsService.getRuntimeInfo(); String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); if (commitLogStorePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { double minPhysicsUsedRatio = Double.MAX_VALUE; String[] paths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); for (String clPath : paths) { double physicRatio = UtilAll.isPathExists(clPath) ? UtilAll.getDiskPartitionSpaceUsedPercent(clPath) : -1; result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio)); minPhysicsUsedRatio = Math.min(minPhysicsUsedRatio, physicRatio); } result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(minPhysicsUsedRatio)); } else { String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); double physicRatio = UtilAll.isPathExists(storePathPhysic) ? UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic) : -1; result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio)); } { String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()); double logicsRatio = UtilAll.isPathExists(storePathLogics) ? UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics) : -1; result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio)); } { if (this.scheduleMessageService != null) { this.scheduleMessageService.buildRunningStats(result); } } result.put(RunningStats.commitLogMinOffset.name(), String.valueOf(DefaultMessageStore.this.getMinPhyOffset())); result.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(DefaultMessageStore.this.getMaxPhyOffset())); return result; } @Override public long getMaxPhyOffset() { return this.commitLog.getMaxOffset(); } @Override public long getMinPhyOffset() { return this.commitLog.getMinOffset(); } @Override public long getEarliestMessageTime(String topic, int queueId) { ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId); if (logicQueue != null) { long minLogicOffset = logicQueue.getMinLogicOffset(); SelectMappedBufferResult result = logicQueue.getIndexBuffer(minLogicOffset / ConsumeQueue.CQ_STORE_UNIT_SIZE); return getStoreTime(result); } return -1; } private long getStoreTime(SelectMappedBufferResult result) { if (result != null) { try { final long phyOffset = result.getByteBuffer().getLong(); final int size = result.getByteBuffer().getInt(); long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size); return storeTime; } catch (Exception e) { } finally { result.release(); } } return -1; } @Override public long getEarliestMessageTime() { final long minPhyOffset = this.getMinPhyOffset(); final int size = this.messageStoreConfig.getMaxMessageSize() * 2; return this.getCommitLog().pickupStoreTimestamp(minPhyOffset, size); } @Override public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) { ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId); if (logicQueue != null) { SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset); return getStoreTime(result); } return -1; } @Override public long getMessageTotalInQueue(String topic, int queueId) { ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId); if (logicQueue != null) { return logicQueue.getMessageTotalInQueue(); } return -1; } @Override public SelectMappedBufferResult getCommitLogData(final long offset) { if (this.shutdown) { log.warn("message store has shutdown, so getPhyQueueData is forbidden"); return null; } return this.commitLog.getData(offset); } @Override public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) { if (this.shutdown) { log.warn("message store has shutdown, so appendToPhyQueue is forbidden"); return false; } boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength); if (result) { this.reputMessageService.wakeup(); } else { log.error("appendToPhyQueue failed " + startOffset + " " + data.length); } return result; } @Override public void executeDeleteFilesManually() { this.cleanCommitLogService.excuteDeleteFilesManualy(); } @Override public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) { QueryMessageResult queryMessageResult = new QueryMessageResult(); long lastQueryMsgTime = end; for (int i = 0; i < 3; i++) { QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime); if (queryOffsetResult.getPhyOffsets().isEmpty()) { break; } Collections.sort(queryOffsetResult.getPhyOffsets()); queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset()); queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp()); for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) { long offset = queryOffsetResult.getPhyOffsets().get(m); try { boolean match = true; MessageExt msg = this.lookMessageByOffset(offset); if (0 == m) { lastQueryMsgTime = msg.getStoreTimestamp(); } // String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR); // if (topic.equals(msg.getTopic())) { // for (String k : keyArray) { // if (k.equals(key)) { // match = true; // break; // } // } // } if (match) { SelectMappedBufferResult result = this.commitLog.getData(offset, false); if (result != null) { int size = result.getByteBuffer().getInt(0); result.getByteBuffer().limit(size); result.setSize(size); queryMessageResult.addMessage(result); } } else { log.warn("queryMessage hash duplicate, {} {}", topic, key); } } catch (Exception e) { log.error("queryMessage exception", e); } } if (queryMessageResult.getBufferTotalSize() > 0) { break; } if (lastQueryMsgTime < begin) { break; } } return queryMessageResult; } @Override public void updateHaMasterAddress(String newAddr) { this.haService.updateMasterAddress(newAddr); } @Override public long slaveFallBehindMuch() { return this.commitLog.getMaxOffset() - this.haService.getPush2SlaveMaxOffset().get(); } @Override public long now() { return this.systemClock.now(); } @Override public int cleanUnusedTopic(Set topics) { Iterator>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry> next = it.next(); String topic = next.getKey(); if (!topics.contains(topic) && !topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC) && !topic.equals(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC)) { ConcurrentMap queueTable = next.getValue(); for (ConsumeQueue cq : queueTable.values()) { cq.destroy(); log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", cq.getTopic(), cq.getQueueId() ); this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId()); } it.remove(); if (this.brokerConfig.isAutoDeleteUnusedStats()) { this.brokerStatsManager.onTopicDeleted(topic); } log.info("cleanUnusedTopic: {},topic destroyed", topic); } } return 0; } public void cleanExpiredConsumerQueue() { long minCommitLogOffset = this.commitLog.getMinOffset(); Iterator>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry> next = it.next(); String topic = next.getKey(); if (!topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) { ConcurrentMap queueTable = next.getValue(); Iterator> itQT = queueTable.entrySet().iterator(); while (itQT.hasNext()) { Entry nextQT = itQT.next(); long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset(); if (maxCLOffsetInConsumeQueue == -1) { log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", nextQT.getValue().getTopic(), nextQT.getValue().getQueueId(), nextQT.getValue().getMaxPhysicOffset(), nextQT.getValue().getMinLogicOffset()); } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) { log.info( "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", topic, nextQT.getKey(), minCommitLogOffset, maxCLOffsetInConsumeQueue); DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(), nextQT.getValue().getQueueId()); nextQT.getValue().destroy(); itQT.remove(); } } if (queueTable.isEmpty()) { log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic); it.remove(); } } } } public Map getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset, SocketAddress storeHost) { Map messageIds = new HashMap(); if (this.shutdown) { return messageIds; } ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { minOffset = Math.max(minOffset, consumeQueue.getMinOffsetInQueue()); maxOffset = Math.min(maxOffset, consumeQueue.getMaxOffsetInQueue()); if (maxOffset == 0) { return messageIds; } long nextOffset = minOffset; while (nextOffset < maxOffset) { SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(nextOffset); if (bufferConsumeQueue != null) { try { int i = 0; for (; i < bufferConsumeQueue.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); InetSocketAddress inetSocketAddress = (InetSocketAddress) storeHost; int msgIdLength = (inetSocketAddress.getAddress() instanceof Inet6Address) ? 16 + 4 + 8 : 4 + 4 + 8; final ByteBuffer msgIdMemory = ByteBuffer.allocate(msgIdLength); String msgId = MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy); messageIds.put(msgId, nextOffset++); if (nextOffset > maxOffset) { return messageIds; } } } finally { bufferConsumeQueue.release(); } } else { return messageIds; } } } return messageIds; } @Override public boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset) { final long maxOffsetPy = this.commitLog.getMaxOffset(); ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeOffset); if (bufferConsumeQueue != null) { try { for (int i = 0; i < bufferConsumeQueue.getSize(); ) { i += ConsumeQueue.CQ_STORE_UNIT_SIZE; long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); return checkInDiskByCommitOffset(offsetPy, maxOffsetPy); } } finally { bufferConsumeQueue.release(); } } else { return false; } } return false; } @Override public long dispatchBehindBytes() { return this.reputMessageService.behind(); } @Override public long flush() { return this.commitLog.flush(); } @Override public boolean resetWriteOffset(long phyOffset) { return this.commitLog.resetOffset(phyOffset); } @Override public long getConfirmOffset() { return this.commitLog.getConfirmOffset(); } @Override public void setConfirmOffset(long phyOffset) { this.commitLog.setConfirmOffset(phyOffset); } public MessageExt lookMessageByOffset(long commitLogOffset, int size) { SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, size); if (null != sbr) { try { return MessageDecoder.decode(sbr.getByteBuffer(), true, false); } finally { sbr.release(); } } return null; } public ConsumeQueue findConsumeQueue(String topic, int queueId) { ConcurrentMap map = consumeQueueTable.get(topic); if (null == map) { ConcurrentMap newMap = new ConcurrentHashMap(128); ConcurrentMap oldMap = consumeQueueTable.putIfAbsent(topic, newMap); if (oldMap != null) { map = oldMap; } else { map = newMap; } } ConsumeQueue logic = map.get(queueId); if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue( topic, queueId, StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(), this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { logic = oldLogic; } else { logic = newLogic; } } return logic; } private long nextOffsetCorrection(long oldOffset, long newOffset) { long nextOffset = oldOffset; if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) { nextOffset = newOffset; } return nextOffset; } private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) { long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); return (maxOffsetPy - offsetPy) > memory; } private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) { if (0 == bufferTotal || 0 == messageTotal) { return false; } if (maxMsgNums <= messageTotal) { return true; } if (isInDisk) { if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) { return true; } if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) { return true; } } else { if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) { return true; } if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) { return true; } } return false; } private void deleteFile(final String fileName) { File file = new File(fileName); boolean result = file.delete(); log.info(fileName + (result ? " delete OK" : " delete Failed")); } /** * @throws IOException */ private void createTempFile() throws IOException { String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()); File file = new File(fileName); MappedFile.ensureDirOK(file.getParent()); boolean result = file.createNewFile(); log.info(fileName + (result ? " create OK" : " already exists")); } private void addScheduleTask() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { DefaultMessageStore.this.cleanFilesPeriodically(); } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { DefaultMessageStore.this.checkSelf(); } }, 1, 10, TimeUnit.MINUTES); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) { try { if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) { long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock(); if (lockTime > 1000 && lockTime < 10000000) { String stack = UtilAll.jstack(); final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-" + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime; MixAll.string2FileNotSafe(stack, fileName); } } } catch (Exception e) { } } } }, 1, 1, TimeUnit.SECONDS); // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // @Override // public void run() { // DefaultMessageStore.this.cleanExpiredConsumerQueue(); // } // }, 1, 1, TimeUnit.HOURS); this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new Runnable() { public void run() { DefaultMessageStore.this.cleanCommitLogService.isSpaceFull(); } }, 1000L, 10000L, TimeUnit.MILLISECONDS); } private void cleanFilesPeriodically() { this.cleanCommitLogService.run(); this.cleanConsumeQueueService.run(); } private void checkSelf() { this.commitLog.checkSelf(); Iterator>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry> next = it.next(); Iterator> itNext = next.getValue().entrySet().iterator(); while (itNext.hasNext()) { Entry cq = itNext.next(); cq.getValue().checkSelf(); } } } private boolean isTempFileExist() { String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()); File file = new File(fileName); return file.exists(); } private boolean loadConsumeQueue() { File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir())); File[] fileTopicList = dirLogic.listFiles(); if (fileTopicList != null) { for (File fileTopic : fileTopicList) { String topic = fileTopic.getName(); File[] fileQueueIdList = fileTopic.listFiles(); if (fileQueueIdList != null) { for (File fileQueueId : fileQueueIdList) { int queueId; try { queueId = Integer.parseInt(fileQueueId.getName()); } catch (NumberFormatException e) { continue; } ConsumeQueue logic = new ConsumeQueue( topic, queueId, StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(), this); this.putConsumeQueue(topic, queueId, logic); if (!logic.load()) { return false; } } } } } log.info("load logics queue all over, OK"); return true; } private void recover(final boolean lastExitOK) { long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue(); if (lastExitOK) { this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue); } else { this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue); } this.recoverTopicQueueTable(); } public MessageStoreConfig getMessageStoreConfig() { return messageStoreConfig; } public TransientStorePool getTransientStorePool() { return transientStorePool; } private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) { ConcurrentMap map = this.consumeQueueTable.get(topic); if (null == map) { map = new ConcurrentHashMap(); map.put(queueId, consumeQueue); this.consumeQueueTable.put(topic, map); } else { map.put(queueId, consumeQueue); } } private long recoverConsumeQueue() { long maxPhysicOffset = -1; for (ConcurrentMap maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { logic.recover(); if (logic.getMaxPhysicOffset() > maxPhysicOffset) { maxPhysicOffset = logic.getMaxPhysicOffset(); } } } return maxPhysicOffset; } public void recoverTopicQueueTable() { HashMap table = new HashMap(1024); long minPhyOffset = this.commitLog.getMinOffset(); for (ConcurrentMap maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { String key = logic.getTopic() + "-" + logic.getQueueId(); table.put(key, logic.getMaxOffsetInQueue()); logic.correctMinOffset(minPhyOffset); } } this.commitLog.setTopicQueueTable(table); } public AllocateMappedFileService getAllocateMappedFileService() { return allocateMappedFileService; } public StoreStatsService getStoreStatsService() { return storeStatsService; } public RunningFlags getAccessRights() { return runningFlags; } public ConcurrentMap> getConsumeQueueTable() { return consumeQueueTable; } public StoreCheckpoint getStoreCheckpoint() { return storeCheckpoint; } public HAService getHaService() { return haService; } public ScheduleMessageService getScheduleMessageService() { return scheduleMessageService; } public RunningFlags getRunningFlags() { return runningFlags; } public void doDispatch(DispatchRequest req) { for (CommitLogDispatcher dispatcher : this.dispatcherList) { dispatcher.dispatch(req); } } public void putMessagePositionInfo(DispatchRequest dispatchRequest) { ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId()); cq.putMessagePositionInfoWrapper(dispatchRequest); } @Override public BrokerStatsManager getBrokerStatsManager() { return brokerStatsManager; } @Override public void handleScheduleMessageService(final BrokerRole brokerRole) { if (this.scheduleMessageService != null) { if (brokerRole == BrokerRole.SLAVE) { this.scheduleMessageService.shutdown(); } else { this.scheduleMessageService.start(); } } } public int remainTransientStoreBufferNumbs() { return this.transientStorePool.availableBufferNums(); } @Override public boolean isTransientStorePoolDeficient() { return remainTransientStoreBufferNumbs() == 0; } @Override public LinkedList getDispatcherList() { return this.dispatcherList; } @Override public ConsumeQueue getConsumeQueue(String topic, int queueId) { ConcurrentMap map = consumeQueueTable.get(topic); if (map == null) { return null; } return map.get(queueId); } public void unlockMappedFile(final MappedFile mappedFile) { this.scheduledExecutorService.schedule(new Runnable() { @Override public void run() { mappedFile.munlock(); } }, 6, TimeUnit.SECONDS); } class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: DefaultMessageStore.this.putMessagePositionInfo(request); break; case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; } } } class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) { DefaultMessageStore.this.indexService.buildIndex(request); } } } class CleanCommitLogService { private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20; private final double diskSpaceWarningLevelRatio = Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90")); private final double diskSpaceCleanForciblyRatio = Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85")); private long lastRedeleteTimestamp = 0; private volatile int manualDeleteFileSeveralTimes = 0; private volatile boolean cleanImmediately = false; public void excuteDeleteFilesManualy() { this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES; DefaultMessageStore.log.info("executeDeleteFilesManually was invoked"); } public void run() { try { this.deleteExpiredFiles(); this.redeleteHangedFile(); } catch (Throwable e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } private void deleteExpiredFiles() { int deleteCount = 0; long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); boolean timeup = this.isTimeToDelete(); boolean spacefull = this.isSpaceToDelete(); boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; if (timeup || spacefull || manualDelete) { if (manualDelete) this.manualDeleteFileSeveralTimes--; boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce); fileReservedTime *= 60 * 60 * 1000; deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce); if (deleteCount > 0) { } else if (spacefull) { log.warn("disk space will be full soon, but delete file failed."); } } } private void redeleteHangedFile() { int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval(); long currentTimestamp = System.currentTimeMillis(); if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) { this.lastRedeleteTimestamp = currentTimestamp; int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) { } } } public String getServiceName() { return CleanCommitLogService.class.getSimpleName(); } private boolean isTimeToDelete() { String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen(); if (UtilAll.isItTimeToDo(when)) { DefaultMessageStore.log.info("it's time to reclaim disk space, " + when); return true; } return false; } private boolean isSpaceToDelete() { double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; cleanImmediately = false; { String[] storePaths; String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); if (commitLogStorePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); } else { storePaths = new String[]{commitLogStorePath}; } Set fullStorePath = new HashSet<>(); double minPhysicRatio = 100; String minStorePath = null; for (String storePathPhysic : storePaths) { double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); if (minPhysicRatio > physicRatio) { minPhysicRatio = physicRatio; minStorePath = storePathPhysic; } if (physicRatio > diskSpaceCleanForciblyRatio) { fullStorePath.add(storePathPhysic); } } DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); if (minPhysicRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio + ", so mark disk full, storePathPhysic=" + minStorePath); } cleanImmediately = true; } else if (minPhysicRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio + ", so mark disk ok, storePathPhysic=" + minStorePath); } } if (minPhysicRatio < 0 || minPhysicRatio > ratio) { DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + minPhysicRatio + ", storePathPhysic=" + minStorePath); return true; } } { String storePathLogics = StorePathConfigHelper .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()); double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); if (logicsRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full"); } cleanImmediately = true; } else if (logicsRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok"); } } if (logicsRatio < 0 || logicsRatio > ratio) { DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio); return true; } } return false; } public int getManualDeleteFileSeveralTimes() { return manualDeleteFileSeveralTimes; } public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) { this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes; } public double calcStorePathPhysicRatio() { String storePath = getStorePathPhysic(); if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { Set fullStorePath = new HashSet<>(); String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); double minPhysicRatio = 100; for (String path : paths) { double physicRatio = UtilAll.isPathExists(path) ? UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1; minPhysicRatio = Math.min(minPhysicRatio, physicRatio); if (physicRatio > diskSpaceCleanForciblyRatio) { fullStorePath.add(path); } } DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); return minPhysicRatio; } else { return UtilAll.isPathExists(storePath) ? UtilAll.getDiskPartitionSpaceUsedPercent(storePath) : -1; } } public boolean isSpaceFull() { double physicRatio = calcStorePathPhysicRatio(); double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; if (physicRatio > ratio) { DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio); } if (physicRatio > this.diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("physic disk of commitLog maybe full soon, used " + physicRatio + ", so mark disk full"); } return true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("physic disk space of commitLog OK " + physicRatio + ", so mark disk ok"); } return false; } } } class CleanConsumeQueueService { private long lastPhysicalMinOffset = 0; public void run() { try { this.deleteExpiredFiles(); } catch (Throwable e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } private void deleteExpiredFiles() { int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval(); long minOffset = DefaultMessageStore.this.commitLog.getMinOffset(); if (minOffset > this.lastPhysicalMinOffset) { this.lastPhysicalMinOffset = minOffset; ConcurrentMap> tables = DefaultMessageStore.this.consumeQueueTable; for (ConcurrentMap maps : tables.values()) { for (ConsumeQueue logic : maps.values()) { int deleteCount = logic.deleteExpiredFile(minOffset); if (deleteCount > 0 && deleteLogicsFilesInterval > 0) { try { Thread.sleep(deleteLogicsFilesInterval); } catch (InterruptedException ignored) { } } } } DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset); } } public String getServiceName() { return CleanConsumeQueueService.class.getSimpleName(); } } class FlushConsumeQueueService extends ServiceThread { private static final int RETRY_TIMES_OVER = 3; private long lastFlushTimestamp = 0; private void doFlush(int retryTimes) { int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages(); if (retryTimes == RETRY_TIMES_OVER) { flushConsumeQueueLeastPages = 0; } long logicsMsgTimestamp = 0; int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval(); long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; flushConsumeQueueLeastPages = 0; logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp(); } ConcurrentMap> tables = DefaultMessageStore.this.consumeQueueTable; for (ConcurrentMap maps : tables.values()) { for (ConsumeQueue cq : maps.values()) { boolean result = false; for (int i = 0; i < retryTimes && !result; i++) { result = cq.flush(flushConsumeQueueLeastPages); } } } if (0 == flushConsumeQueueLeastPages) { if (logicsMsgTimestamp > 0) { DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp); } DefaultMessageStore.this.getStoreCheckpoint().flush(); } } public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue(); this.waitForRunning(interval); this.doFlush(1); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } this.doFlush(RETRY_TIMES_OVER); DefaultMessageStore.log.info(this.getServiceName() + " service end"); } @Override public String getServiceName() { return FlushConsumeQueueService.class.getSimpleName(); } @Override public long getJointime() { return 1000 * 60; } } class ReputMessageService extends ServiceThread { private volatile long reputFromOffset = 0; public long getReputFromOffset() { return reputFromOffset; } public void setReputFromOffset(long reputFromOffset) { this.reputFromOffset = reputFromOffset; } @Override public void shutdown() { for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) { try { Thread.sleep(100); } catch (InterruptedException ignored) { } } if (this.isCommitLogAvailable()) { log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}", DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset); } super.shutdown(); } public long behind() { return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset; } private boolean isCommitLogAvailable() { return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset(); } private void doReput() { if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.", this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset()); this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset(); } for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); if (result != null) { try { this.reputFromOffset = result.getStartOffset(); for (int readSize = 0; readSize < result.getSize() && doNext; ) { DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { DefaultMessageStore.this.doDispatch(dispatchRequest); if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable() && DefaultMessageStore.this.messageArrivingListener != null) { DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } this.reputFromOffset += size; readSize += size; if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1); DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) .add(dispatchRequest.getMsgSize()); } } else if (size == 0) { this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); readSize = result.getSize(); } } else if (!dispatchRequest.isSuccess()) { if (size > 0) { log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); this.reputFromOffset += size; } else { doNext = false; // If user open the dledger pattern or the broker is master node, // it will not ignore the exception and fix the reputFromOffset variable if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() || DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}", this.reputFromOffset); this.reputFromOffset += result.getSize() - readSize; } } } } } finally { result.release(); } } else { doNext = false; } } } @Override public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { Thread.sleep(1); this.doReput(); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } DefaultMessageStore.log.info(this.getServiceName() + " service end"); } @Override public String getServiceName() { return ReputMessageService.class.getSimpleName(); } } }