/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.config;
import java.io.File;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.store.ConsumeQueue;
/**
 * Configuration holder for the message store.
 *
 * <p>Every option is a private field with a default value and JavaBean-style accessors. Most
 * accessors return the stored value unchanged; the exceptions, which normalise or combine values on
 * read, are {@link #getMappedFileSizeConsumeQueue()}, {@link #getDiskMaxUsedSpaceRatio()} and
 * {@link #isTransientStorePoolEnable()}.
 */
public class MessageStoreConfig {
    /** Separator between entries when several store paths are given in one string. */
    public static final String MULTI_PATH_SPLITTER = ":";

    // Root directory in which the log data is kept; defaults to "<user.home>/store".
    @ImportantField
    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";

    // Directory in which the CommitLog is kept; defaults to "<user.home>/store/commitlog".
    @ImportantField
    private String storePathCommitLog =
        System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog";

    private String readOnlyCommitLogStorePaths = null;

    // CommitLog file size, default is 1G.
    private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;

    // ConsumeQueue file size, default is 300000 store units ("30W").
    private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;

    // Whether the ConsumeQueue extension is enabled.
    private boolean enableConsumeQueueExt = false;

    // ConsumeQueue extension file size, default is 48M.
    private int mappedFileSizeConsumeQueueExt = 48 * 1024 * 1024;

    // Bit count of the filter bit map.
    // This is set by the pipeline that calculates the filter bit map.
    private int bitMapLengthConsumeQueueExt = 64;

    // CommitLog flush interval (flushes data to disk).
    @ImportantField
    private int flushIntervalCommitLog = 500;

    // CommitLog commit interval (commits data to the FileChannel).
    // Only used if the TransientStorePool is enabled.
    @ImportantField
    private int commitIntervalCommitLog = 200;

    /**
     * Introduced in 4.0.x. Determines whether a mutex ReentrantLock is used when putting a message.
     */
    private boolean useReentrantLockWhenPutMessage = true;

    // Whether the CommitLog flush is scheduled (timed).
    @ImportantField
    private boolean flushCommitLogTimed = true;

    // ConsumeQueue flush interval.
    private int flushIntervalConsumeQueue = 1000;

    // Resource reclaim interval.
    private int cleanResourceInterval = 10000;

    // CommitLog removal interval.
    private int deleteCommitLogFilesInterval = 100;

    // ConsumeQueue removal interval.
    private int deleteConsumeQueueFilesInterval = 100;

    private int destroyMapedFileIntervalForcibly = 1000 * 120;

    private int redeleteHangedFileInterval = 1000 * 120;

    // When to delete files; default is at 4 am.
    @ImportantField
    private String deleteWhen = "04";

    // Raw value; the getter clamps it to the range [10, 95].
    private int diskMaxUsedSpaceRatio = 75;

    // How long a log file is kept before it is deleted, in hours.
    @ImportantField
    private int fileReservedTime = 72;

    // Flow control for the ConsumeQueue.
    private int putMsgIndexHightWater = 600000;

    // Maximum size of a message, default is 4M.
    private int maxMessageSize = 1024 * 1024 * 4;

    // Whether to check the CRC32 of the records consumed.
    // This ensures that no on-the-wire or on-disk corruption of the messages occurred.
    // The check adds some overhead, so it may be disabled in cases seeking extreme performance.
    private boolean checkCRCOnRecover = true;

    // How many pages are to be flushed when flushing the CommitLog.
    private int flushCommitLogLeastPages = 4;

    // How many pages are to be committed when committing data to file.
    private int commitCommitLogLeastPages = 4;

    // Flush page size when the disk is in warming state.
    private int flushLeastPagesWhenWarmMapedFile = 1024 / 4 * 16;

    // How many pages are to be flushed when flushing the ConsumeQueue.
    private int flushConsumeQueueLeastPages = 2;

    private int flushCommitLogThoroughInterval = 1000 * 10;

    private int commitCommitLogThoroughInterval = 200;

    private int flushConsumeQueueThoroughInterval = 1000 * 60;

    @ImportantField
    private int maxTransferBytesOnMessageInMemory = 1024 * 256;

    @ImportantField
    private int maxTransferCountOnMessageInMemory = 32;

    @ImportantField
    private int maxTransferBytesOnMessageInDisk = 1024 * 64;

    @ImportantField
    private int maxTransferCountOnMessageInDisk = 8;

    @ImportantField
    private int accessMessageInMemoryMaxRatio = 40;

    @ImportantField
    private boolean messageIndexEnable = true;

    private int maxHashSlotNum = 5000000;

    private int maxIndexNum = 5000000 * 4;

    private int maxMsgsNumBatch = 64;

    @ImportantField
    private boolean messageIndexSafe = false;

    private int haListenPort = 10912;

    private int haSendHeartbeatInterval = 1000 * 5;

    private int haHousekeepingInterval = 1000 * 20;

    private int haTransferBatchSize = 1024 * 32;

    @ImportantField
    private String haMasterAddress = null;

    private int haSlaveFallbehindMax = 1024 * 1024 * 256;

    @ImportantField
    private BrokerRole brokerRole = BrokerRole.ASYNC_MASTER;

    @ImportantField
    private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;

    private int syncFlushTimeout = 1000 * 5;

    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    private long flushDelayOffsetInterval = 1000 * 10;

    @ImportantField
    private boolean cleanFileForciblyEnable = true;

    private boolean warmMapedFileEnable = false;

    private boolean offsetCheckInSlave = false;

    private boolean debugLockEnable = false;

    private boolean duplicationEnable = false;

    private boolean diskFallRecorded = true;

    private long osPageCacheBusyTimeOutMills = 1000;

    private int defaultQueryMaxNum = 32;

    // Raw switch; the getter additionally requires ASYNC_FLUSH and a non-SLAVE broker role.
    @ImportantField
    private boolean transientStorePoolEnable = false;

    private int transientStorePoolSize = 5;

    private boolean fastFailIfNoBufferInStorePool = false;

    private boolean enableDLegerCommitLog = false;

    private String dLegerGroup;

    private String dLegerPeers;

    private String dLegerSelfId;

    private String preferredLeaderId;

    private boolean isEnableBatchPush = false;

    private boolean enableScheduleMessageStats = true;

    public boolean isDebugLockEnable() {
        return this.debugLockEnable;
    }

    public void setDebugLockEnable(final boolean debugLockEnable) {
        this.debugLockEnable = debugLockEnable;
    }

    public boolean isDuplicationEnable() {
        return this.duplicationEnable;
    }

    public void setDuplicationEnable(final boolean duplicationEnable) {
        this.duplicationEnable = duplicationEnable;
    }

    public long getOsPageCacheBusyTimeOutMills() {
        return this.osPageCacheBusyTimeOutMills;
    }

    public void setOsPageCacheBusyTimeOutMills(final long osPageCacheBusyTimeOutMills) {
        this.osPageCacheBusyTimeOutMills = osPageCacheBusyTimeOutMills;
    }

    public boolean isDiskFallRecorded() {
        return this.diskFallRecorded;
    }

    public void setDiskFallRecorded(final boolean diskFallRecorded) {
        this.diskFallRecorded = diskFallRecorded;
    }

    public boolean isWarmMapedFileEnable() {
        return this.warmMapedFileEnable;
    }

    public void setWarmMapedFileEnable(boolean warmMapedFileEnable) {
        this.warmMapedFileEnable = warmMapedFileEnable;
    }

    public int getMappedFileSizeCommitLog() {
        return this.mappedFileSizeCommitLog;
    }

    public void setMappedFileSizeCommitLog(int mappedFileSizeCommitLog) {
        this.mappedFileSizeCommitLog = mappedFileSizeCommitLog;
    }

    /**
     * Returns the ConsumeQueue file size rounded up to a whole multiple of
     * {@code ConsumeQueue.CQ_STORE_UNIT_SIZE}.
     *
     * @return the configured size, rounded up to the next multiple of the store unit size
     */
    public int getMappedFileSizeConsumeQueue() {
        // Divide in floating point so that a partial unit is rounded up by Math.ceil.
        final double rawUnits = this.mappedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0);
        final int wholeUnits = (int) Math.ceil(rawUnits);
        return (int) (wholeUnits * ConsumeQueue.CQ_STORE_UNIT_SIZE);
    }

    public void setMappedFileSizeConsumeQueue(int mappedFileSizeConsumeQueue) {
        this.mappedFileSizeConsumeQueue = mappedFileSizeConsumeQueue;
    }

    public boolean isEnableConsumeQueueExt() {
        return this.enableConsumeQueueExt;
    }

    public void setEnableConsumeQueueExt(boolean enableConsumeQueueExt) {
        this.enableConsumeQueueExt = enableConsumeQueueExt;
    }

    public int getMappedFileSizeConsumeQueueExt() {
        return this.mappedFileSizeConsumeQueueExt;
    }

    public void setMappedFileSizeConsumeQueueExt(int mappedFileSizeConsumeQueueExt) {
        this.mappedFileSizeConsumeQueueExt = mappedFileSizeConsumeQueueExt;
    }

    public int getBitMapLengthConsumeQueueExt() {
        return this.bitMapLengthConsumeQueueExt;
    }

    public void setBitMapLengthConsumeQueueExt(int bitMapLengthConsumeQueueExt) {
        this.bitMapLengthConsumeQueueExt = bitMapLengthConsumeQueueExt;
    }

    public int getFlushIntervalCommitLog() {
        return this.flushIntervalCommitLog;
    }

    public void setFlushIntervalCommitLog(int flushIntervalCommitLog) {
        this.flushIntervalCommitLog = flushIntervalCommitLog;
    }

    public int getFlushIntervalConsumeQueue() {
        return this.flushIntervalConsumeQueue;
    }

    public void setFlushIntervalConsumeQueue(int flushIntervalConsumeQueue) {
        this.flushIntervalConsumeQueue = flushIntervalConsumeQueue;
    }

    public int getPutMsgIndexHightWater() {
        return this.putMsgIndexHightWater;
    }

    public void setPutMsgIndexHightWater(int putMsgIndexHightWater) {
        this.putMsgIndexHightWater = putMsgIndexHightWater;
    }

    public int getCleanResourceInterval() {
        return this.cleanResourceInterval;
    }

    public void setCleanResourceInterval(int cleanResourceInterval) {
        this.cleanResourceInterval = cleanResourceInterval;
    }

    public int getMaxMessageSize() {
        return this.maxMessageSize;
    }

    public void setMaxMessageSize(int maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }

    public boolean isCheckCRCOnRecover() {
        return this.checkCRCOnRecover;
    }

    /**
     * Returns the same value as {@link #isCheckCRCOnRecover()}; both accessor spellings exist.
     *
     * @return whether the CRC check on recover is switched on
     */
    public boolean getCheckCRCOnRecover() {
        return this.checkCRCOnRecover;
    }

    public void setCheckCRCOnRecover(boolean checkCRCOnRecover) {
        this.checkCRCOnRecover = checkCRCOnRecover;
    }

    public String getStorePathCommitLog() {
        return this.storePathCommitLog;
    }

    public void setStorePathCommitLog(String storePathCommitLog) {
        this.storePathCommitLog = storePathCommitLog;
    }

    public String getDeleteWhen() {
        return this.deleteWhen;
    }

    public void setDeleteWhen(String deleteWhen) {
        this.deleteWhen = deleteWhen;
    }

    /**
     * Returns the configured disk usage ratio limited to the range 10..95 (inclusive). The stored
     * field itself is not modified.
     *
     * @return 10 when the configured value is below 10, 95 when it is above 95, otherwise the
     *     configured value
     */
    public int getDiskMaxUsedSpaceRatio() {
        final int upperBounded = Math.min(95, this.diskMaxUsedSpaceRatio);
        return Math.max(10, upperBounded);
    }

    public void setDiskMaxUsedSpaceRatio(int diskMaxUsedSpaceRatio) {
        this.diskMaxUsedSpaceRatio = diskMaxUsedSpaceRatio;
    }

    public int getDeleteCommitLogFilesInterval() {
        return this.deleteCommitLogFilesInterval;
    }

    public void setDeleteCommitLogFilesInterval(int deleteCommitLogFilesInterval) {
        this.deleteCommitLogFilesInterval = deleteCommitLogFilesInterval;
    }

    public int getDeleteConsumeQueueFilesInterval() {
        return this.deleteConsumeQueueFilesInterval;
    }

    public void setDeleteConsumeQueueFilesInterval(int deleteConsumeQueueFilesInterval) {
        this.deleteConsumeQueueFilesInterval = deleteConsumeQueueFilesInterval;
    }

    public int getMaxTransferBytesOnMessageInMemory() {
        return this.maxTransferBytesOnMessageInMemory;
    }

    public void setMaxTransferBytesOnMessageInMemory(int maxTransferBytesOnMessageInMemory) {
        this.maxTransferBytesOnMessageInMemory = maxTransferBytesOnMessageInMemory;
    }

    public int getMaxTransferCountOnMessageInMemory() {
        return this.maxTransferCountOnMessageInMemory;
    }

    public void setMaxTransferCountOnMessageInMemory(int maxTransferCountOnMessageInMemory) {
        this.maxTransferCountOnMessageInMemory = maxTransferCountOnMessageInMemory;
    }

    public int getMaxTransferBytesOnMessageInDisk() {
        return this.maxTransferBytesOnMessageInDisk;
    }

    public void setMaxTransferBytesOnMessageInDisk(int maxTransferBytesOnMessageInDisk) {
        this.maxTransferBytesOnMessageInDisk = maxTransferBytesOnMessageInDisk;
    }

    public int getMaxTransferCountOnMessageInDisk() {
        return this.maxTransferCountOnMessageInDisk;
    }

    public void setMaxTransferCountOnMessageInDisk(int maxTransferCountOnMessageInDisk) {
        this.maxTransferCountOnMessageInDisk = maxTransferCountOnMessageInDisk;
    }

    public int getFlushCommitLogLeastPages() {
        return this.flushCommitLogLeastPages;
    }

    public void setFlushCommitLogLeastPages(int flushCommitLogLeastPages) {
        this.flushCommitLogLeastPages = flushCommitLogLeastPages;
    }

    public int getFlushConsumeQueueLeastPages() {
        return this.flushConsumeQueueLeastPages;
    }

    public void setFlushConsumeQueueLeastPages(int flushConsumeQueueLeastPages) {
        this.flushConsumeQueueLeastPages = flushConsumeQueueLeastPages;
    }

    public int getFlushCommitLogThoroughInterval() {
        return this.flushCommitLogThoroughInterval;
    }

    public void setFlushCommitLogThoroughInterval(int flushCommitLogThoroughInterval) {
        this.flushCommitLogThoroughInterval = flushCommitLogThoroughInterval;
    }

    public int getFlushConsumeQueueThoroughInterval() {
        return this.flushConsumeQueueThoroughInterval;
    }

    public void setFlushConsumeQueueThoroughInterval(int flushConsumeQueueThoroughInterval) {
        this.flushConsumeQueueThoroughInterval = flushConsumeQueueThoroughInterval;
    }

    public int getDestroyMapedFileIntervalForcibly() {
        return this.destroyMapedFileIntervalForcibly;
    }

    public void setDestroyMapedFileIntervalForcibly(int destroyMapedFileIntervalForcibly) {
        this.destroyMapedFileIntervalForcibly = destroyMapedFileIntervalForcibly;
    }

    public int getFileReservedTime() {
        return this.fileReservedTime;
    }

    public void setFileReservedTime(int fileReservedTime) {
        this.fileReservedTime = fileReservedTime;
    }

    public int getRedeleteHangedFileInterval() {
        return this.redeleteHangedFileInterval;
    }

    public void setRedeleteHangedFileInterval(int redeleteHangedFileInterval) {
        this.redeleteHangedFileInterval = redeleteHangedFileInterval;
    }

    public int getAccessMessageInMemoryMaxRatio() {
        return this.accessMessageInMemoryMaxRatio;
    }

    public void setAccessMessageInMemoryMaxRatio(int accessMessageInMemoryMaxRatio) {
        this.accessMessageInMemoryMaxRatio = accessMessageInMemoryMaxRatio;
    }

    public boolean isMessageIndexEnable() {
        return this.messageIndexEnable;
    }

    public void setMessageIndexEnable(boolean messageIndexEnable) {
        this.messageIndexEnable = messageIndexEnable;
    }

    public int getMaxHashSlotNum() {
        return this.maxHashSlotNum;
    }

    public void setMaxHashSlotNum(int maxHashSlotNum) {
        this.maxHashSlotNum = maxHashSlotNum;
    }

    public int getMaxIndexNum() {
        return this.maxIndexNum;
    }

    public void setMaxIndexNum(int maxIndexNum) {
        this.maxIndexNum = maxIndexNum;
    }

    public int getMaxMsgsNumBatch() {
        return this.maxMsgsNumBatch;
    }

    public void setMaxMsgsNumBatch(int maxMsgsNumBatch) {
        this.maxMsgsNumBatch = maxMsgsNumBatch;
    }

    public int getHaListenPort() {
        return this.haListenPort;
    }

    public void setHaListenPort(int haListenPort) {
        this.haListenPort = haListenPort;
    }

    public int getHaSendHeartbeatInterval() {
        return this.haSendHeartbeatInterval;
    }

    public void setHaSendHeartbeatInterval(int haSendHeartbeatInterval) {
        this.haSendHeartbeatInterval = haSendHeartbeatInterval;
    }

    public int getHaHousekeepingInterval() {
        return this.haHousekeepingInterval;
    }

    public void setHaHousekeepingInterval(int haHousekeepingInterval) {
        this.haHousekeepingInterval = haHousekeepingInterval;
    }

    public BrokerRole getBrokerRole() {
        return this.brokerRole;
    }

    public void setBrokerRole(BrokerRole brokerRole) {
        this.brokerRole = brokerRole;
    }

    /**
     * Sets the broker role from its textual name, resolved through
     * {@code BrokerRole.valueOf(String)}.
     *
     * @param brokerRole name of the role to look up
     */
    public void setBrokerRole(String brokerRole) {
        this.brokerRole = BrokerRole.valueOf(brokerRole);
    }

    public int getHaTransferBatchSize() {
        return this.haTransferBatchSize;
    }

    public void setHaTransferBatchSize(int haTransferBatchSize) {
        this.haTransferBatchSize = haTransferBatchSize;
    }

    public int getHaSlaveFallbehindMax() {
        return this.haSlaveFallbehindMax;
    }

    public void setHaSlaveFallbehindMax(int haSlaveFallbehindMax) {
        this.haSlaveFallbehindMax = haSlaveFallbehindMax;
    }

    public FlushDiskType getFlushDiskType() {
        return this.flushDiskType;
    }

    public void setFlushDiskType(FlushDiskType flushDiskType) {
        this.flushDiskType = flushDiskType;
    }

    /**
     * Sets the flush disk type from its textual name, resolved through
     * {@code FlushDiskType.valueOf(String)}.
     *
     * @param type name of the flush disk type to look up
     */
    public void setFlushDiskType(String type) {
        this.flushDiskType = FlushDiskType.valueOf(type);
    }

    public int getSyncFlushTimeout() {
        return this.syncFlushTimeout;
    }

    public void setSyncFlushTimeout(int syncFlushTimeout) {
        this.syncFlushTimeout = syncFlushTimeout;
    }

    public String getHaMasterAddress() {
        return this.haMasterAddress;
    }

    public void setHaMasterAddress(String haMasterAddress) {
        this.haMasterAddress = haMasterAddress;
    }

    public String getMessageDelayLevel() {
        return this.messageDelayLevel;
    }

    public void setMessageDelayLevel(String messageDelayLevel) {
        this.messageDelayLevel = messageDelayLevel;
    }

    public long getFlushDelayOffsetInterval() {
        return this.flushDelayOffsetInterval;
    }

    public void setFlushDelayOffsetInterval(long flushDelayOffsetInterval) {
        this.flushDelayOffsetInterval = flushDelayOffsetInterval;
    }

    public boolean isCleanFileForciblyEnable() {
        return this.cleanFileForciblyEnable;
    }

    public void setCleanFileForciblyEnable(boolean cleanFileForciblyEnable) {
        this.cleanFileForciblyEnable = cleanFileForciblyEnable;
    }

    public boolean isMessageIndexSafe() {
        return this.messageIndexSafe;
    }

    public void setMessageIndexSafe(boolean messageIndexSafe) {
        this.messageIndexSafe = messageIndexSafe;
    }

    public boolean isFlushCommitLogTimed() {
        return this.flushCommitLogTimed;
    }

    public void setFlushCommitLogTimed(boolean flushCommitLogTimed) {
        this.flushCommitLogTimed = flushCommitLogTimed;
    }

    public String getStorePathRootDir() {
        return this.storePathRootDir;
    }

    public void setStorePathRootDir(String storePathRootDir) {
        this.storePathRootDir = storePathRootDir;
    }

    public int getFlushLeastPagesWhenWarmMapedFile() {
        return this.flushLeastPagesWhenWarmMapedFile;
    }

    public void setFlushLeastPagesWhenWarmMapedFile(int flushLeastPagesWhenWarmMapedFile) {
        this.flushLeastPagesWhenWarmMapedFile = flushLeastPagesWhenWarmMapedFile;
    }

    public boolean isOffsetCheckInSlave() {
        return this.offsetCheckInSlave;
    }

    public void setOffsetCheckInSlave(boolean offsetCheckInSlave) {
        this.offsetCheckInSlave = offsetCheckInSlave;
    }

    public int getDefaultQueryMaxNum() {
        return this.defaultQueryMaxNum;
    }

    public void setDefaultQueryMaxNum(int defaultQueryMaxNum) {
        this.defaultQueryMaxNum = defaultQueryMaxNum;
    }

    /**
     * Reports whether the transient CommitLog store pool is effectively enabled. All three
     * conditions must hold: the {@code transientStorePoolEnable} switch is on, the flush disk type
     * is {@code ASYNC_FLUSH}, and the broker role is not {@code SLAVE}.
     *
     * @return {@code true} only when all three conditions hold
     */
    public boolean isTransientStorePoolEnable() {
        if (!this.transientStorePoolEnable) {
            return false;
        }
        if (FlushDiskType.ASYNC_FLUSH != getFlushDiskType()) {
            return false;
        }
        return BrokerRole.SLAVE != getBrokerRole();
    }

    public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
        this.transientStorePoolEnable = transientStorePoolEnable;
    }

    public int getTransientStorePoolSize() {
        return this.transientStorePoolSize;
    }

    public void setTransientStorePoolSize(final int transientStorePoolSize) {
        this.transientStorePoolSize = transientStorePoolSize;
    }

    public int getCommitIntervalCommitLog() {
        return this.commitIntervalCommitLog;
    }

    public void setCommitIntervalCommitLog(final int commitIntervalCommitLog) {
        this.commitIntervalCommitLog = commitIntervalCommitLog;
    }

    public boolean isFastFailIfNoBufferInStorePool() {
        return this.fastFailIfNoBufferInStorePool;
    }

    public void setFastFailIfNoBufferInStorePool(final boolean fastFailIfNoBufferInStorePool) {
        this.fastFailIfNoBufferInStorePool = fastFailIfNoBufferInStorePool;
    }

    public boolean isUseReentrantLockWhenPutMessage() {
        return this.useReentrantLockWhenPutMessage;
    }

    public void setUseReentrantLockWhenPutMessage(final boolean useReentrantLockWhenPutMessage) {
        this.useReentrantLockWhenPutMessage = useReentrantLockWhenPutMessage;
    }

    public int getCommitCommitLogLeastPages() {
        return this.commitCommitLogLeastPages;
    }

    public void setCommitCommitLogLeastPages(final int commitCommitLogLeastPages) {
        this.commitCommitLogLeastPages = commitCommitLogLeastPages;
    }

    public int getCommitCommitLogThoroughInterval() {
        return this.commitCommitLogThoroughInterval;
    }

    public void setCommitCommitLogThoroughInterval(final int commitCommitLogThoroughInterval) {
        this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
    }

    public String getReadOnlyCommitLogStorePaths() {
        return this.readOnlyCommitLogStorePaths;
    }

    public void setReadOnlyCommitLogStorePaths(String readOnlyCommitLogStorePaths) {
        this.readOnlyCommitLogStorePaths = readOnlyCommitLogStorePaths;
    }

    public String getdLegerGroup() {
        return this.dLegerGroup;
    }

    public void setdLegerGroup(String dLegerGroup) {
        this.dLegerGroup = dLegerGroup;
    }

    public String getdLegerPeers() {
        return this.dLegerPeers;
    }

    public void setdLegerPeers(String dLegerPeers) {
        this.dLegerPeers = dLegerPeers;
    }

    public String getdLegerSelfId() {
        return this.dLegerSelfId;
    }

    public void setdLegerSelfId(String dLegerSelfId) {
        this.dLegerSelfId = dLegerSelfId;
    }

    public boolean isEnableDLegerCommitLog() {
        return this.enableDLegerCommitLog;
    }

    public void setEnableDLegerCommitLog(boolean enableDLegerCommitLog) {
        this.enableDLegerCommitLog = enableDLegerCommitLog;
    }

    public String getPreferredLeaderId() {
        return this.preferredLeaderId;
    }

    public void setPreferredLeaderId(String preferredLeaderId) {
        this.preferredLeaderId = preferredLeaderId;
    }

    public boolean isEnableBatchPush() {
        return this.isEnableBatchPush;
    }

    public void setEnableBatchPush(boolean enableBatchPush) {
        this.isEnableBatchPush = enableBatchPush;
    }

    public boolean isEnableScheduleMessageStats() {
        return this.enableScheduleMessageStats;
    }

    public void setEnableScheduleMessageStats(boolean enableScheduleMessageStats) {
        this.enableScheduleMessageStats = enableScheduleMessageStats;
    }
}