未验证 提交 e90d20dd 编写于 作者: Z Zhendong Liu 提交者: GitHub

Merge pull request #1046 from apache/store_with_dledger

Add store with dledger
......@@ -10,6 +10,6 @@ devenv
*.versionsBackup
!NOTICE-BIN
!LICENSE-BIN
.DS_Store
localbin
nohup.out
.DS_Store
localbin
nohup.out
......@@ -48,3 +48,5 @@ We always welcome new contributions, whether for trivial cleanups, big new featu
----------
## License
[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
......@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -39,6 +40,7 @@ import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
......@@ -99,6 +101,7 @@ import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
......@@ -158,6 +161,7 @@ public class BrokerController {
private TransactionalMessageCheckService transactionalMessageCheckService;
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private Future<?> slaveSyncFuture;
public BrokerController(
......@@ -234,6 +238,10 @@ public class BrokerController {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
......@@ -396,37 +404,26 @@ public class BrokerController {
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
......@@ -855,6 +852,13 @@ public class BrokerController {
this.filterServerManager.start();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
}
this.registerBrokerAll(true, false, true);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
......@@ -877,12 +881,7 @@ public class BrokerController {
this.brokerFastFailure.start();
}
if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {
if (this.transactionalMessageCheckService != null) {
log.info("Start transaction service!");
this.transactionalMessageCheckService.start();
}
}
}
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
......@@ -1101,4 +1100,118 @@ public class BrokerController {
return endTransactionThreadPoolQueue;
}
private void handleSlaveSynchronize(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}
public void changeToSlave(int brokerId) {
log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
//change the role
brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
//handle the scheduled service
try {
this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
}
//handle the transactional service
try {
this.shutdownProcessorByHa();
} catch (Throwable t) {
log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
}
//handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SLAVE);
try {
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {
}
log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}
public void changeToMaster(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
return;
}
log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());
//handle the slave synchronise
handleSlaveSynchronize(role);
//handle the scheduled service
try {
this.messageStore.handleScheduleMessageService(role);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
}
//handle the transactional service
try {
this.startProcessorByHa(BrokerRole.SYNC_MASTER);
} catch (Throwable t) {
log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
}
//if the operations above are totally successful, we change to master
brokerConfig.setBrokerId(0); //TO DO check
messageStoreConfig.setBrokerRole(role);
try {
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {
}
log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
}
private void startProcessorByHa(BrokerRole role) {
if (BrokerRole.SLAVE != role) {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.start();
}
}
}
private void shutdownProcessorByHa() {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(true);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.dledger;
import io.openmessaging.storage.dledger.DLedgerLeaderElector;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_"));
private BrokerController brokerController;
private DefaultMessageStore messageStore;
private DLedgerCommitLog dLedgerCommitLog;
private DLedgerServer dLegerServer;
public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) {
this.brokerController = brokerController;
this.messageStore = messageStore;
this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
this.dLegerServer = dLedgerCommitLog.getdLedgerServer();
}
@Override public void handle(long term, MemberState.Role role) {
Runnable runnable = new Runnable() {
@Override public void run() {
long start = System.currentTimeMillis();
try {
boolean succ = true;
log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
switch (role) {
case CANDIDATE:
if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
brokerController.changeToSlave(dLedgerCommitLog.getId());
}
break;
case FOLLOWER:
brokerController.changeToSlave(dLedgerCommitLog.getId());
break;
case LEADER:
while (true) {
if (!dLegerServer.getMemberState().isLeader()) {
succ = false;
break;
}
if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
break;
}
if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
&& messageStore.dispatchBehindBytes() == 0) {
break;
}
Thread.sleep(100);
}
if (succ) {
messageStore.recoverTopicQueueTable();
brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
}
break;
default:
break;
}
log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));
} catch (Throwable t) {
log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);
}
}
};
executorService.submit(runnable);
}
@Override public void startup() {
}
@Override public void shutdown() {
executorService.shutdown();
}
}
......@@ -152,7 +152,7 @@ public class BrokerOuterAPI {
registerBrokerResultList.add(result);
}
log.info("register broker to name server {} OK", namesrvAddr);
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
......
......@@ -23,7 +23,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
......@@ -36,6 +35,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.filter.FilterFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......
......@@ -54,7 +54,7 @@ public class SlaveSynchronize {
private void syncTopicConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
TopicConfigSerializeWrapper topicWrapper =
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
......@@ -78,7 +78,7 @@ public class SlaveSynchronize {
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
......@@ -94,7 +94,7 @@ public class SlaveSynchronize {
private void syncDelayOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
String delayOffset =
this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
......@@ -118,7 +118,7 @@ public class SlaveSynchronize {
private void syncSubscriptionGroupConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
SubscriptionGroupWrapper subscriptionWrapper =
this.brokerController.getBrokerOuterAPI()
......
......@@ -22,36 +22,15 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
public class TransactionalMessageCheckService extends ServiceThread {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private BrokerController brokerController;
private final AtomicBoolean started = new AtomicBoolean(false);
public TransactionalMessageCheckService(BrokerController brokerController) {
this.brokerController = brokerController;
}
@Override
public void start() {
if (started.compareAndSet(false, true)) {
super.start();
this.brokerController.getTransactionalMessageService().open();
}
}
@Override
public void shutdown(boolean interrupt) {
if (started.compareAndSet(true, false)) {
super.shutdown(interrupt);
this.brokerController.getTransactionalMessageService().close();
this.brokerController.getTransactionalMessageCheckListener().shutDown();
}
}
@Override
public String getServiceName() {
return TransactionalMessageCheckService.class.getSimpleName();
......
......@@ -548,10 +548,10 @@ public class MQClientInstance {
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr);
id, addr, e);
}
}
}
......@@ -1046,20 +1046,8 @@ public class MQClientInstance {
if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {
return this.brokerVersionTable.get(brokerName).get(brokerAddr);
}
} else {
HeartbeatData heartbeatData = prepareHeartbeatData();
try {
int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData, 3000);
return version;
} catch (Exception e) {
if (this.isBrokerInNameServer(brokerAddr)) {
log.info("send heart beat to broker[{} {}] failed", brokerName, brokerAddr);
} else {
log.info("send heart beat to broker[{} {}] exception, because the broker not up, forget it", brokerName,
brokerAddr);
}
}
}
//To do need to fresh the version
return 0;
}
......
......@@ -27,18 +27,29 @@ public abstract class ServiceThread implements Runnable {
private static final long JOIN_TIME = 90 * 1000;
protected final Thread thread;
private Thread thread;
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
protected volatile boolean stopped = false;
protected boolean isDaemon = false;
//Make it able to restart the thread
private final AtomicBoolean started = new AtomicBoolean(false);
public ServiceThread() {
this.thread = new Thread(this, this.getServiceName());
}
public abstract String getServiceName();
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
......@@ -47,6 +58,10 @@ public abstract class ServiceThread implements Runnable {
}
public void shutdown(final boolean interrupt) {
log.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(true, false)) {
return;
}
this.stopped = true;
log.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
......@@ -75,11 +90,16 @@ public abstract class ServiceThread implements Runnable {
return JOIN_TIME;
}
@Deprecated
public void stop() {
this.stop(false);
}
@Deprecated
public void stop(final boolean interrupt) {
if (!started.get()) {
return;
}
this.stopped = true;
log.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
......@@ -93,6 +113,9 @@ public abstract class ServiceThread implements Runnable {
}
public void makeStop() {
if (!started.get()) {
return;
}
this.stopped = true;
log.info("makestop thread " + this.getServiceName());
}
......@@ -128,4 +151,12 @@ public abstract class ServiceThread implements Runnable {
public boolean isStopped() {
return stopped;
}
public boolean isDaemon() {
return isDaemon;
}
public void setDaemon(boolean daemon) {
isDaemon = daemon;
}
}
......@@ -41,6 +41,7 @@ public class MessageDecoder {
public final static int MESSAGE_MAGIC_CODE = -626843481;
public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2;
public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCODE
+ 4 // 3 BODYCRC
......
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## Revise the base dir
CURRENT_DIR="$(cd "$(dirname "$0")"; pwd)"
RMQ_DIR=$CURRENT_DIR/../..
cd $RMQ_DIR
function startNameserver() {
export JAVA_OPT_EXT=" -Xms512m -Xmx512m "
nohup bin/mqnamesrv &
}
function startBroker() {
export JAVA_OPT_EXT=" -Xms1g -Xmx1g "
conf_name=$1
nohup bin/mqbroker -c $conf_name &
}
function stopNameserver() {
PIDS=$(ps -ef|grep java|grep NamesrvStartup|grep -v grep|awk '{print $2}')
if [ ! -z "$PIDS" ]; then
kill -s TERM $PIDS
fi
}
function stopBroker() {
conf_name=$1
PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}')
i=1
while [ ! -z "$PIDS" -a $i -lt 5 ]
do
echo "Waiting to kill ..."
kill -s TERM $PIDS
((i=$i+1))
sleep 2
PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}')
done
PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}')
if [ ! -z "$PIDS" ]; then
kill -9 $PIDS
fi
}
function stopAll() {
ps -ef|grep java|grep BrokerStartup|grep -v grep|awk '{print $2}'|xargs kill
stopNameserver
stopBroker ./conf/dledger/broker-n0.conf
stopBroker ./conf/dledger/broker-n1.conf
stopBroker ./conf/dledger/broker-n2.conf
}
function startAll() {
startNameserver
startBroker ./conf/dledger/broker-n0.conf
startBroker ./conf/dledger/broker-n1.conf
startBroker ./conf/dledger/broker-n2.conf
}
function checkConf() {
if [ ! -f ./conf/dledger/broker-n0.conf -o ! -f ./conf/dledger/broker-n1.conf -o ! -f ./conf/dledger/broker-n2.conf ]; then
echo "Make sure the ./conf/dledger/broker-n0.conf, ./conf/dledger/broker-n1.conf, ./conf/dledger/broker-n2.conf exists"
exit -1
fi
}
## Main
if [ $# -lt 1 ]; then
echo "Usage: sh $0 start|stop"
exit -1
fi
action=$1
checkConf
case $action in
"start")
startAll
exit
;;
"stop")
stopAll
;;
*)
echo "Usage: sh $0 start|stop"
;;
esac
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30921
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node01
storePathCommitLog=/tmp/rmqstore/node01/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30931
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node02
storePathCommitLog=/tmp/rmqstore/node02/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16
......@@ -40,7 +40,7 @@
<fileSet>
<includes>
<include>bin/*</include>
<include>bin/**</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
......
## 前言
该文档主要介绍如何部署自动容灾切换的 RocketMQ-on-DLedger Group。
RocketMQ-on-DLedger Group 是指一组**相同名称**的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。
RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致。
RocketMQ-on-DLedger Group 是可以水平扩展的,也即可以部署任意多个 RocketMQ-on-DLedger Group 同时对外提供服务。
## 1. 新集群部署
#### 1.1 编写配置
每个 RocketMQ-on-DLedger Group 至少准备三台机器(本文假设为 3)。
编写 3 个配置文件,建议参考 conf/dledger 目录下的配置文件样例。
关键配置介绍:
| name | 含义 | 举例 |
| --- | --- | --- |
| enableDLegerCommitLog | 是否启动 DLedger  | true |
| dLegerGroup | DLedger Raft Group的名字,建议和 brokerName 保持一致 | RaftNode00 |
| dLegerPeers | DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致 | n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 |
| dLegerSelfId | 节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 | n0 |
| sendMessageThreadPoolNums | 发送线程个数,建议配置成 Cpu 核数 | 16 |
这里贴出 conf/dledger/broker-n0.conf 的配置举例。
```
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
```
### 1.2 启动 Broker
与老版本的启动方式一致。
`nohup sh bin/mqbroker -c conf/dledger/xxx-n0.conf & `
`nohup sh bin/mqbroker -c conf/dledger/xxx-n1.conf & `
`nohup sh bin/mqbroker -c conf/dledger/xxx-n2.conf & `
## 2. 旧集群升级
如果旧集群采用 Master 方式部署,则每个 Master 都需要转换成一个 RocketMQ-on-DLedger Group。
如果旧集群采用 Master-Slave 方式部署,则每个 Master-Slave 组都需要转换成一个 RocketMQ-on-DLedger Group。
### 2.1 杀掉旧的 Broker
可以通过 kill 命令来完成,也可以调用 `bin/mqshutdown broker`
### 2.2 检查旧的 Commitlog
RocketMQ-on-DLedger 组中的每个节点,可以兼容旧的 Commitlog ,但其 Raft 复制过程,只能针对新增加的消息。因此,为了避免出现异常,需要保证 旧的 Commitlog 是一致的。
如果旧的集群是采用 Master-Slave 方式部署,有可能在shutdown时,其数据并不是一致的,建议通过md5sum 的方式,检查最近的最少 2 个 Commmitlog 文件,如果发现不一致,则通过拷贝的方式进行对齐。
虽然 RocketMQ-on-DLedger Group 也可以以 2 节点方式部署,但其会丧失容灾切换能力(2n + 1 原则,至少需要3个节点才能容忍其中 1 个宕机)。
所以在对齐了 Master 和 Slave 的 Commitlog 之后,还需要准备第 3 台机器,并把旧的 Commitlog 从 Master 拷贝到 第 3 台机器(记得同时拷贝一下 config 文件夹)。
在 3 台机器准备好了之后,旧 Commitlog 文件也保证一致之后,就可以开始走下一步修改配置了。
### 2.3 修改配置
参考新集群部署。
### 2.4 重新启动 Broker
参考新集群部署。
### 前言
该文档主要介绍如何快速构建和部署基于 DLedger 的可以自动容灾切换的 RocketMQ 集群。
详细的新集群部署和旧集群升级指南请参考 [部署指南](deploy_guide.md)
### 1. 源码构建
构建分为两个部分,需要先构建 DLedger,然后 构建 RocketMQ
#### 1.1 构建 DLedger
`git clone https://github.com/openmessaging/openmessaging-storage-dledger.git`
`cd openmessaging-storage-dledger`
`mvn clean install -DskipTests`
#### 1.2 构建 RocketMQ
`git clone https://github.com/apache/rocketmq.git`
`cd rocketmq`
`git checkout -b store_with_dledger origin/store_with_dledger`
`mvn -Prelease-all -DskipTests clean install -U`
### 2. 快速部署
在构建成功后
`cd distribution/target/apache-rocketmq`
`sh bin/dledger/fast-try.sh start`
如果上面的步骤执行成功,可以通过 mqadmin 运维命令查看集群状态。
`sh bin/mqadmin clusterList -n 127.0.0.1:9876`
顺利的话,会看到如下内容:
![ClusterList](https://img.alicdn.com/5476e8b07b923/TB11Z.ZyCzqK1RjSZFLXXcn2XXa)
(BID 为 0 的表示 Master,其余都是 Follower)
启动成功,现在可以向集群收发消息,并进行容灾切换测试了。
关闭快速集群,可以执行:
`sh bin/dledger/fast-try.sh stop`
快速部署,默认配置在 conf/dledger 里面,默认的存储路径在 /tmp/rmqstore。
### 3. 容灾切换
部署成功,杀掉 Leader 之后(在上面的例子中,杀掉端口 30931 所在的进程),等待约 10s 左右,用 clusterList 命令查看集群,就会发现 Leader 切换到另一个节点了。
......@@ -128,6 +128,17 @@ public class RouteInfoManager {
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
......
......@@ -100,8 +100,8 @@
<maven.test.skip>false</maven.test.skip>
<maven.javadoc.skip>true</maven.javadoc.skip>
<!-- Compiler settings properties -->
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<!-- Exclude all generated code -->
<sonar.jacoco.itReportPath>${project.basedir}/../test/target/jacoco-it.exec</sonar.jacoco.itReportPath>
......@@ -259,6 +259,7 @@
<exclude>*/target/**</exclude>
<exclude>*/*.iml</exclude>
<exclude>docs/**</exclude>
<exclude>localbin/**</exclude>
</excludes>
</configuration>
</plugin>
......
......@@ -28,6 +28,21 @@
<name>rocketmq-store ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.openmessaging.storage</groupId>
<artifactId>dledger</artifactId>
<version>0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
......
......@@ -120,16 +120,9 @@ public class AllocateMappedFileService extends ServiceThread {
return AllocateMappedFileService.class.getSimpleName();
}
@Override
public void shutdown() {
this.stopped = true;
this.thread.interrupt();
try {
this.thread.join(this.getJointime());
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
super.shutdown(true);
for (AllocateRequest req : this.requestTable.values()) {
if (req.mappedFile != null) {
log.info("delete pre allocated maped file, {}", req.mappedFile.getFileName());
......
......@@ -45,11 +45,11 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
public class CommitLog {
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = -626843481;
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194
private final static int BLANK_MAGIC_CODE = -875286124;
private final MappedFileQueue mappedFileQueue;
private final DefaultMessageStore defaultMessageStore;
protected final static int BLANK_MAGIC_CODE = -875286124;
protected final MappedFileQueue mappedFileQueue;
protected final DefaultMessageStore defaultMessageStore;
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
......@@ -57,11 +57,11 @@ public class CommitLog {
private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
private volatile long confirmOffset = -1L;
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
protected volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0;
private final PutMessageLock putMessageLock;
protected final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
......@@ -212,6 +212,12 @@ public class CommitLog {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
......@@ -366,7 +372,7 @@ public class CommitLog {
return new DispatchRequest(-1, false /* success */);
}
private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
final int msgLen = 4 //TOTALSIZE
+ 4 //MAGICCODE
+ 4 //BODYCRC
......@@ -396,6 +402,7 @@ public class CommitLog {
this.confirmOffset = phyOffset;
}
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
......@@ -456,7 +463,7 @@ public class CommitLog {
}
}
} else {
log.info("recover physics file end, " + mappedFile.getFileName());
log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
break;
}
}
......@@ -474,6 +481,7 @@ public class CommitLog {
}
// Commitlog case files are deleted
else {
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
......
......@@ -106,7 +106,7 @@ public class ConsumeQueue {
if (offset >= 0 && size > 0) {
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
this.maxPhysicOffset = offset;
this.maxPhysicOffset = offset + size;
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
......@@ -224,7 +224,7 @@ public class ConsumeQueue {
int logicFileSize = this.mappedFileSize;
this.maxPhysicOffset = phyOffet - 1;
this.maxPhysicOffset = phyOffet;
long maxExtAddr = 1;
while (true) {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
......@@ -249,7 +249,7 @@ public class ConsumeQueue {
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
this.maxPhysicOffset = offset;
this.maxPhysicOffset = offset + size;
// This maybe not take effect, when not every consume queue has extend file.
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
......@@ -267,7 +267,7 @@ public class ConsumeQueue {
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
this.maxPhysicOffset = offset;
this.maxPhysicOffset = offset + size;
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
......@@ -348,7 +348,7 @@ public class ConsumeQueue {
long tagsCode = result.getByteBuffer().getLong();
if (offsetPy >= phyMinOffset) {
this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
this.minLogicOffset = mappedFile.getFileFromOffset() + i;
log.info("Compute logical min offset: {}, topic: {}, queueId: {}",
this.getMinOffsetInQueue(), this.topic, this.queueId);
// This maybe not take effect, when not every consume queue has extend file.
......@@ -420,7 +420,8 @@ public class ConsumeQueue {
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
if (offset <= this.maxPhysicOffset) {
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
return true;
}
......@@ -464,7 +465,7 @@ public class ConsumeQueue {
);
}
}
this.maxPhysicOffset = offset;
this.maxPhysicOffset = offset + size;
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
......
......@@ -42,24 +42,23 @@ import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.running.RunningStats;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.index.IndexService;
import org.apache.rocketmq.store.index.QueryOffsetResult;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import static org.apache.rocketmq.store.config.BrokerRole.SLAVE;
public class DefaultMessageStore implements MessageStore {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
......@@ -119,7 +118,11 @@ public class DefaultMessageStore implements MessageStore {
this.messageStoreConfig = messageStoreConfig;
this.brokerStatsManager = brokerStatsManager;
this.allocateMappedFileService = new AllocateMappedFileService(this);
this.commitLog = new CommitLog(this);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
} else {
this.commitLog = new CommitLog(this);
}
this.consumeQueueTable = new ConcurrentHashMap<>(32);
this.flushConsumeQueueService = new FlushConsumeQueueService();
......@@ -127,8 +130,11 @@ public class DefaultMessageStore implements MessageStore {
this.cleanConsumeQueueService = new CleanConsumeQueueService();
this.storeStatsService = new StoreStatsService();
this.indexService = new IndexService(this);
this.haService = new HAService(this);
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService = new HAService(this);
} else {
this.haService = null;
}
this.reputMessageService = new ReputMessageService();
this.scheduleMessageService = new ScheduleMessageService(this);
......@@ -216,29 +222,71 @@ public class DefaultMessageStore implements MessageStore {
lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true);
this.flushConsumeQueueService.start();
this.commitLog.start();
this.storeStatsService.start();
if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
this.scheduleMessageService.start();
{
/**
* 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
* 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
* 3. Calculate the reput offset according to the consume queue;
* 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
*/
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
}
}
}
if (maxPhysicalPosInLogicQueue < 0) {
maxPhysicalPosInLogicQueue = 0;
}
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
/**
* This happens in following conditions:
* 1. If someone removes all the consumequeue files or the disk get damaged.
* 2. Launch a new broker, and copy the commitlog from other brokers.
*
* All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
* If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
*/
log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
}
log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
/**
* 1. Finish dispatching the messages fall behind, then to start other services.
* 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
*/
while (true) {
if (dispatchBehindBytes() <= 0) {
break;
}
Thread.sleep(1000);
log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
}
this.recoverTopicQueueTable();
}
if (this.getMessageStoreConfig().isDuplicationEnable()) {
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
this.reputMessageService.start();
this.haService.start();
this.flushConsumeQueueService.start();
this.commitLog.start();
this.storeStatsService.start();
this.createTempFile();
this.addScheduleTask();
this.shutdown = false;
}
public void shutdown() {
if (!this.shutdown) {
this.shutdown = true;
......@@ -255,8 +303,9 @@ public class DefaultMessageStore implements MessageStore {
if (this.scheduleMessageService != null) {
this.scheduleMessageService.shutdown();
}
this.haService.shutdown();
if (this.haService != null) {
this.haService.shutdown();
}
this.storeStatsService.shutdown();
this.indexService.shutdown();
......@@ -1039,6 +1088,7 @@ public class DefaultMessageStore implements MessageStore {
return false;
}
@Override
public long dispatchBehindBytes() {
return this.reputMessageService.behind();
}
......@@ -1320,7 +1370,7 @@ public class DefaultMessageStore implements MessageStore {
return maxPhysicOffset;
}
private void recoverTopicQueueTable() {
public void recoverTopicQueueTable() {
HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
long minPhyOffset = this.commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
......@@ -1382,6 +1432,18 @@ public class DefaultMessageStore implements MessageStore {
return brokerStatsManager;
}
@Override
public void handleScheduleMessageService(final BrokerRole brokerRole) {
if (this.scheduleMessageService != null) {
if (brokerRole == BrokerRole.SLAVE) {
this.scheduleMessageService.shutdown();
} else {
this.scheduleMessageService.start();
}
}
}
public int remainTransientStoreBufferNumbs() {
return this.transientStorePool.remainBufferNumbs();
}
......@@ -1748,6 +1810,11 @@ public class DefaultMessageStore implements MessageStore {
}
private void doReput() {
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
......@@ -1763,7 +1830,7 @@ public class DefaultMessageStore implements MessageStore {
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
......@@ -1797,12 +1864,10 @@ public class DefaultMessageStore implements MessageStore {
this.reputFromOffset += size;
} else {
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
this.reputFromOffset += result.getSize() - readSize;
}
}
}
......
......@@ -22,7 +22,7 @@ public class DispatchRequest {
private final String topic;
private final int queueId;
private final long commitLogOffset;
private final int msgSize;
private int msgSize;
private final long tagsCode;
private final long storeTimestamp;
private final long consumeQueueOffset;
......@@ -35,6 +35,8 @@ public class DispatchRequest {
private final Map<String, String> propertiesMap;
private byte[] bitMap;
private int bufferSize = -1;//the buffer size maybe larger than the msg size if the message is wrapped by something
public DispatchRequest(
final String topic,
final int queueId,
......@@ -156,4 +158,16 @@ public class DispatchRequest {
public void setBitMap(byte[] bitMap) {
this.bitMap = bitMap;
}
public void setMsgSize(int msgSize) {
this.msgSize = msgSize;
}
public int getBufferSize() {
return bufferSize;
}
public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}
}
......@@ -154,8 +154,8 @@ public class MappedFileQueue {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
return true;
+ " length not matched message store config value, please check it manually");
return false;
}
try {
......
......@@ -21,6 +21,7 @@ import java.util.LinkedList;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
/**
......@@ -366,4 +367,10 @@ public interface MessageStore {
* @return BrokerStatsManager.
*/
BrokerStatsManager getBrokerStatsManager();
/**
* handle
* @param brokerRole
*/
void handleScheduleMessageService(BrokerRole brokerRole);
}
......@@ -48,9 +48,9 @@ public class SelectMappedBufferResult {
this.byteBuffer.limit(this.size);
}
public MappedFile getMappedFile() {
/* public MappedFile getMappedFile() {
return mappedFile;
}
}*/
// @Override
// protected void finalize() {
......
......@@ -143,6 +143,11 @@ public class MessageStoreConfig {
private int transientStorePoolSize = 5;
private boolean fastFailIfNoBufferInStorePool = false;
private boolean enableDLegerCommitLog = false;
private String dLegerGroup;
private String dLegerPeers;
private String dLegerSelfId;
public boolean isDebugLockEnable() {
return debugLockEnable;
}
......@@ -666,4 +671,35 @@ public class MessageStoreConfig {
this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
}
public String getdLegerGroup() {
return dLegerGroup;
}
public void setdLegerGroup(String dLegerGroup) {
this.dLegerGroup = dLegerGroup;
}
public String getdLegerPeers() {
return dLegerPeers;
}
public void setdLegerPeers(String dLegerPeers) {
this.dLegerPeers = dLegerPeers;
}
public String getdLegerSelfId() {
return dLegerSelfId;
}
public void setdLegerSelfId(String dLegerSelfId) {
this.dLegerSelfId = dLegerSelfId;
}
public boolean isEnableDLegerCommitLog() {
return enableDLegerCommitLog;
}
public void setEnableDLegerCommitLog(boolean enableDLegerCommitLog) {
this.enableDLegerCommitLog = enableDLegerCommitLog;
}
}
......@@ -90,7 +90,7 @@ public class HAConnection {
this.selector = RemotingUtil.openSelector();
this.socketChannel = socketChannel;
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
this.thread.setDaemon(true);
this.setDaemon(true);
}
@Override
......@@ -205,7 +205,7 @@ public class HAConnection {
this.selector = RemotingUtil.openSelector();
this.socketChannel = socketChannel;
this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
this.thread.setDaemon(true);
this.setDaemon(true);
}
@Override
......
......@@ -19,11 +19,11 @@ package org.apache.rocketmq.store.schedule;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -38,6 +38,7 @@ import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
......@@ -56,15 +57,15 @@ public class ScheduleMessageService extends ConfigManager {
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
private final Timer timer = new Timer("ScheduleMessageTimerThread", true);
private final DefaultMessageStore defaultMessageStore;
private final AtomicBoolean started = new AtomicBoolean(false);
private Timer timer;
private MessageStore writeMessageStore;
private int maxDelayLevel;
public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
this.defaultMessageStore = defaultMessageStore;
this.writeMessageStore = defaultMessageStore;
}
public static int queueId2DelayLevel(final int queueId) {
......@@ -75,10 +76,18 @@ public class ScheduleMessageService extends ConfigManager {
return delayLevel - 1;
}
/**
* @param writeMessageStore
* the writeMessageStore to set
*/
public void setWriteMessageStore(MessageStore writeMessageStore) {
this.writeMessageStore = writeMessageStore;
}
public void buildRunningStats(HashMap<String, String> stats) {
Iterator<Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator();
Iterator<Map.Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, Long> next = it.next();
Map.Entry<Integer, Long> next = it.next();
int queueId = delayLevel2QueueId(next.getKey());
long delayOffset = next.getValue();
long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(SCHEDULE_TOPIC, queueId);
......@@ -102,35 +111,45 @@ public class ScheduleMessageService extends ConfigManager {
}
public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
public void shutdown() {
this.timer.cancel();
if (this.started.compareAndSet(true, false)) {
if (null != this.timer)
this.timer.cancel();
}
}
public boolean isStarted() {
return started.get();
}
public int getMaxDelayLevel() {
......@@ -214,7 +233,9 @@ public class ScheduleMessageService extends ConfigManager {
@Override
public void run() {
try {
this.executeOnTimeup();
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
......@@ -285,7 +306,7 @@ public class ScheduleMessageService extends ConfigManager {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null
......
......@@ -30,9 +30,10 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class ConsumeQueueTest {
private static final String msg = "Once, there was a chance for me!";
......
......@@ -42,9 +42,9 @@ public class DefaultMessageStoreShutDownTest {
public void init() throws Exception {
messageStore = spy(buildMessageStore());
boolean load = messageStore.load();
when(messageStore.dispatchBehindBytes()).thenReturn(100L);
assertTrue(load);
messageStore.start();
when(messageStore.dispatchBehindBytes()).thenReturn(100L);
}
@Test
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.After;
public class StoreTestBase {
private int QUEUE_TOTAL = 100;
private AtomicInteger QueueId = new AtomicInteger(0);
private SocketAddress BornHost = new InetSocketAddress("127.0.0.1", 8123);
private SocketAddress StoreHost = BornHost;
private byte[] MessageBody = new byte[1024];
protected Set<String> baseDirs = new HashSet<>();
private static AtomicInteger port = new AtomicInteger(30000);
public static synchronized int nextPort() {
return port.addAndGet(5);
}
protected MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("StoreTest");
msg.setTags("TAG1");
msg.setKeys("Hello");
msg.setBody(MessageBody);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(StoreHost);
msg.setBornHost(BornHost);
return msg;
}
public static String createBaseDir() {
String baseDir = System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + UUID.randomUUID();
final File file = new File(baseDir);
if (file.exists()) {
System.exit(1);
}
return baseDir;
}
public static boolean makeSureFileExists(String fileName) throws Exception {
File file = new File(fileName);
MappedFile.ensureDirOK(file.getParent());
return file.createNewFile();
}
public static void deleteFile(String fileName) {
deleteFile(new File(fileName));
}
public static void deleteFile(File file) {
UtilAll.deleteFile(file);
}
@After
public void clear() {
for (String baseDir : baseDirs) {
deleteFile(baseDir);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.store.file.MmapFileList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.junit.Assert;
import org.junit.Test;
public class DLedgerCommitlogTest extends MessageStoreTestBase {
@Test
public void testTruncateCQ() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
String topic = UUID.randomUUID().toString();
{
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
DLedgerServer dLedgerServer = dLedgerCommitLog.getdLedgerServer();
DLedgerMmapFileStore dLedgerMmapFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
MmapFileList mmapFileList = dLedgerMmapFileStore.getDataFileList();
Thread.sleep(2000);
doPutMessages(messageStore, topic, 0, 2000, 0);
Thread.sleep(100);
Assert.assertEquals(24, mmapFileList.getMappedFiles().size());
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 2000, 0);
messageStore.shutdown();
}
{
//Abnormal recover, left some commitlogs
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 4);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
DLedgerServer dLedgerServer = dLedgerCommitLog.getdLedgerServer();
DLedgerMmapFileStore dLedgerMmapFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
MmapFileList mmapFileList = dLedgerMmapFileStore.getDataFileList();
Thread.sleep(1000);
Assert.assertEquals(20, mmapFileList.getMappedFiles().size());
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1700, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 1700, 0);
messageStore.shutdown();
}
{
//Abnormal recover, left none commitlogs
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 20);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
DLedgerServer dLedgerServer = dLedgerCommitLog.getdLedgerServer();
DLedgerMmapFileStore dLedgerMmapFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
MmapFileList mmapFileList = dLedgerMmapFileStore.getDataFileList();
Thread.sleep(1000);
Assert.assertEquals(0, mmapFileList.getMappedFiles().size());
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
messageStore.shutdown();
}
}
@Test
public void testRecover() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
String topic = UUID.randomUUID().toString();
{
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
Thread.sleep(1000);
doPutMessages(messageStore, topic, 0, 1000, 0);
Thread.sleep(100);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 1000, 0);
messageStore.shutdown();
}
{
//normal recover
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 1000, 0);
messageStore.shutdown();
}
{
//abnormal recover
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 1000, 0);
messageStore.shutdown();
}
}
@Test
public void testPutAndGetMessage() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
Thread.sleep(1000);
String topic = UUID.randomUUID().toString();
List<PutMessageResult> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(0);
PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
results.add(putMessageResult);
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Assert.assertEquals(i, putMessageResult.getAppendMessageResult().getLogicsOffset());
}
Thread.sleep(100);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
Assert.assertEquals(10, getMessageResult.getMessageBufferList().size());
Assert.assertEquals(10, getMessageResult.getMessageMapedList().size());
for (int i = 0; i < results.size(); i++) {
ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i);
MessageExt messageExt = MessageDecoder.decode(buffer);
Assert.assertEquals(i, messageExt.getQueueOffset());
Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId(), messageExt.getMsgId());
Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset());
}
messageStore.destroy();
messageStore.shutdown();
}
@Test
public void testCommittedPos() throws Exception {
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
String group = UUID.randomUUID().toString();
DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false, 0);
String topic = UUID.randomUUID().toString();
MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(0);
PutMessageResult putMessageResult = leaderStore.putMessage(msgInner);
Assert.assertEquals(PutMessageStatus.OS_PAGECACHE_BUSY, putMessageResult.getPutMessageStatus());
Thread.sleep(1000);
Assert.assertEquals(0, leaderStore.getCommitLog().getMaxOffset());
Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0));
DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false, 0);
Thread.sleep(2000);
Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(1, followerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertTrue(leaderStore.getCommitLog().getMaxOffset() > 0);
leaderStore.destroy();
followerStore.destroy();
leaderStore.shutdown();
followerStore.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerServer;
import java.io.File;
import java.util.Arrays;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.StoreTestBase;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Assert;
public class MessageStoreTestBase extends StoreTestBase {
protected DefaultMessageStore createDledgerMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort, int deleteFileNum) throws Exception {
System.setProperty("dledger.disk.ratio.check", "0.95");
System.setProperty("dledger.disk.ratio.clean", "0.95");
baseDirs.add(base);
MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setMapedFileSizeCommitLog(1024 * 100);
storeConfig.setMapedFileSizeConsumeQueue(1024);
storeConfig.setMaxHashSlotNum(100);
storeConfig.setMaxIndexNum(100 * 10);
storeConfig.setStorePathRootDir(base);
storeConfig.setStorePathCommitLog(base + File.separator + "commitlog");
storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
storeConfig.setEnableDLegerCommitLog(true);
storeConfig.setdLegerGroup(group);
storeConfig.setdLegerPeers(peers);
storeConfig.setdLegerSelfId(selfId);
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
}, new BrokerConfig());
DLedgerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer();
if (leaderId != null) {
dLegerServer.getdLedgerConfig().setEnableLeaderElector(false);
if (selfId.equals(leaderId)) {
dLegerServer.getMemberState().changeToLeader(-1);
} else {
dLegerServer.getMemberState().changeToFollower(-1, leaderId);
}
}
if (createAbort) {
String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir());
makeSureFileExists(fileName);
}
if (deleteFileNum > 0) {
DLedgerConfig config = dLegerServer.getdLedgerConfig();
if (deleteFileNum > 0) {
File dir = new File(config.getDataStorePath());
File[] files = dir.listFiles();
if (files != null) {
Arrays.sort(files);
for (int i = files.length - 1; i >= 0; i--) {
File file = files[i];
file.delete();
if (files.length - i >= deleteFileNum) {
break;
}
}
}
}
}
Assert.assertTrue(defaultMessageStore.load());
defaultMessageStore.start();
return defaultMessageStore;
}
protected DefaultMessageStore createMessageStore(String base, boolean createAbort) throws Exception {
baseDirs.add(base);
MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setMapedFileSizeCommitLog(1024 * 100);
storeConfig.setMapedFileSizeConsumeQueue(1024);
storeConfig.setMaxHashSlotNum(100);
storeConfig.setMaxIndexNum(100 * 10);
storeConfig.setStorePathRootDir(base);
storeConfig.setStorePathCommitLog(base + File.separator + "commitlog");
storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("CommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
}, new BrokerConfig());
if (createAbort) {
String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir());
makeSureFileExists(fileName);
}
Assert.assertTrue(defaultMessageStore.load());
defaultMessageStore.start();
return defaultMessageStore;
}
protected void doPutMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) {
for (int i = 0; i < num; i++) {
MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(queueId);
PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Assert.assertEquals(beginLogicsOffset + i, putMessageResult.getAppendMessageResult().getLogicsOffset());
}
}
protected void doGetMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) {
for (int i = 0; i < num; i++) {
GetMessageResult getMessageResult = messageStore.getMessage("group", topic, queueId, beginLogicsOffset + i, 3, null);
Assert.assertNotNull(getMessageResult);
Assert.assertTrue(!getMessageResult.getMessageBufferList().isEmpty());
MessageExt messageExt = MessageDecoder.decode(getMessageResult.getMessageBufferList().get(0));
Assert.assertEquals(beginLogicsOffset + i, messageExt.getQueueOffset());
getMessageResult.release();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.dledger;
import java.util.UUID;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.StoreTestBase;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.Assert;
import org.junit.Test;
public class MixCommitlogTest extends MessageStoreTestBase {
@Test
public void testFallBehindCQ() throws Exception {
String base = createBaseDir();
String topic = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
{
DefaultMessageStore originalStore = createMessageStore(base, false);
doPutMessages(originalStore, topic, 0, 1000, 0);
Assert.assertEquals(11, originalStore.getMaxPhyOffset()/originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog());
Thread.sleep(500);
Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, originalStore.dispatchBehindBytes());
doGetMessages(originalStore, topic, 0, 1000, 0);
originalStore.shutdown();
}
//delete the cq files
{
StoreTestBase.deleteFile(StorePathConfigHelper.getStorePathConsumeQueue(base));
}
{
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
Thread.sleep(2000);
Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, dledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
doGetMessages(dledgerStore, topic, 0, 1000, 0);
doPutMessages(dledgerStore, topic, 0, 1000, 1000);
Thread.sleep(500);
Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
doGetMessages(dledgerStore, topic, 0, 2000, 0);
dledgerStore.shutdown();
}
}
@Test
public void testPutAndGet() throws Exception {
String base = createBaseDir();
String topic = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
long dividedOffset;
{
DefaultMessageStore originalStore = createMessageStore(base, false);
doPutMessages(originalStore, topic, 0, 1000, 0);
Thread.sleep(500);
Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, originalStore.dispatchBehindBytes());
dividedOffset = originalStore.getCommitLog().getMaxOffset();
dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
doGetMessages(originalStore, topic, 0, 1000, 0);
originalStore.shutdown();
}
{
DefaultMessageStore recoverOriginalStore = createMessageStore(base, true);
Thread.sleep(500);
Assert.assertEquals(0, recoverOriginalStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, recoverOriginalStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, recoverOriginalStore.dispatchBehindBytes());
doGetMessages(recoverOriginalStore, topic, 0, 1000, 0);
recoverOriginalStore.shutdown();
}
{
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog();
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getMaxOffset());
Thread.sleep(2000);
doPutMessages(dledgerStore, topic, 0, 1000, 1000);
Thread.sleep(500);
Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
doGetMessages(dledgerStore, topic, 0, 2000, 0);
dledgerStore.shutdown();
}
{
DefaultMessageStore recoverDledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) recoverDledgerStore.getCommitLog();
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
Thread.sleep(2000);
doPutMessages(recoverDledgerStore, topic, 0, 1000, 2000);
Thread.sleep(500);
Assert.assertEquals(0, recoverDledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(3000, recoverDledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, recoverDledgerStore.dispatchBehindBytes());
doGetMessages(recoverDledgerStore, topic, 0, 3000, 0);
recoverDledgerStore.shutdown();
}
}
@Test
public void testDeleteExpiredFiles() throws Exception {
String base = createBaseDir();
String topic = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
long dividedOffset;
{
DefaultMessageStore originalStore = createMessageStore(base, false);
doPutMessages(originalStore, topic, 0, 1000, 0);
Thread.sleep(500);
Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, originalStore.dispatchBehindBytes());
dividedOffset = originalStore.getCommitLog().getMaxOffset();
dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
originalStore.shutdown();
}
long maxPhysicalOffset;
{
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog();
Assert.assertTrue(dledgerStore.getMessageStoreConfig().isCleanFileForciblyEnable());
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
Thread.sleep(2000);
doPutMessages(dledgerStore, topic, 0, 1000, 1000);
Thread.sleep(500);
Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
Assert.assertEquals(0, dledgerStore.getMinPhyOffset());
maxPhysicalOffset = dledgerStore.getMaxPhyOffset();
Assert.assertTrue(maxPhysicalOffset > 0);
doGetMessages(dledgerStore, topic, 0, 2000, 0);
for (int i = 0; i < 100; i++) {
dledgerStore.getCommitLog().deleteExpiredFile(System.currentTimeMillis(), 0, 0, true);
}
Assert.assertEquals(dividedOffset, dledgerStore.getMinPhyOffset());
Assert.assertEquals(maxPhysicalOffset, dledgerStore.getMaxPhyOffset());
for (int i = 0; i < 100; i++) {
Assert.assertEquals(Integer.MAX_VALUE, dledgerStore.getCommitLog().deleteExpiredFile(System.currentTimeMillis(), 0, 0, true));
}
Assert.assertEquals(dividedOffset, dledgerStore.getMinPhyOffset());
Assert.assertEquals(maxPhysicalOffset, dledgerStore.getMaxPhyOffset());
Assert.assertTrue(dledgerStore.getMessageStoreConfig().isCleanFileForciblyEnable());
Assert.assertTrue(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
//Test fresh
dledgerStore.getMessageStoreConfig().setCleanFileForciblyEnable(false);
for (int i = 0; i < 100; i++) {
Assert.assertEquals(Integer.MAX_VALUE, dledgerStore.getCommitLog().deleteExpiredFile(System.currentTimeMillis(), 0, 0, true));
}
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
doGetMessages(dledgerStore, topic, 0, 1000, 1000);
dledgerStore.shutdown();
}
}
}
......@@ -28,7 +28,7 @@
<appender-ref ref="STDOUT"/>
</logger>
<root level="ERROR">
<root level="OFF">
<appender-ref ref="STDOUT"/>
</root>
......
......@@ -17,6 +17,8 @@
package org.apache.rocketmq.test.factory;
import java.util.UUID;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
......@@ -60,4 +62,12 @@ public class ConsumerFactory {
consumer.start();
return consumer;
}
public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup) throws Exception {
DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
defaultMQPullConsumer.setInstanceName(UUID.randomUUID().toString());
defaultMQPullConsumer.setNamesrvAddr(nsAddr);
defaultMQPullConsumer.start();
return defaultMQPullConsumer;
}
}
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.factory;
import java.util.UUID;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.test.util.RandomUtil;
......@@ -25,6 +26,7 @@ public class ProducerFactory {
public static DefaultMQProducer getRMQProducer(String ns) {
DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID());
producer.setInstanceName(UUID.randomUUID().toString());
producer.setNamesrvAddr(ns);
try {
producer.start();
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.test.util;
import java.util.HashMap;
import java.util.Set;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
......@@ -40,6 +41,7 @@ public class MQAdmin {
int queueNum, int waitTimeSec) {
boolean createResult = false;
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.setInstanceName(UUID.randomUUID().toString());
mqAdminExt.setNamesrvAddr(nameSrvAddr);
try {
mqAdminExt.start();
......
......@@ -36,13 +36,13 @@ import org.apache.rocketmq.test.util.MQAdmin;
import org.apache.rocketmq.test.util.MQRandomUtils;
public class BaseConf {
protected static String nsAddr;
public static String nsAddr;
protected static String broker1Name;
protected static String broker2Name;
protected static String clusterName;
protected static int brokerNum;
protected static int waitTime = 5;
protected static int consumeTime = 5 * 60 * 1000;
protected static int consumeTime = 2 * 60 * 1000;
protected static NamesrvController namesrvController;
protected static BrokerController brokerController1;
protected static BrokerController brokerController2;
......
......@@ -47,9 +47,14 @@ public class IntegrationTestBase {
protected static final List<BrokerController> BROKER_CONTROLLERS = new ArrayList<>();
protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<>();
protected static int topicCreateTime = 30 * 1000;
protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 256;
protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 100;
protected static final int INDEX_NUM = 1000;
private static final AtomicInteger port = new AtomicInteger(40000);
public static synchronized int nextPort() {
return port.addAndGet(random.nextInt(10) + 10);
}
protected static Random random = new Random();
static {
......@@ -87,7 +92,7 @@ public class IntegrationTestBase {
}
private static String createBaseDir() {
public static String createBaseDir() {
String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID();
final File file = new File(baseDir);
if (file.exists()) {
......@@ -105,14 +110,14 @@ public class IntegrationTestBase {
namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json");
namesrvConfig.setConfigStorePath(baseDir + SEP + "namesrv" + SEP + "namesrv.properties");
nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000));
nameServerNettyServerConfig.setListenPort(nextPort());
NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
try {
Assert.assertTrue(namesrvController.initialize());
logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort());
namesrvController.start();
} catch (Exception e) {
logger.info("Name Server start failed");
logger.info("Name Server start failed", e);
System.exit(1);
}
NAMESRV_CONTROLLERS.add(namesrvController);
......@@ -123,8 +128,6 @@ public class IntegrationTestBase {
public static BrokerController createAndStartBroker(String nsAddr) {
String baseDir = createBaseDir();
BrokerConfig brokerConfig = new BrokerConfig();
NettyServerConfig nettyServerConfig = new NettyServerConfig();
NettyClientConfig nettyClientConfig = new NettyClientConfig();
MessageStoreConfig storeConfig = new MessageStoreConfig();
brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
brokerConfig.setBrokerIP1("127.0.0.1");
......@@ -132,18 +135,25 @@ public class IntegrationTestBase {
brokerConfig.setEnablePropertyFilter(true);
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
storeConfig.setHaListenPort(8000 + random.nextInt(1000));
storeConfig.setMapedFileSizeCommitLog(COMMIT_LOG_SIZE);
storeConfig.setMaxIndexNum(INDEX_NUM);
storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
return createAndStartBroker(storeConfig, brokerConfig);
}
public static BrokerController createAndStartBroker(MessageStoreConfig storeConfig, BrokerConfig brokerConfig) {
NettyServerConfig nettyServerConfig = new NettyServerConfig();
NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyServerConfig.setListenPort(nextPort());
storeConfig.setHaListenPort(nextPort());
BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
try {
Assert.assertTrue(brokerController.initialize());
logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
brokerController.start();
} catch (Exception e) {
logger.info("Broker start failed");
} catch (Throwable t) {
logger.error("Broker start failed, will exit", t);
System.exit(1);
}
BROKER_CONTROLLERS.add(brokerController);
......@@ -179,15 +189,7 @@ public class IntegrationTestBase {
if (!file.exists()) {
return;
}
if (file.isFile()) {
file.delete();
} else if (file.isDirectory()) {
File[] files = file.listFiles();
for (File file1 : files) {
deleteFile(file1);
}
file.delete();
}
UtilAll.deleteFile(file);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.base.dledger;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.factory.ProducerFactory;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.rocketmq.test.base.IntegrationTestBase.nextPort;
import static sun.util.locale.BaseLocale.SEP;
public class DLedgerProduceAndConsumeIT {
public BrokerConfig buildBrokerConfig(String cluster, String brokerName) {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerClusterName(cluster);
brokerConfig.setBrokerName(brokerName);
brokerConfig.setBrokerIP1("127.0.0.1");
brokerConfig.setNamesrvAddr(BaseConf.nsAddr);
return brokerConfig;
}
public MessageStoreConfig buildStoreConfig(String brokerName, String peers, String selfId) {
MessageStoreConfig storeConfig = new MessageStoreConfig();
String baseDir = IntegrationTestBase.createBaseDir();
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
storeConfig.setHaListenPort(nextPort());
storeConfig.setMapedFileSizeCommitLog(10 * 1024 * 1024);
storeConfig.setEnableDLegerCommitLog(true);
storeConfig.setdLegerGroup(brokerName);
storeConfig.setdLegerSelfId(selfId);
storeConfig.setdLegerPeers(peers);
return storeConfig;
}
@Test
public void testProduceAndConsume() throws Exception {
String cluster = UUID.randomUUID().toString();
String brokerName = UUID.randomUUID().toString();
String selfId = "n0";
String peers = String.format("n0-localhost:%d", nextPort());
BrokerConfig brokerConfig = buildBrokerConfig(cluster, brokerName);
MessageStoreConfig storeConfig = buildStoreConfig(brokerName, peers, selfId);
BrokerController brokerController = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);
Thread.sleep(3000);
Assert.assertEquals(BrokerRole.SYNC_MASTER, storeConfig.getBrokerRole());
String topic = UUID.randomUUID().toString();
String consumerGroup = UUID.randomUUID().toString();
IntegrationTestBase.initTopic(topic, BaseConf.nsAddr, cluster, 1);
DefaultMQProducer producer = ProducerFactory.getRMQProducer(BaseConf.nsAddr);
DefaultMQPullConsumer consumer = ConsumerFactory.getRMQPullConsumer(BaseConf.nsAddr, consumerGroup);
for (int i = 0; i < 10; i++) {
Message message = new Message();
message.setTopic(topic);
message.setBody(("Hello" + i).getBytes());
SendResult sendResult = producer.send(message);
Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus());
Assert.assertEquals(0, sendResult.getMessageQueue().getQueueId());
Assert.assertEquals(brokerName, sendResult.getMessageQueue().getBrokerName());
Assert.assertEquals(i, sendResult.getQueueOffset());
Assert.assertNotNull(sendResult.getMsgId());
Assert.assertNotNull(sendResult.getOffsetMsgId());
}
Thread.sleep(500);
Assert.assertEquals(0, brokerController.getMessageStore().getMinOffsetInQueue(topic, 0));
Assert.assertEquals(10, brokerController.getMessageStore().getMaxOffsetInQueue(topic, 0));
MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
PullResult pullResult= consumer.pull(messageQueue, "*", 0, 32);
Assert.assertEquals(PullStatus.FOUND, pullResult.getPullStatus());
Assert.assertEquals(10, pullResult.getMsgFoundList().size());
for (int i = 0; i < 10; i++) {
MessageExt messageExt = pullResult.getMsgFoundList().get(i);
Assert.assertEquals(i, messageExt.getQueueOffset());
Assert.assertArrayEquals(("Hello" + i).getBytes(), messageExt.getBody());
}
producer.shutdown();
consumer.shutdown();
brokerController.shutdown();
}
}
......@@ -29,10 +29,15 @@ import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import static com.google.common.truth.Truth.assertThat;
/**
* Currently, dose not support the ordered broadcast message
*/
@Ignore
public class OrderMsgBroadCastIT extends BaseBroadCastIT {
private static Logger logger = Logger.getLogger(OrderMsgBroadCastIT.class);
private RMQNormalProducer producer = null;
......
......@@ -51,7 +51,8 @@ public class NormalMsgDelayIT extends DelayConf {
}
@Test
public void testDelayLevell() {
public void testDelayLevel1() throws Exception {
Thread.sleep(3000);
int delayLevel = 1;
List<Object> delayMsgs = MQMessageFactory.getDelayMsg(topic, delayLevel, msgSize);
producer.send(delayMsgs);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册