未验证 提交 94634fbb 编写于 作者: D dust6174 提交者: GitHub

Merge branch 'develop' into develop

......@@ -10,6 +10,6 @@ devenv
*.versionsBackup
!NOTICE-BIN
!LICENSE-BIN
.DS_Store
localbin
nohup.out
.DS_Store
localbin
nohup.out
......@@ -48,3 +48,5 @@ We always welcome new contributions, whether for trivial cleanups, big new featu
----------
## License
[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
......@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -39,6 +40,7 @@ import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
......@@ -99,6 +101,7 @@ import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
......@@ -158,6 +161,7 @@ public class BrokerController {
private TransactionalMessageCheckService transactionalMessageCheckService;
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private Future<?> slaveSyncFuture;
public BrokerController(
......@@ -234,6 +238,10 @@ public class BrokerController {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
......@@ -396,37 +404,26 @@ public class BrokerController {
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
......@@ -855,6 +852,13 @@ public class BrokerController {
this.filterServerManager.start();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
}
this.registerBrokerAll(true, false, true);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
......@@ -877,12 +881,7 @@ public class BrokerController {
this.brokerFastFailure.start();
}
if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {
if (this.transactionalMessageCheckService != null) {
log.info("Start transaction service!");
this.transactionalMessageCheckService.start();
}
}
}
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
......@@ -1101,4 +1100,118 @@ public class BrokerController {
return endTransactionThreadPoolQueue;
}
private void handleSlaveSynchronize(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}
public void changeToSlave(int brokerId) {
log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
//change the role
brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
//handle the scheduled service
try {
this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
}
//handle the transactional service
try {
this.shutdownProcessorByHa();
} catch (Throwable t) {
log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
}
//handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SLAVE);
try {
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {
}
log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}
public void changeToMaster(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
return;
}
log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());
//handle the slave synchronise
handleSlaveSynchronize(role);
//handle the scheduled service
try {
this.messageStore.handleScheduleMessageService(role);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
}
//handle the transactional service
try {
this.startProcessorByHa(BrokerRole.SYNC_MASTER);
} catch (Throwable t) {
log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
}
//if the operations above are totally successful, we change to master
brokerConfig.setBrokerId(0); //TO DO check
messageStoreConfig.setBrokerRole(role);
try {
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {
}
log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
}
private void startProcessorByHa(BrokerRole role) {
if (BrokerRole.SLAVE != role) {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.start();
}
}
}
private void shutdownProcessorByHa() {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(true);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.dledger;
import io.openmessaging.storage.dledger.DLedgerLeaderElector;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_"));
private BrokerController brokerController;
private DefaultMessageStore messageStore;
private DLedgerCommitLog dLedgerCommitLog;
private DLedgerServer dLegerServer;
public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) {
this.brokerController = brokerController;
this.messageStore = messageStore;
this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
this.dLegerServer = dLedgerCommitLog.getdLedgerServer();
}
@Override public void handle(long term, MemberState.Role role) {
Runnable runnable = new Runnable() {
@Override public void run() {
long start = System.currentTimeMillis();
try {
boolean succ = true;
log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
switch (role) {
case CANDIDATE:
if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
brokerController.changeToSlave(dLedgerCommitLog.getId());
}
break;
case FOLLOWER:
brokerController.changeToSlave(dLedgerCommitLog.getId());
break;
case LEADER:
while (true) {
if (!dLegerServer.getMemberState().isLeader()) {
succ = false;
break;
}
if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
break;
}
if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
&& messageStore.dispatchBehindBytes() == 0) {
break;
}
Thread.sleep(100);
}
if (succ) {
messageStore.recoverTopicQueueTable();
brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
}
break;
default:
break;
}
log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));
} catch (Throwable t) {
log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);
}
}
};
executorService.submit(runnable);
}
@Override public void startup() {
}
@Override public void shutdown() {
executorService.shutdown();
}
}
......@@ -152,7 +152,7 @@ public class BrokerOuterAPI {
registerBrokerResultList.add(result);
}
log.info("register broker to name server {} OK", namesrvAddr);
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
......
......@@ -23,7 +23,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
......@@ -36,6 +35,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.filter.FilterFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......
......@@ -54,7 +54,7 @@ public class SlaveSynchronize {
private void syncTopicConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
TopicConfigSerializeWrapper topicWrapper =
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
......@@ -78,7 +78,7 @@ public class SlaveSynchronize {
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
......@@ -94,7 +94,7 @@ public class SlaveSynchronize {
private void syncDelayOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
String delayOffset =
this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
......@@ -118,7 +118,7 @@ public class SlaveSynchronize {
private void syncSubscriptionGroupConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
SubscriptionGroupWrapper subscriptionWrapper =
this.brokerController.getBrokerOuterAPI()
......
......@@ -22,36 +22,15 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
public class TransactionalMessageCheckService extends ServiceThread {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private BrokerController brokerController;
private final AtomicBoolean started = new AtomicBoolean(false);
public TransactionalMessageCheckService(BrokerController brokerController) {
this.brokerController = brokerController;
}
@Override
public void start() {
if (started.compareAndSet(false, true)) {
super.start();
this.brokerController.getTransactionalMessageService().open();
}
}
@Override
public void shutdown(boolean interrupt) {
if (started.compareAndSet(true, false)) {
super.shutdown(interrupt);
this.brokerController.getTransactionalMessageService().close();
this.brokerController.getTransactionalMessageCheckListener().shutDown();
}
}
@Override
public String getServiceName() {
return TransactionalMessageCheckService.class.getSimpleName();
......
......@@ -548,10 +548,10 @@ public class MQClientInstance {
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr);
id, addr, e);
}
}
}
......@@ -1046,20 +1046,8 @@ public class MQClientInstance {
if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {
return this.brokerVersionTable.get(brokerName).get(brokerAddr);
}
} else {
HeartbeatData heartbeatData = prepareHeartbeatData();
try {
int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData, 3000);
return version;
} catch (Exception e) {
if (this.isBrokerInNameServer(brokerAddr)) {
log.info("send heart beat to broker[{} {}] failed", brokerName, brokerAddr);
} else {
log.info("send heart beat to broker[{} {}] exception, because the broker not up, forget it", brokerName,
brokerAddr);
}
}
}
//To do need to fresh the version
return 0;
}
......
......@@ -36,6 +36,7 @@ import org.apache.rocketmq.common.stats.StatsItemSet;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
......@@ -140,6 +141,7 @@ public class ConsumeMessageConcurrentlyServiceTest {
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
}
@Ignore
@Test
public void testPullMessage_ConsumeSuccess() throws InterruptedException, RemotingException, MQBrokerException, NoSuchFieldException,Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
......
......@@ -27,18 +27,29 @@ public abstract class ServiceThread implements Runnable {
private static final long JOIN_TIME = 90 * 1000;
protected final Thread thread;
private Thread thread;
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
protected volatile boolean stopped = false;
protected boolean isDaemon = false;
//Make it able to restart the thread
private final AtomicBoolean started = new AtomicBoolean(false);
public ServiceThread() {
this.thread = new Thread(this, this.getServiceName());
}
public abstract String getServiceName();
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
......@@ -47,6 +58,10 @@ public abstract class ServiceThread implements Runnable {
}
public void shutdown(final boolean interrupt) {
log.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(true, false)) {
return;
}
this.stopped = true;
log.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
......@@ -75,11 +90,16 @@ public abstract class ServiceThread implements Runnable {
return JOIN_TIME;
}
@Deprecated
public void stop() {
this.stop(false);
}
@Deprecated
public void stop(final boolean interrupt) {
if (!started.get()) {
return;
}
this.stopped = true;
log.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
......@@ -93,6 +113,9 @@ public abstract class ServiceThread implements Runnable {
}
public void makeStop() {
if (!started.get()) {
return;
}
this.stopped = true;
log.info("makestop thread " + this.getServiceName());
}
......@@ -128,4 +151,12 @@ public abstract class ServiceThread implements Runnable {
public boolean isStopped() {
return stopped;
}
public boolean isDaemon() {
return isDaemon;
}
public void setDaemon(boolean daemon) {
isDaemon = daemon;
}
}
......@@ -41,6 +41,7 @@ public class MessageDecoder {
public final static int MESSAGE_MAGIC_CODE = -626843481;
public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2;
public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCODE
+ 4 // 3 BODYCRC
......
......@@ -36,23 +36,19 @@ public class StatsItemSetTest {
@Test
public void test_getAndCreateStatsItem_multiThread() throws InterruptedException {
for (int i = 0; i < 50; i++) {
assertEquals(20000L, test_unit().longValue());
}
assertEquals(20L, test_unit().longValue());
}
@Test
public void test_getAndCreateMomentStatsItem_multiThread() throws InterruptedException {
for (int i = 0; i < 50; i++) {
assertEquals(10, test_unit_moment().longValue());
}
assertEquals(10, test_unit_moment().longValue());
}
private AtomicLong test_unit() throws InterruptedException {
final StatsItemSet statsItemSet = new StatsItemSet("topicTest", scheduler, null);
executor = new ThreadPoolExecutor(100, 200, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10000), new ThreadFactoryImpl("testMultiThread"));
for (int i = 0; i < 10000; i++) {
executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100), new ThreadFactoryImpl("testMultiThread"));
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
......@@ -61,7 +57,7 @@ public class StatsItemSetTest {
});
}
while (true) {
if (executor.getCompletedTaskCount() == 10000) {
if (executor.getCompletedTaskCount() == 10) {
break;
}
Thread.sleep(1000);
......@@ -71,9 +67,9 @@ public class StatsItemSetTest {
private AtomicLong test_unit_moment() throws InterruptedException {
final MomentStatsItemSet statsItemSet = new MomentStatsItemSet("topicTest", scheduler, null);
executor = new ThreadPoolExecutor(100, 200, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10000), new ThreadFactoryImpl("testMultiThread"));
for (int i = 0; i < 10000; i++) {
executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100), new ThreadFactoryImpl("testMultiThread"));
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
......@@ -82,7 +78,7 @@ public class StatsItemSetTest {
});
}
while (true) {
if (executor.getCompletedTaskCount() == 10000) {
if (executor.getCompletedTaskCount() == 10) {
break;
}
Thread.sleep(1000);
......
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## Revise the base dir
CURRENT_DIR="$(cd "$(dirname "$0")"; pwd)"
RMQ_DIR=$CURRENT_DIR/../..
cd $RMQ_DIR
function startNameserver() {
export JAVA_OPT_EXT=" -Xms512m -Xmx512m "
nohup bin/mqnamesrv &
}
function startBroker() {
export JAVA_OPT_EXT=" -Xms1g -Xmx1g "
conf_name=$1
nohup bin/mqbroker -c $conf_name &
}
function stopNameserver() {
PIDS=$(ps -ef|grep java|grep NamesrvStartup|grep -v grep|awk '{print $2}')
if [ ! -z "$PIDS" ]; then
kill -s TERM $PIDS
fi
}
function stopBroker() {
conf_name=$1
PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}')
i=1
while [ ! -z "$PIDS" -a $i -lt 5 ]
do
echo "Waiting to kill ..."
kill -s TERM $PIDS
((i=$i+1))
sleep 2
PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}')
done
PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}')
if [ ! -z "$PIDS" ]; then
kill -9 $PIDS
fi
}
function stopAll() {
ps -ef|grep java|grep BrokerStartup|grep -v grep|awk '{print $2}'|xargs kill
stopNameserver
stopBroker ./conf/dledger/broker-n0.conf
stopBroker ./conf/dledger/broker-n1.conf
stopBroker ./conf/dledger/broker-n2.conf
}
function startAll() {
startNameserver
startBroker ./conf/dledger/broker-n0.conf
startBroker ./conf/dledger/broker-n1.conf
startBroker ./conf/dledger/broker-n2.conf
}
function checkConf() {
if [ ! -f ./conf/dledger/broker-n0.conf -o ! -f ./conf/dledger/broker-n1.conf -o ! -f ./conf/dledger/broker-n2.conf ]; then
echo "Make sure the ./conf/dledger/broker-n0.conf, ./conf/dledger/broker-n1.conf, ./conf/dledger/broker-n2.conf exists"
exit -1
fi
}
## Main
if [ $# -lt 1 ]; then
echo "Usage: sh $0 start|stop"
exit -1
fi
action=$1
checkConf
case $action in
"start")
startAll
exit
;;
"stop")
stopAll
;;
*)
echo "Usage: sh $0 start|stop"
;;
esac
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30921
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node01
storePathCommitLog=/tmp/rmqstore/node01/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30931
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node02
storePathCommitLog=/tmp/rmqstore/node02/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16
......@@ -40,7 +40,7 @@
<fileSet>
<includes>
<include>bin/*</include>
<include>bin/**</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
......
# 基本概念
## 消息模型(Message Model)
<img src="./images/message_model_1.jpg" style="width: 560px"/>
RocketMQ 消息模型如图1所示,主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
## 消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
## 消息消费者(Consumer)
......
## 前言
该文档主要介绍如何部署自动容灾切换的 RocketMQ-on-DLedger Group。
RocketMQ-on-DLedger Group 是指一组**相同名称**的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。
RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致。
RocketMQ-on-DLedger Group 是可以水平扩展的,也即可以部署任意多个 RocketMQ-on-DLedger Group 同时对外提供服务。
## 1. 新集群部署
#### 1.1 编写配置
每个 RocketMQ-on-DLedger Group 至少准备三台机器(本文假设为 3)。
编写 3 个配置文件,建议参考 conf/dledger 目录下的配置文件样例。
关键配置介绍:
| name | 含义 | 举例 |
| --- | --- | --- |
| enableDLegerCommitLog | 是否启动 DLedger  | true |
| dLegerGroup | DLedger Raft Group的名字,建议和 brokerName 保持一致 | RaftNode00 |
| dLegerPeers | DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致 | n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 |
| dLegerSelfId | 节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 | n0 |
| sendMessageThreadPoolNums | 发送线程个数,建议配置成 Cpu 核数 | 16 |
这里贴出 conf/dledger/broker-n0.conf 的配置举例。
```
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
```
### 1.2 启动 Broker
与老版本的启动方式一致。
`nohup sh bin/mqbroker -c conf/dledger/xxx-n0.conf & `
`nohup sh bin/mqbroker -c conf/dledger/xxx-n1.conf & `
`nohup sh bin/mqbroker -c conf/dledger/xxx-n2.conf & `
## 2. 旧集群升级
如果旧集群采用 Master 方式部署,则每个 Master 都需要转换成一个 RocketMQ-on-DLedger Group。
如果旧集群采用 Master-Slave 方式部署,则每个 Master-Slave 组都需要转换成一个 RocketMQ-on-DLedger Group。
### 2.1 杀掉旧的 Broker
可以通过 kill 命令来完成,也可以调用 `bin/mqshutdown broker`
### 2.2 检查旧的 Commitlog
RocketMQ-on-DLedger 组中的每个节点,可以兼容旧的 Commitlog ,但其 Raft 复制过程,只能针对新增加的消息。因此,为了避免出现异常,需要保证 旧的 Commitlog 是一致的。
如果旧的集群是采用 Master-Slave 方式部署,有可能在shutdown时,其数据并不是一致的,建议通过md5sum 的方式,检查最近的最少 2 个 Commmitlog 文件,如果发现不一致,则通过拷贝的方式进行对齐。
虽然 RocketMQ-on-DLedger Group 也可以以 2 节点方式部署,但其会丧失容灾切换能力(2n + 1 原则,至少需要3个节点才能容忍其中 1 个宕机)。
所以在对齐了 Master 和 Slave 的 Commitlog 之后,还需要准备第 3 台机器,并把旧的 Commitlog 从 Master 拷贝到 第 3 台机器(记得同时拷贝一下 config 文件夹)。
在 3 台机器准备好了之后,旧 Commitlog 文件也保证一致之后,就可以开始走下一步修改配置了。
### 2.3 修改配置
参考新集群部署。
### 2.4 重新启动 Broker
参考新集群部署。
### 前言
该文档主要介绍如何快速构建和部署基于 DLedger 的可以自动容灾切换的 RocketMQ 集群。
详细的新集群部署和旧集群升级指南请参考 [部署指南](deploy_guide.md)
### 1. 源码构建
构建分为两个部分,需要先构建 DLedger,然后 构建 RocketMQ
#### 1.1 构建 DLedger
`git clone https://github.com/openmessaging/openmessaging-storage-dledger.git`
`cd openmessaging-storage-dledger`
`mvn clean install -DskipTests`
#### 1.2 构建 RocketMQ
`git clone https://github.com/apache/rocketmq.git`
`cd rocketmq`
`git checkout -b store_with_dledger origin/store_with_dledger`
`mvn -Prelease-all -DskipTests clean install -U`
### 2. 快速部署
在构建成功后
`cd distribution/target/apache-rocketmq`
`sh bin/dledger/fast-try.sh start`
如果上面的步骤执行成功,可以通过 mqadmin 运维命令查看集群状态。
`sh bin/mqadmin clusterList -n 127.0.0.1:9876`
顺利的话,会看到如下内容:
![ClusterList](https://img.alicdn.com/5476e8b07b923/TB11Z.ZyCzqK1RjSZFLXXcn2XXa)
(BID 为 0 的表示 Master,其余都是 Follower)
启动成功,现在可以向集群收发消息,并进行容灾切换测试了。
关闭快速集群,可以执行:
`sh bin/dledger/fast-try.sh stop`
快速部署,默认配置在 conf/dledger 里面,默认的存储路径在 /tmp/rmqstore。
### 3. 容灾切换
部署成功,杀掉 Leader 之后(在上面的例子中,杀掉端口 30931 所在的进程),等待约 10s 左右,用 clusterList 命令查看集群,就会发现 Leader 切换到另一个节点了。
......@@ -262,7 +262,7 @@ Before introducing the mqadmin management tool, the following points need to be
height:244.0pt;border-top:none;width:133pt'><span
style='mso-spacerun:yes'> </span>clusterList</td>
<td rowspan=4 class=xl70 width=175 style='border-bottom:1.0pt;
border-top:none;width:131pt'>View cluster information, cluster, brokerName, brokerId, TPS, and so on.</td>
border-top:none;width:131pt'>View cluster information, cluster, brokerName, brokerId, TPS, and so on</td>
<td class=xl65 width=177 style='width:133pt'>-m</td>
<td class=xl66 width=185 style='width:139pt'>Print more information (add print to # InTotalYest,
#OutTotalYest, #InTotalToday ,#OutTotalToday)</td>
......
......@@ -2,40 +2,40 @@
## 1 Message Model
The RocketMQ message model is mainly composed of Producer, Broker and Consumer. The Producer is responsible for producing messages, the Consumer is responsible for consuming messages, and the Broker is responsible for storing messages.
The Broker corresponds to one server during actual deployment, and each Broker can store messages from multiple topics, and messages from each Topic can be stored in a different Broker by sharding strategy.
The Message Queue is used to store physical offsets of messages, and the Message addresses in multiple Topic are stored in multiple Message queues. The Consumer Group consists of multiple Consumer instances.
RocketMQ message model is mainly composed of Producer, Broker and Consumer. The producer is responsible for producing messages and the consumer is for consuming messages, while the broker stores messages.
The broker is an independent server during actual deployment, and each broker can store messages from multiple topics. Even messages from the same topic can be stored in the different brokers by sharding strategy.
The message queue is used to store physical offsets of messages, and the message addresses are stored in seperate queues. The consumer group consists of multiple consumer instances.
## 2 Producer
The Producer is responsible for producing messages, typically by business systems. It sends messages generated by the business application systems to brokers. The RocketMQ provides multiple paradigms of sending: synchronous, asynchronous,sequential and one-way. Both synchronous and asynchronous methods require the Broker to return confirmation information, while one-way sending is not required.
The Producer is responsible for producing messages, typically by business systems. It sends messages generated by the systems to brokers. RocketMQ provides multiple paradigms of sending: synchronous, asynchronous, sequential and one-way. Both synchronous and asynchronous methods require the confirmation information return from the Broker, but one-way method does not require it.
## 3 Consumer
The Consumer is responsible for consuming messages, typically the background system is responsible for asynchronous consumption. The Consumer pulls messages from brokers and feeds them into application. In perspective of user application, two types of consumers are provided:Pull Consumer and Push Consumer.
The Consumer is responsible for consuming messages, typically the background system is responsible for asynchronous consumption. The consumer pulls messages from brokers and feeds them into application. From the perspective of user, two types of consumers are provided: pull consumer and push consumer.
## 4 Topic
The Topic refers to a collection of one kind message. Each topic contains several messages and one message can only belong to one topic. The Topic is the basic unit of RocketMQ for message subscription.
The Topic refers to a collection of one kind of message. Each topic contains several messages and one message can only belong to one topic. The topic is the basic unit of RocketMQ for message subscription.
## 5 Broker Server
As the role of the transfer station, the Broker Server stores, and forwards messages. In the RocketMQ system, the Broker Server is responsible for receiving messages sent from producers, storing them and preparing to handle pull requests. It also stores message related meta data, including consumer groups, consuming progress, topics and queues info.
As the role of the transfer station, the Broker Server stores and forwards messages. In RocketMQ, the broker server is responsible for receiving messages sent from producers, storing them and preparing to handle pull requests. It also stores the related message meta data, including consumer groups, consuming progress, topics, queues info and so on.
## 6 Name Server
The Name Server serves as the provider of routing service. The Producer or Consumer can find the list of Broker IP addresses corresponding each topic through name server. Multiple Name Servers can be deployed as a cluster, but are independent of each other and do not exchange information.
The Name Server serves as the provider of routing service. The producer or the consumer can find the list of broker IP addresses for each topic through name server. Multiple name servers can be deployed in one cluster, but they are independent of each other and do not exchange information.
## 7 Pull Consumer
A type of Consumer. Applications are usually pulls messages from brokers by actively calling the Consumer's pull message method, and the application has the advantages of controlling the timing and frequency of pulling messages. Once batches of messages are pulled, user application initiates consuming process.
A type of Consumer, the application pulls messages from brokers by actively invoking the consumer pull message method, and the application has the advantages of controlling the timing and frequency of pulling messages. Once the batch of messages is pulled, user application will initiate consuming process.
## 8 Push Consumer
A type of Consumer. Under this mode, after the Broker receives the data, it will actively push it to the consumer, which is generally of high real-time performance.
A type of Consumer. Under this high real-time performance mode, it will push the message to the consumer actively when the Broker receives the data.
## 9 Producer Group
A collection of the same type of Producer, which sends the same type of messages with consistent logic. If a transaction message is sent and the original producer crashes after sending, the Broker Server will contacts other Producer in the same Producer group to commit or rollback the transactional message.
A collection of the same type of Producer, which sends the same type of messages with consistent logic. If a transaction message is sent and the original producer crashes after sending, the broker server will contact other producers in the same producer group to commit or rollback the transactional message.
## 10 Consumer Group
A collection of the same type of Consumer, which sends the same type of messages with consistent logic. The Consumer Group makes load-balance and fault-tolerance super easy in terms of message consuming.
Warning: consumer instances of one consumer group must have exactly the same topic subscription(s).
A collection of the same type of Consumer, which sends the same type of messages with consistent logic. The consumer group makes load-balance and fault-tolerance super easy in terms of message consuming.
Warning: consumer instances of one consumer group must have exactly the same topic subscription(s).
RocketMQ supports two type of consumption mode:Clustering and Broadcasting.
RocketMQ supports two types of consumption mode:Clustering and Broadcasting.
## 11 Consumption Mode - Clustering
Under the Clustering mode, all messages from one topic will be delivered to all consumer instances averagely as much as possible. That is, one message can be consumed by only one consumer instance.
Under the Clustering mode, all the messages from one topic will be delivered to all the consumers instances averagely as much as possible. That is, one message can be consumed by only one consumer instance.
## 12 Consumption Mode - Broadcasting
Under the Broadcasting mode, each Consumer instance of the same Consumer Group receives every message that published to the corresponding topic.
Under the Broadcasting mode, each consumer instance of the same consumer group receives every message published to the corresponding topic.
## 13 Normal Ordered Message
Under the Normal Ordered Message mode, the messages received by consumers from the same ConsumeQueue are sequential, while the messages received from different message queues may be non-sequential.
Under the Normal Ordered Message mode, the messages received by consumers from the same ConsumeQueue are sequential, but the messages received from the different message queues may be non-sequential.
## 14 Strictly Ordered Message
Under the Strictly Ordered Message mode, all messages received by the consumers from the same topic are sequential, as the order they were stored.
Under the Strictly Ordered Message mode, all messages received by the consumers from the same topic are sequential as the order they are stored.
## 15 Message
The physical carrier of information transmitted by a messaging system, the smallest unit of production and consumption data, each message must belong to a topic.
Each Message in RocketMQ has a unique Message ID and can carry a key, generally used to store business-related value. The system provides the function to query messages by Message ID and Key.
The physical carrier of information transmitted by a messaging system, the smallest unit of production and consumption data, each message must belong to one topic.
Each Message in RocketMQ has a unique message id and can carry a key used to store business-related value. The system has the function to query messages by its id or key.
## 16 Tag
Flags set for messages to distinguish different types of messages under the same topic, functioning as a "sub-topic". Messages from the same business unit can set different tags under the same topic in terms of different business purposes. The Tag can effectively maintain the clarity and consistency of the code and optimize the query system provided by RocketMQ. The Consumer can realize different "sub-topic" by using Tag, so as to achieve better expansibility.
Flags set for messages to distinguish different types of messages under the same topic, functioning as a "sub-topic". Messages from the same business unit can set different tags under the same topic in terms of different business purposes. The tag can effectively maintain the clarity and consistency of the code and optimize the query system provided by RocketMQ. The consumer can realize different "sub-topic" by using tag in order to achieve better expansibility.
......@@ -2,7 +2,7 @@
Relative to RocketMQ's Broker cluster, producers and consumers are client. In this section, it mainly describes the common behavior configuration of producers and consumers.
### Client Addressing mode
### 1 Client Addressing mode
```RocketMQ``` can let client find the ```Name Server```, and then find the ```Broker```by the ```Name Server```. Followings show a variety of configurations, and priority level from highly to lower, the highly priority configurations can override the lower priority configurations.
......@@ -36,11 +36,11 @@ By default, the client accesses the HTTP server every 2 minutes, and update the
```
HTTP static server addressing is recommended, because it is simple client deployment, and the Name Server cluster can be upgraded hot.
### Client Configuration
### 2 Client Configuration
```DefaultMQProducer```,```TransactionMQProducer```,```DefaultMQPushConsumer```,```DefaultMQPullConsumer``` all extends the ```ClientConfig``` Class, ```ClientConfig``` as the client common configuration class. Client configuration style like getXXX,setXXX, each of the parameters can config by spring and also config their in the code. Such as the ```namesrvAddr``` parameter: ```producer.setNamesrvAddr("192.168.0.1:9876")```, same with the other parameters.
#### Client Common Configuration
#### 2.1 Client Common Configuration
| Pamater Name | Default Value | Description |
| ----------------------------- | ------- | ------------------------------------------------------------ |
......@@ -52,7 +52,7 @@ HTTP static server addressing is recommended, because it is simple client deploy
| heartbeatBrokerInterval | 30000 | The heartbeat interval, in milliseconds, is sent to the Broker |
| persistConsumerOffsetInterval | 5000 | The persistent Consumer consumes the progress interval in milliseconds |
#### Producer Configuration
#### 2.2 Producer Configuration
| Pamater Name | Default Value | Description |
| -------------------------------- | ---------------- | ------------------------------------------------------------ |
......@@ -70,7 +70,7 @@ HTTP static server addressing is recommended, because it is simple client deploy
| checkRequestHoldMax | 2000 | Producer local buffer request queue size when Broker look back Producer transaction status |
| RPCHook | null | This parameter is passed in when the Producer is creating, including the pre-processing before the message sending and the processing after the message response. The user can do some security control or other operations in the first interface.|
#### PushConsumer Configuration
#### 2.3 PushConsumer Configuration
| Pamater Name | Default Value | Description |
| ---------------------------- | ----------------------------- | ------------------------------------------------------------ |
......@@ -91,7 +91,7 @@ HTTP static server addressing is recommended, because it is simple client deploy
| consumeMessageBatchMaxSize | 1 | Batch consume message |
| pullBatchSize | 32 | Batch pull message |
#### PullConsumer Configuration
#### 2.4 PullConsumer Configuration
| Pamater Name | Default Value | Description |
| -------------------------------- | ----------------------------- | ------------------------------------------------------------ |
......@@ -105,7 +105,7 @@ HTTP static server addressing is recommended, because it is simple client deploy
| registerTopics | | Collection of registered topics |
| allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Implements strategy about Rebalance algorithm |
#### Message Data Structure
#### 2.5 Message Data Structure
| Field Name | Default Value | Description |
| -------------- | ------ | ------------------------------------------------------------ |
......
# ** The system configuration** #
# The system configuration #
This section focuses on the configuration of the system (JVM/OS)
## **1 JVM Options** ##
The latest released version of JDK 1.8 is recommended. Set the same Xms and Xmx value to prevent the JVM from resizing the heap for better performance. A simple JVM configurations looks like this:
The latest released version of JDK 1.8 is recommended. Set the same Xms and Xmx value to prevent the JVM from resizing the heap for better performance. A simple JVM configuration is as follows:
-server -Xms8g -Xmx8g -Xmn4g
......@@ -22,7 +22,7 @@ As for garbage collection, G1 collector with JDK 1.8 is recommended:
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30
These GC options looks a little aggressive, but it’s proved to have good performance in our production environment
These GC options looks a little aggressive, but it’s proved to have good performance in our production environment.
Don’t set a too small value for -XX:MaxGCPauseMillis, otherwise JVM will use a small young generation to achieve this goal which will cause very frequent minor GC.So use rolling GC log file is recommended:
......@@ -30,13 +30,13 @@ Don’t set a too small value for -XX:MaxGCPauseMillis, otherwise JVM will use a
-XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=30m
If write GC file will increase latency of broker, consider redirect GC log file to a memory file system:
If writing a GC file increases the latency of broker, please consider redirecting it to memory file system:
-Xloggc:/dev/shm/mq_gc_%p.log123
## 2 Linux Kernel Parameters ##
There is a os.sh script that lists a lot of kernel parameters in folder bin which can be used for production use with minor changes. Below parameters need attention, and more details please refer to documentation for /proc/sys/vm/*.
There is an os.sh script that lists a lot of kernel parameters in folder bin which can be used for production enviroment with minor changes. Below parameters need attention, if need more details please refer to documentation for /proc/sys/vm/*.
......@@ -51,11 +51,11 @@ There is a os.sh script that lists a lot of kernel parameters in folder bin whic
- **vm.max_map_count**, limits the maximum number of memory map areas a process may have. RocketMQ will use mmap to load CommitLog and ConsumeQueue, so set a bigger value for this parameter is recommended.
- **vm.max_map_count**, limits the maximum number of memory map areas a process may have. RocketMQ will use mmap to load CommitLog and ConsumeQueue, so a bigger value is recommended.
- **vm.swappiness**, define how aggressive the kernel will swap memory pages. Higher values will increase agressiveness, lower values decrease the amount of swap. 10 is recommended for this value to avoid swap latency.
- **vm.swappiness**, defines how aggressive the kernel will swap memory pages. Higher values will increase agressiveness, lower values decrease the amount of swap. 10 is recommended for this value to avoid swap latency.
......@@ -64,4 +64,4 @@ There is a os.sh script that lists a lot of kernel parameters in folder bin whic
- **Disk scheduler**, the deadline I/O scheduler is recommended for RocketMQ, which attempts to provide a guaranteed latency for requests.
# Deployment Architectures and Setup Steps
## Cluster Setup
# Installation Guides
### 1 Single Master mode
This is the simplest, but also the riskiest, mode that makes the entire service unavailable once the broker restarts or goes down. Production environments are not recommended, but can be used for local testing and development. Here are the steps to build.
This is the simplest but also the riskiest mode, that makes the entire service unavailable once the broker restarts or goes down. Production environments are not recommended, but can be used for local testing and development. Here are the steps to build.
**1)Start NameServer**
......@@ -69,7 +67,7 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker
...
```
The boot command shown above is used in the case of a single NameServer.For clusters of multiple NameServer, the address list after the -n argument in the broker boot command is separated by semicolons, for example, 192.168.1.1: 9876;192.161.2: 9876.
The boot command shown above is used in the case of a single NameServer. For clusters of multiple NameServer, the address list after the -n argument in the broker boot command is separated by semicolons, for example, 192.168.1.1: 9876;192.161.2: 9876.
### 3 Multiple Master And Multiple Slave Mode-Asynchronous replication
......
## 2 Communication Mechanism
RocketMQ message queue cluster mainly includes four roles: NameServer, Broker (Master/Slave), Producer and Consumer. The basic communication process is as follows:
(1) After Broker start-up, it needs to complete one operation: register itself to NameServer, and then report Topic routing information to NameServer at regular intervals of 30 seconds.
(2) When message producer Producer sends a message as a client, it needs to obtain routing information from the local cache TopicPublishInfoTable according to the Topic of the message. If not, it will be retrieved from NameServer and update to local cache, at the same time, Producer will retrieve routing information from NameServer every 30 seconds by default.
(2) When message Producer sends a message as a client, it needs to obtain routing information from the local cache TopicPublishInfoTable according to the Topic of the message. If not, it will be retrieved from NameServer and update to local cache, at the same time, Producer will retrieve routing information from NameServer every 30 seconds by default.
(3) Message producer Producer chooses a queue to send the message according to the routing information obtained in 2); Broker receives the message and records it in disk as the receiver of the message.
(4) After message consumer Consumer get the routing information according to 2) and complete the load balancing of the client, then select one or several message queues to pull messages and consume them.
......
......@@ -3,7 +3,10 @@
![](images/rocketmq_storage_arch.png)
Message storage is the most complicated and important part of RocketMQ. This section will describe the three aspects of RocketMQ: message storage architecture, PageCache and memory mapping, and RocketMQ's two different disk flushing methods.
Message storage is the most complicated and important part of RocketMQ. This section will describe the three aspects of RocketMQ:
* Message storage architecture
* PageCache and memory mapping
* RocketMQ's two different disk flushing methods.
## 1 Message Storage Architecture
......@@ -18,7 +21,7 @@ The message storage architecture diagram consists of 3 files related to message
From the above architecture of the RocketMQ message storage, we can see RocketMQ uses a hybrid storage structure, that is, all the queues in an instance of the broker share a single log file `CommitLog` to store messages. RocketMQ's hybrid storage structure(messages of multiple topics are stored in one CommitLog) uses a separate storage structure for the data and index parts for Producer and Consumer respectively. The Producer sends the message to the Broker, then the Broker persists the message to the CommitLog file synchronously or asynchronously. As long as the message is persisted to the CommitLog on the disk, the message sent by the Producer will not be lost. Because of this, Consumer will definitely have the opportunity to consume this message. When no message can be pulled, the consumer can wait for the next pull. And the server also supports the long polling mode: if a pull request pulls no messages, the Broker can wait for 30 seconds, as long as new message arrives in this interval, it will be returned directly to the consumer. Here, RocketMQ's specific approach is using Broker's background service thread `ReputMessageService` to continuously dispatch requests and asynchronously build ConsumeQueue (Logical Queue) and IndexFile data.
## 2 PageCache and memory map
## 2 PageCache and Memory Map
PageCache is a cache of files by the operating system to speed up the reading and writing of files. In general, the speed of sequential read and write files is almost the same as the speed of read and write memory. The main reason is that the OS uses a portion of the memory as PageCache to optimize the performance of the read and write operations. For data writing, the OS will first write to the Cache, and then the `pdflush` kernel thread asynchronously flush the data in the Cache to the physical disk. For data reading, if it can not hit the page cache when reading a file at a time, the OS will read the file from the physical disk and prefetch the data files of other neighboring blocks sequentially.
......
# Schedule example
### 1.Start consumer to wait for incoming subscribed messages
### 1 Start consumer to wait for incoming subscribed messages
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
......@@ -35,7 +35,7 @@ public class ScheduledMessageConsumer {
}
```
### 2.Send scheduled messages
### 2 Send scheduled messages
```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
......@@ -64,15 +64,15 @@ public class ScheduledMessageProducer {
}
```
### 3.Verification
### 3 Verification
You should see messages are consumed about 10 seconds later than their storing time.
### 4.Use scenarios for scheduled messages
### 4 Use scenarios for scheduled messages
For example, in e-commerce, if an order is submitted, a delay message can be sent, and the status of the order can be checked after 1 hour. If the order is still unpaid, the order can be cancelled and the inventory released.
### 5.Restrictions on the use of scheduled messages
### 5 Restrictions on the use of scheduled messages
```java
// org/apache/rocketmq/store/config/MessageStoreConfig.java
......@@ -82,4 +82,4 @@ private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m
Nowadays RocketMq does not support any time delay. It needs to set several fixed delay levels, which correspond to level 1 to 18 from 1s to 2h. Message consumption failure will enter the delay message queue. Message sending time is related to the set delay level and the number of retries.
See `SendMessageProcessor.java`
\ No newline at end of file
See `SendMessageProcessor.java`
# Example for ordered messages
# Example for Ordered Messages
RocketMQ provides ordered messages using FIFO order. All related messages need to be sent into the same message queue in an orderly manner.
......@@ -168,7 +168,7 @@ public class Producer {
```
## 2.2 Consume ordered messages
## 2 Consume ordered messages
```java
......
# Transaction message example
# Transaction Message Example
## 1 Transaction message status
There are three states for transaction message:
......
......@@ -2,34 +2,39 @@
The following questions are frequently asked with regard to the RocketMQ project in general.
## General
## 1 General
### 1. Why did we create rocketmq project instead of selecting other products?
1. Why did we create rocketmq project instead of selecting other products?
Please refer to [Why RocketMQ](http://rocketmq.apache.org/docs/motivation)
Please refer to [Why RocketMQ](http://rocketmq.apache.org/docs/motivation)
### 2. Do I have to install other softeware, such as zookeeper, to use RocketMQ?
2. Do I have to install other softeware, such as zookeeper, to use RocketMQ?
No. RocketMQ can run independently.
No. RocketMQ can run independently.
## Usage
## 2 Usage
### 1. Where does the newly created Consumer ID start consuming messages?
1. If the topic sends a message within three days, then the consumer start consuming messages from the first message saved in the server.
2. If the topic sends a message three days ago, the consumer starts to consume messages from the latest message in the server, in other words, starting from the tail of message queue.
3. If such consumer is rebooted, then it starts to consume messages from the last consumption location.
&#8195;1) If the topic sends a message within three days, then the consumer start consuming messages from the first message saved in the server.
&#8195;2) If the topic sends a message three days ago, the consumer starts to consume messages from the latest message in the server, in other words, starting from the tail of message queue.
&#8195;3) If such consumer is rebooted, then it starts to consume messages from the last consumption location.
### 2. How to reconsume message when consumption fails?
1. Cluster consumption pattern, The consumer business logic code returns Action.ReconsumerLater, NULL, or throws an exception, if a message failed to be consumed, it will retry for up to 16 times, after that, the message would be descarded.
2. Broadcast consumption patternThe broadcaset consumption still ensures that a message is consumered at least once, but no resend option is provided.
&#8195;1) Cluster consumption pattern, The consumer business logic code returns Action.ReconsumerLater, NULL, or throws an exception, if a message failed to be consumed, it will retry for up to 16 times, after that, the message would be descarded.
&#8195;2) Broadcast consumption patternThe broadcaset consumption still ensures that a message is consumered at least once, but no resend option is provided.
### 3. How to query the failed message if there is a consumption failure?
1. Using topic query by time, you can query messages within a period of time.
2. Using Topic and Message Id to accurately query the message.
3. Using Topic and Message Key accurately query a class of messages with the same Message Key.
&#8195;1) Using topic query by time, you can query messages within a period of time.
&#8195;2) Using Topic and Message Id to accurately query the message.
&#8195;3) Using Topic and Message Key accurately query a class of messages with the same Message Key.
### 4. Are messages delivered exactly once?
......@@ -37,10 +42,11 @@ RocketMQ ensures that all messages are delivered at least once. In most cases, t
### 5. How to add a new broker?
1. Start up a new broker and register it to the same list of name servers.
2. By default, only internal system topics and consumer groups are created automatically. If you would like to have your business topic and consumer groups on the new node, please replicate them from the existing broker. Admin tool and command lines are provided to handle this.
&#8195;1) Start up a new broker and register it to the same list of name servers.
&#8195;2) By default, only internal system topics and consumer groups are created automatically. If you would like to have your business topic and consumer groups on the new node, please replicate them from the existing broker. Admin tool and command lines are provided to handle this.
## Configuration related
## 3 Configuration related
The following answers are all default values and can be modified by configuration.
......@@ -60,7 +66,7 @@ consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);
```
## Errors
## 4 Errors
### 1. If you start a producer or consumer failed and the error message is producer group or consumer repeat.
......@@ -76,15 +82,15 @@ Solution: Fastjson version has to be upgraded to rocketmq client dependent versi
### 3. What is the impact of a broker crash.
1. Master crashes
&#8195;1) Master crashes
Messages can no longer be sent to this broker set, but if you have another broker set available, messages can still be sent given the topic is present. Messages can still be consumed from slaves.
2. Some slave crash
&#8195;2) Some slave crash
As long as there is another working slave, there will be no impact on sending messages. There will also be no impact on consuming messages except when the consumer group is set to consume from this slave preferably. By default, comsumer group consumes from master.
3. All slaves crash
&#8195;3) All slaves crash
There will be no impact on sending messages to master, but, if the master is SYNC_MASTER, producer will get a SLAVE_NOT_AVAILABLE indicating that the message is not sent to any slaves. There will also be no impact on consuming messages except that if the consumer group is set to consume from slave preferably. By default, comsumer group consumes from master.
......@@ -92,9 +98,12 @@ There will be no impact on sending messages to master, but, if the master is SYN
This happens when you are trying to send messages to a topic whose routing info is not available to the producer.
1. Make sure that the producer can connect to a name server and is capable of fetching routing meta info from it.
2. Make sure that name servers do contain routing meta info of the topic. You may query the routing meta info from name server through topicRoute using admin tools or web console.
3. Make sure that your brokers are sending heartbeats to the same list of name servers your producer is connecting to.
4. Make sure that the topic’s permssion is 6(rw-), or at least 2(-w-).
&#8195;1) Make sure that the producer can connect to a name server and is capable of fetching routing meta info from it.
&#8195;2) Make sure that name servers do contain routing meta info of the topic. You may query the routing meta info from name server through topicRoute using admin tools or web console.
&#8195;3) Make sure that your brokers are sending heartbeats to the same list of name servers your producer is connecting to.
&#8195;4) Make sure that the topic’s permssion is 6(rw-), or at least 2(-w-).
If you can’t find this topic, create it on a broker via admin tools command updateTopic or web console.
# 3 Broker
## 3.1 Broker Role
Broker Role is ASYNC_MASTER, SYNC_MASTER or SLAVE. If you cannot tolerate message missing, we suggest you deploy SYNC_MASTER and attach a SLAVE to it. If you feel ok about missing, but you want the Broker to be always available, you may deploy ASYNC_MASTER with SLAVE. If you just want to make it easy, you may only need a ASYNC_MASTER without SLAVE.
## 3.2 FlushDiskType
ASYNC_FLUSH is recommended, for SYNC_FLUSH is expensive and will cause too much performance loss. If you want reliability, we recommend you use SYNC_MASTER with SLAVE.
## 3.3 Broker Configuration
| Parameter name | Default | Description |
| -------------------------------- | ----------------------------- | ------------------------------------------------------------ |
| listenPort | 10911 | listen port for client |
| namesrvAddr | null | name server address |
| brokerIP1 | InetAddress for network interface | Should be configured if having multiple addresses |
| brokerIP2 | InetAddress for network interface | If configured for the Master broker in the Master/Slave cluster, slave broker will connect to this port for data synchronization |
| brokerName | null | broker name |
| brokerClusterName | DefaultCluster | this broker belongs to which cluster |
| brokerId | 0 | broker id, 0 means master, positive integers mean slave |
| storePathCommitLog | $HOME/store/commitlog/ | file path for commit log |
| storePathConsumerQueue | $HOME/store/consumequeue/ | file path for consume queue |
| mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | mapped file size for commit log |​
| deleteWhen | 04 | When to delete the commitlog which is out of the reserve time |​
| fileReserverdTime | 72 | The number of hours to keep a commitlog before deleting it |​
| brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |​
| flushDiskType | ASYNC_FLUSH | {SYNC_FLUSH/ASYNC_FLUSH}. Broker of SYNC_FLUSH mode flushes each message onto disk before acknowledging producer. Broker of ASYNC_FLUSH mode, on the other hand, takes advantage of group-committing, achieving better performance. |​
\ No newline at end of file
......@@ -2,24 +2,24 @@
----
### 2.1 Consumption process idempotent
### 1 Consumption process idempotent
RocketMQ cannot avoid Exactly-Once, so if the business is very sensitive to consumption repetition, it is important to perform deduplication at the business level. Deduplication can be done with a relational database. First, you need to determine the unique key of the message, which can be either msgId or a unique identifier field in the message content, such as the order Id. Determine if a unique key exists in the relational database before consumption. If it does not exist, insert it and consume it, otherwise skip it. (The actual process should consider the atomic problem, determine whether there is an attempt to insert, if the primary key conflicts, the insertion fails, skip directly)
### 2.2 Slow message processing
### 2 Slow message processing
#### 2.2.1 Increase consumption parallelism
#### 2.1 Increase consumption parallelism
Most messages consumption behaviors are IO-intensive, That is, it may be to operate the database, or call RPC. The consumption speed of such consumption behavior lies in the throughput of the back-end database or the external system. By increasing the consumption parallelism, the total consumption throughput can be increased, but the degree of parallelism is increased to a certain extent. Instead it will fall.Therefore, the application must set a reasonable degree of parallelism. There are several ways to modify the degree of parallelism of consumption as follows:
* Under the same ConsumerGroup, increase the degree of parallelism by increasing the number of Consumer instances (note that the Consumer instance that exceeds the number of subscription queues is invalid). Can be done by adding machines, or by starting multiple processes on an existing machine.
* Improve the consumption parallel thread of a single Consumer by modifying the parameters consumeThreadMin and consumeThreadMax.
#### 2.2.2 Batch mode consumption
#### 2.2 Batch mode consumption
Some business processes can increase consumption throughput to a large extent if they support batch mode consumption. For example, order deduction application, it takes 1s to process one order at a time, and it takes only 2s to process 10 orders at a time. In this way, the throughput of consumption can be greatly improved. By setting the consumer's consumeMessageBatchMaxSize to return a parameter, the default is 1, that is, only one message is consumed at a time, for example, set to N, then the number of messages consumed each time is less than or equal to N.
#### 2.2.3 Skip non-critical messages
#### 2.3 Skip non-critical messages
When a message is accumulated, if the consumption speed cannot keep up with the transmission speed, if the service does not require high data, you can choose to discard the unimportant message. For example, when the number of messages in a queue is more than 100,000 , try to discard some or all of the messages, so that you can quickly catch up with the speed of sending messages. The sample code is as follows:
......@@ -40,7 +40,7 @@ public ConsumeConcurrentlyStatus consumeMessage(
}
```
#### 2.2.4 Optimize each message consumption process
#### 2.4 Optimize each message consumption process
For example, the consumption process of a message is as follows:
......@@ -52,7 +52,7 @@ For example, the consumption process of a message is as follows:
There are 4 interactions with the DB in the consumption process of this message. If it is calculated by 5ms each time, it takes a total of 20ms. If the business calculation takes 5ms, then the total time is 25ms, So if you can optimize 4 DB interactions to 2 times, the total time can be optimized to 15ms, which means the overall performance is increased by 40%. Therefore, if the application is sensitive to delay, the DB can be deployed on the SSD hard disk. Compared with the SCSI disk, the former RT will be much smaller.
### 2.3 Print Log
### 3 Print Log
If the amount of messages is small, it is recommended to print the message in the consumption entry method, consume time, etc., to facilitate subsequent troubleshooting.
......@@ -68,33 +68,33 @@ public ConsumeConcurrentlyStatus consumeMessage(
If you can print the time spent on each message, it will be more convenient when troubleshooting online problems such as slow consumption.
### 2.4 Other consumption suggestions
### 4 Other consumption suggestions
#### 2.4.1、Consumer Group and Subscriptions
#### 4.1、Consumer Group and Subscriptions
The first thing you should be aware of is that different Consumer Group can consume the same topic independently, and each of them will have their own consuming offsets. Please make sure each Consumer within the same Group to subscribe the same topics.
#### 2.4.2、Orderly
#### 4.2、Orderly
The Consumer will lock each MessageQueue to make sure it is consumed one by one in order. This will cause a performance loss, but it is useful when you care about the order of the messages. It is not recommended to throw exceptions, you can return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead.
#### 2.4.3、Concurrently
#### 4.3、Concurrently
As the name tells, the Consumer will consume the messages concurrently. It is recommended to use this for good performance. It is not recommended to throw exceptions, you can return ConsumeConcurrentlyStatus.RECONSUME_LATER instead.
#### 2.4.4、Consume Status
#### 4.4、Consume Status
For MessageListenerConcurrently, you can return RECONSUME_LATER to tell the consumer that you can not consume it right now and want to reconsume it later. Then you can continue to consume other messages. For MessageListenerOrderly, because you care about the order, you can not jump over the message, but you can return SUSPEND_CURRENT_QUEUE_A_MOMENT to tell the consumer to wait for a moment.
#### 2.4.5、Blocking
#### 4.5、Blocking
It is not recommend to block the Listener, because it will block the thread pool, and eventually may stop the consuming process.
#### 2.4.6、Thread Number
#### 4.6、Thread Number
The consumer use a ThreadPoolExecutor to process consuming internally, so you can change it by setting setConsumeThreadMin or setConsumeThreadMax.
#### 2.4.7、ConsumeFromWhere
#### 4.7、ConsumeFromWhere
When a new Consumer Group is established, it will need to decide whether it needs to consume the historical messages which had already existed in the Broker. CONSUME_FROM_LAST_OFFSET will ignore the historical messages, and consume anything produced after that. CONSUME_FROM_FIRST_OFFSET will consume every message existed in the Broker. You can also use CONSUME_FROM_TIMESTAMP to consume messages produced after the specified timestamp.
......
......@@ -2,7 +2,7 @@
----
##### 1 Message Sending Tips
###### 1.1 The Use of Tags
One application instance should use one topic as much as possible and the subtype of messages can be marked by tags. Tag provides extra flexibility to users. In the consume subscribing process, the messages filtering can only be handled by using tags when the tags are specified in the message sending process: `message.setTags("TagA")`
One application instance should use one topic as much as possible and the subtype of messages can be marked by tags. Tag provides extra flexibility to users. In the consume subscribing process, the messages filtering can only be handled by using tags when the tags are specified in the message sending process: `message.setTags("TagA")`.
###### 1.2 The Use of Keys
A business key can be set in one message and it will be easier to look up the message on a broker server to diagnose issues during development. Each message will be created index(hash index) by server, instance can query the content of this message by topic and key and who consumes the message.Because of the hash index, make sure that the key should be unique in order to avoid potential hash index conflict.
``` java
......@@ -41,4 +41,4 @@ The message sending is usually a process like below:
* Sever handles request
* Sever returns response to client
The total costing time of sending one message is the sum of costing time of three steps above. Some situations demand that total costing time must be in a quite low level, however, do not take reliable performance into consideration, such as log collection. This kind of application could be called in one-way mode, which means client sends request but not wait for response. In this kind of mode, the cost of sending request is only a call of system operation which means one operation writing data to client socket buffer. Generally, the time cost of this process will be controlled n microseconds level.
\ No newline at end of file
The total costing time of sending one message is the sum of costing time of three steps above. Some situations demand that total costing time must be in a quite low level, however, do not take reliable performance into consideration, such as log collection. This kind of application could be called in one-way mode, which means client sends request but not wait for response. In this kind of mode, the cost of sending request is only a call of system operation which means one operation writing data to client socket buffer. Generally, the time cost of this process will be controlled n microseconds level.
# Message Trace
## 1. Key Attributes of Message Trace Data
## 1 Key Attributes of Message Trace Data
| Producer | Consumer | Broker |
| ---------------- | ----------------- | ------------ |
......@@ -11,7 +9,7 @@
| whether the message was sent successfully | Whether the message was successfully consumed | The Key of the message |
| Time spent sending | Time spent consuming | Tag of the message |
## 2. Support for Message Trace Cluster Deployment
## 2 Support for Message Trace Cluster Deployment
### 2.1 Broker Configuration Fille
......@@ -44,7 +42,7 @@ For scenarios with large amount of trace message data , one of the Broker nodes
### 2.4 Start the Broker that Starts the MessageTrace
`nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &`
## 3. Save the Topic Definition of Message Trace
## 3 Save the Topic Definition of Message Trace
RocketMQ's message trace feature supports two ways to store trace data:
### 3.1 System-level TraceTopic
......@@ -53,7 +51,7 @@ By default, message track data is stored in the system-level TraceTopic(names:
### 3.2 Custom TraceTopic
If the user is not prepared to store the message track data in the system-level default TraceTopic, you can also define and create a user-level Topic to save the track (that is, to create a regular Topic to save the message track data)。The following section introduces how the Client interface supports the user-defined TraceTopic.
## 4. Client Practices that Support Message Trace
## 4 Client Practices that Support Message Trace
In order to reduce as much as possible the transformation work of RocketMQ message trace feature used in the user service system, the author added a switch parameter (**enableMsgTrace**) to the original interface in the design to realize whether the message trace is opened or not.
### 4.1 Opening the Message Trace when Sending the Message
......
# Operation FAQ
## 1. RocketMQ's mqadmin command error.
## 1 RocketMQ's mqadmin command error.
> Problem: Sometimes after deploying the RocketMQ cluster, when you try to execute some commands of "mqadmin", the following exception will appear:
>
......@@ -10,7 +10,7 @@
Solution: Execute `export NAMESRV_ADDR=ip:9876` (ip refers to the address of NameServer deployed in the cluster) on the VM that deploys the RocketMQ cluster.Then you will execute commands of "mqadmin" successfully.
## 2. The inconsistent version of RocketMQ between the producer and consumer leads to the problem that message can't be consumed normally.
## 2 The inconsistent version of RocketMQ between the producer and consumer leads to the problem that message can't be consumed normally.
> Problem: The same producer sends a message, consumer A can consume, but consumer B can't consume, and the RocketMQ Console appears:
>
......@@ -20,7 +20,7 @@ Solution: Execute `export NAMESRV_ADDR=ip:9876` (ip refers to the address of Na
Solution: The jar package of RocketMQ, such as rocketmq-client, should be the same version on the consumer and producer.
## 3. When adding a new topic consumer group, historical messages can't be consumed.
## 3 When adding a new topic consumer group, historical messages can't be consumed.
> Problem: When a new consumer group of the same topic is started, the consumed message is the current offset message, and the historical message is not obtained.
......@@ -50,11 +50,11 @@ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
```
## 4. How to enable reading data from Slave
## 4 How to enable reading data from Slave
In some cases, the Consumer needs to reset the consume position to 1-2 days ago. At this time, on the Master Broker with limited memory, the CommitLog will carry a relatively heavy IO pressure, affecting the reading and writing of other messages on that Broker. You can enable `slaveReadEnable=true`. When Master Broker finds that the difference between the Consumer's consume position and the latest value of CommitLog exceeds the percentage of machine's memory (`accessMessageInMemoryMaxRatio=40%`), it will recommend Consumer to read from Slave Broker and relieve Master Broker's IO.
## 5. Performance
## 5 Performance
Asynchronous flush disk is recommended to use spin lock.
......@@ -64,7 +64,7 @@ Asynchronous flush disk is recommended to open `TransientStorePoolEnable` and cl
Synchronous flush disk is recommended to increase the `sendMessageThreadPoolNums` appropriately. The specific configuration needs to be tested.
## 6. The meaning and difference between msgId and offsetMsgId in RocketMQ
## 6 The meaning and difference between msgId and offsetMsgId in RocketMQ
After sending message with RocketMQ, you will usually see the following log:
......
......@@ -128,6 +128,17 @@ public class RouteInfoManager {
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
......
......@@ -100,8 +100,8 @@
<maven.test.skip>false</maven.test.skip>
<maven.javadoc.skip>true</maven.javadoc.skip>
<!-- Compiler settings properties -->
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<!-- Exclude all generated code -->
<sonar.jacoco.itReportPath>${project.basedir}/../test/target/jacoco-it.exec</sonar.jacoco.itReportPath>
......@@ -259,6 +259,7 @@
<exclude>*/target/**</exclude>
<exclude>*/*.iml</exclude>
<exclude>docs/**</exclude>
<exclude>localbin/**</exclude>
</excludes>
</configuration>
</plugin>
......
......@@ -28,6 +28,21 @@
<name>rocketmq-store ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.openmessaging.storage</groupId>
<artifactId>dledger</artifactId>
<version>0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
......
......@@ -120,16 +120,9 @@ public class AllocateMappedFileService extends ServiceThread {
return AllocateMappedFileService.class.getSimpleName();
}
@Override
public void shutdown() {
this.stopped = true;
this.thread.interrupt();
try {
this.thread.join(this.getJointime());
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
super.shutdown(true);
for (AllocateRequest req : this.requestTable.values()) {
if (req.mappedFile != null) {
log.info("delete pre allocated maped file, {}", req.mappedFile.getFileName());
......
......@@ -45,11 +45,11 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
public class CommitLog {
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = -626843481;
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194
private final static int BLANK_MAGIC_CODE = -875286124;
private final MappedFileQueue mappedFileQueue;
private final DefaultMessageStore defaultMessageStore;
protected final static int BLANK_MAGIC_CODE = -875286124;
protected final MappedFileQueue mappedFileQueue;
protected final DefaultMessageStore defaultMessageStore;
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
......@@ -57,11 +57,11 @@ public class CommitLog {
private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
private volatile long confirmOffset = -1L;
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
protected volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0;
private final PutMessageLock putMessageLock;
protected final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
......@@ -212,6 +212,12 @@ public class CommitLog {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
......@@ -366,7 +372,7 @@ public class CommitLog {
return new DispatchRequest(-1, false /* success */);
}
private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
final int msgLen = 4 //TOTALSIZE
+ 4 //MAGICCODE
+ 4 //BODYCRC
......@@ -396,6 +402,7 @@ public class CommitLog {
this.confirmOffset = phyOffset;
}
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
......@@ -456,7 +463,7 @@ public class CommitLog {
}
}
} else {
log.info("recover physics file end, " + mappedFile.getFileName());
log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
break;
}
}
......@@ -474,6 +481,7 @@ public class CommitLog {
}
// Commitlog case files are deleted
else {
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
......
......@@ -106,7 +106,7 @@ public class ConsumeQueue {
if (offset >= 0 && size > 0) {
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
this.maxPhysicOffset = offset;
this.maxPhysicOffset = offset + size;
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
......@@ -224,7 +224,7 @@ public class ConsumeQueue {
int logicFileSize = this.mappedFileSize;
this.maxPhysicOffset = phyOffet - 1;
this.maxPhysicOffset = phyOffet;
long maxExtAddr = 1;
while (true) {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
......@@ -249,7 +249,7 @@ public class ConsumeQueue {
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
this.maxPhysicOffset = offset;
this.maxPhysicOffset = offset + size;
// This maybe not take effect, when not every consume queue has extend file.
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
......@@ -267,7 +267,7 @@ public class ConsumeQueue {
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
this.maxPhysicOffset = offset;
this.maxPhysicOffset = offset + size;
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
......@@ -348,7 +348,7 @@ public class ConsumeQueue {
long tagsCode = result.getByteBuffer().getLong();
if (offsetPy >= phyMinOffset) {
this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
this.minLogicOffset = mappedFile.getFileFromOffset() + i;
log.info("Compute logical min offset: {}, topic: {}, queueId: {}",
this.getMinOffsetInQueue(), this.topic, this.queueId);
// This maybe not take effect, when not every consume queue has extend file.
......@@ -420,7 +420,8 @@ public class ConsumeQueue {
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
if (offset <= this.maxPhysicOffset) {
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
return true;
}
......@@ -464,7 +465,7 @@ public class ConsumeQueue {
);
}
}
this.maxPhysicOffset = offset;
this.maxPhysicOffset = offset + size;
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
......
......@@ -42,24 +42,23 @@ import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.running.RunningStats;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.index.IndexService;
import org.apache.rocketmq.store.index.QueryOffsetResult;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import static org.apache.rocketmq.store.config.BrokerRole.SLAVE;
public class DefaultMessageStore implements MessageStore {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
......@@ -119,7 +118,11 @@ public class DefaultMessageStore implements MessageStore {
this.messageStoreConfig = messageStoreConfig;
this.brokerStatsManager = brokerStatsManager;
this.allocateMappedFileService = new AllocateMappedFileService(this);
this.commitLog = new CommitLog(this);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
} else {
this.commitLog = new CommitLog(this);
}
this.consumeQueueTable = new ConcurrentHashMap<>(32);
this.flushConsumeQueueService = new FlushConsumeQueueService();
......@@ -127,8 +130,11 @@ public class DefaultMessageStore implements MessageStore {
this.cleanConsumeQueueService = new CleanConsumeQueueService();
this.storeStatsService = new StoreStatsService();
this.indexService = new IndexService(this);
this.haService = new HAService(this);
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService = new HAService(this);
} else {
this.haService = null;
}
this.reputMessageService = new ReputMessageService();
this.scheduleMessageService = new ScheduleMessageService(this);
......@@ -216,29 +222,71 @@ public class DefaultMessageStore implements MessageStore {
lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true);
this.flushConsumeQueueService.start();
this.commitLog.start();
this.storeStatsService.start();
if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
this.scheduleMessageService.start();
{
/**
* 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
* 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
* 3. Calculate the reput offset according to the consume queue;
* 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
*/
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
}
}
}
if (maxPhysicalPosInLogicQueue < 0) {
maxPhysicalPosInLogicQueue = 0;
}
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
/**
* This happens in following conditions:
* 1. If someone removes all the consumequeue files or the disk get damaged.
* 2. Launch a new broker, and copy the commitlog from other brokers.
*
* All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
* If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
*/
log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
}
log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
/**
* 1. Finish dispatching the messages fall behind, then to start other services.
* 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
*/
while (true) {
if (dispatchBehindBytes() <= 0) {
break;
}
Thread.sleep(1000);
log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
}
this.recoverTopicQueueTable();
}
if (this.getMessageStoreConfig().isDuplicationEnable()) {
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
this.reputMessageService.start();
this.haService.start();
this.flushConsumeQueueService.start();
this.commitLog.start();
this.storeStatsService.start();
this.createTempFile();
this.addScheduleTask();
this.shutdown = false;
}
public void shutdown() {
if (!this.shutdown) {
this.shutdown = true;
......@@ -255,8 +303,9 @@ public class DefaultMessageStore implements MessageStore {
if (this.scheduleMessageService != null) {
this.scheduleMessageService.shutdown();
}
this.haService.shutdown();
if (this.haService != null) {
this.haService.shutdown();
}
this.storeStatsService.shutdown();
this.indexService.shutdown();
......@@ -1039,6 +1088,7 @@ public class DefaultMessageStore implements MessageStore {
return false;
}
@Override
public long dispatchBehindBytes() {
return this.reputMessageService.behind();
}
......@@ -1320,7 +1370,7 @@ public class DefaultMessageStore implements MessageStore {
return maxPhysicOffset;
}
private void recoverTopicQueueTable() {
public void recoverTopicQueueTable() {
HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
long minPhyOffset = this.commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
......@@ -1382,6 +1432,18 @@ public class DefaultMessageStore implements MessageStore {
return brokerStatsManager;
}
@Override
public void handleScheduleMessageService(final BrokerRole brokerRole) {
if (this.scheduleMessageService != null) {
if (brokerRole == BrokerRole.SLAVE) {
this.scheduleMessageService.shutdown();
} else {
this.scheduleMessageService.start();
}
}
}
public int remainTransientStoreBufferNumbs() {
return this.transientStorePool.remainBufferNumbs();
}
......@@ -1748,6 +1810,11 @@ public class DefaultMessageStore implements MessageStore {
}
private void doReput() {
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
......@@ -1763,7 +1830,7 @@ public class DefaultMessageStore implements MessageStore {
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
......@@ -1797,12 +1864,10 @@ public class DefaultMessageStore implements MessageStore {
this.reputFromOffset += size;
} else {
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
this.reputFromOffset += result.getSize() - readSize;
}
}
}
......
......@@ -22,7 +22,7 @@ public class DispatchRequest {
private final String topic;
private final int queueId;
private final long commitLogOffset;
private final int msgSize;
private int msgSize;
private final long tagsCode;
private final long storeTimestamp;
private final long consumeQueueOffset;
......@@ -35,6 +35,8 @@ public class DispatchRequest {
private final Map<String, String> propertiesMap;
private byte[] bitMap;
private int bufferSize = -1;//the buffer size maybe larger than the msg size if the message is wrapped by something
public DispatchRequest(
final String topic,
final int queueId,
......@@ -156,4 +158,16 @@ public class DispatchRequest {
public void setBitMap(byte[] bitMap) {
this.bitMap = bitMap;
}
public void setMsgSize(int msgSize) {
this.msgSize = msgSize;
}
public int getBufferSize() {
return bufferSize;
}
public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}
}
......@@ -154,8 +154,8 @@ public class MappedFileQueue {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
return true;
+ " length not matched message store config value, please check it manually");
return false;
}
try {
......
......@@ -21,6 +21,7 @@ import java.util.LinkedList;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
/**
......@@ -366,4 +367,10 @@ public interface MessageStore {
* @return BrokerStatsManager.
*/
BrokerStatsManager getBrokerStatsManager();
/**
* handle
* @param brokerRole
*/
void handleScheduleMessageService(BrokerRole brokerRole);
}
......@@ -48,9 +48,9 @@ public class SelectMappedBufferResult {
this.byteBuffer.limit(this.size);
}
public MappedFile getMappedFile() {
/* public MappedFile getMappedFile() {
return mappedFile;
}
}*/
// @Override
// protected void finalize() {
......
......@@ -143,6 +143,11 @@ public class MessageStoreConfig {
private int transientStorePoolSize = 5;
private boolean fastFailIfNoBufferInStorePool = false;
private boolean enableDLegerCommitLog = false;
private String dLegerGroup;
private String dLegerPeers;
private String dLegerSelfId;
public boolean isDebugLockEnable() {
return debugLockEnable;
}
......@@ -666,4 +671,35 @@ public class MessageStoreConfig {
this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
}
public String getdLegerGroup() {
return dLegerGroup;
}
public void setdLegerGroup(String dLegerGroup) {
this.dLegerGroup = dLegerGroup;
}
public String getdLegerPeers() {
return dLegerPeers;
}
public void setdLegerPeers(String dLegerPeers) {
this.dLegerPeers = dLegerPeers;
}
public String getdLegerSelfId() {
return dLegerSelfId;
}
public void setdLegerSelfId(String dLegerSelfId) {
this.dLegerSelfId = dLegerSelfId;
}
public boolean isEnableDLegerCommitLog() {
return enableDLegerCommitLog;
}
public void setEnableDLegerCommitLog(boolean enableDLegerCommitLog) {
this.enableDLegerCommitLog = enableDLegerCommitLog;
}
}
......@@ -90,7 +90,7 @@ public class HAConnection {
this.selector = RemotingUtil.openSelector();
this.socketChannel = socketChannel;
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
this.thread.setDaemon(true);
this.setDaemon(true);
}
@Override
......@@ -205,7 +205,7 @@ public class HAConnection {
this.selector = RemotingUtil.openSelector();
this.socketChannel = socketChannel;
this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
this.thread.setDaemon(true);
this.setDaemon(true);
}
@Override
......
......@@ -19,11 +19,11 @@ package org.apache.rocketmq.store.schedule;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -38,6 +38,7 @@ import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
......@@ -56,15 +57,15 @@ public class ScheduleMessageService extends ConfigManager {
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
private final Timer timer = new Timer("ScheduleMessageTimerThread", true);
private final DefaultMessageStore defaultMessageStore;
private final AtomicBoolean started = new AtomicBoolean(false);
private Timer timer;
private MessageStore writeMessageStore;
private int maxDelayLevel;
public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
this.defaultMessageStore = defaultMessageStore;
this.writeMessageStore = defaultMessageStore;
}
public static int queueId2DelayLevel(final int queueId) {
......@@ -75,10 +76,18 @@ public class ScheduleMessageService extends ConfigManager {
return delayLevel - 1;
}
/**
* @param writeMessageStore
* the writeMessageStore to set
*/
public void setWriteMessageStore(MessageStore writeMessageStore) {
this.writeMessageStore = writeMessageStore;
}
public void buildRunningStats(HashMap<String, String> stats) {
Iterator<Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator();
Iterator<Map.Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, Long> next = it.next();
Map.Entry<Integer, Long> next = it.next();
int queueId = delayLevel2QueueId(next.getKey());
long delayOffset = next.getValue();
long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(SCHEDULE_TOPIC, queueId);
......@@ -102,35 +111,45 @@ public class ScheduleMessageService extends ConfigManager {
}
public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
public void shutdown() {
this.timer.cancel();
if (this.started.compareAndSet(true, false)) {
if (null != this.timer)
this.timer.cancel();
}
}
public boolean isStarted() {
return started.get();
}
public int getMaxDelayLevel() {
......@@ -214,7 +233,9 @@ public class ScheduleMessageService extends ConfigManager {
@Override
public void run() {
try {
this.executeOnTimeup();
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
......@@ -285,7 +306,7 @@ public class ScheduleMessageService extends ConfigManager {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null
......
......@@ -30,9 +30,10 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class ConsumeQueueTest {
private static final String msg = "Once, there was a chance for me!";
......
......@@ -42,9 +42,9 @@ public class DefaultMessageStoreShutDownTest {
public void init() throws Exception {
messageStore = spy(buildMessageStore());
boolean load = messageStore.load();
when(messageStore.dispatchBehindBytes()).thenReturn(100L);
assertTrue(load);
messageStore.start();
when(messageStore.dispatchBehindBytes()).thenReturn(100L);
}
@Test
......
......@@ -40,6 +40,7 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
......@@ -123,6 +124,8 @@ public class DefaultMessageStoreTest {
messageStore.putMessage(buildMessage());
}
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = messageStore.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull();
......@@ -180,7 +183,8 @@ public class DefaultMessageStoreTest {
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
Thread.sleep(10);
//Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
for (AppendMessageResult appendMessageResult : appendMessageResults) {
......@@ -198,7 +202,8 @@ public class DefaultMessageStoreTest {
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
Thread.sleep(10);
//Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
int skewing = 2;
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
......@@ -222,7 +227,8 @@ public class DefaultMessageStoreTest {
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, true);
Thread.sleep(10);
//Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
int skewing = 20000;
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
......@@ -235,6 +241,9 @@ public class DefaultMessageStoreTest {
assertThat(indexBuffer.getByteBuffer().getInt()).isEqualTo(appendMessageResults[totalCount - 1].getWroteBytes());
assertThat(indexBuffer2.getByteBuffer().getLong()).isEqualTo(appendMessageResults[0].getWroteOffset());
assertThat(indexBuffer2.getByteBuffer().getInt()).isEqualTo(appendMessageResults[0].getWroteBytes());
indexBuffer.release();
indexBuffer2.release();
}
}
......@@ -245,7 +254,9 @@ public class DefaultMessageStoreTest {
int wrongQueueId = 1;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
Thread.sleep(10);
//Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long offset = messageStore.getOffsetInQueueByTime(topic, wrongQueueId, appendMessageResults[0].getStoreTimestamp());
......@@ -259,7 +270,8 @@ public class DefaultMessageStoreTest {
int wrongQueueId = 1;
String topic = "FooBar";
putMessages(totalCount, topic, queueId, false);
Thread.sleep(10);
//Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, 0);
......@@ -273,7 +285,9 @@ public class DefaultMessageStoreTest {
int wrongQueueId = 1;
String topic = "FooBar";
putMessages(totalCount, topic, queueId, true);
Thread.sleep(10);
//Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long messageStoreTimeStamp = messageStore.getMessageStoreTimeStamp(topic, wrongQueueId, -1);
......@@ -287,7 +301,8 @@ public class DefaultMessageStoreTest {
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
Thread.sleep(10);
//Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
ConsumeQueue consumeQueue = getDefaultMessageStore().findConsumeQueue(topic, queueId);
int minOffsetInQueue = (int)consumeQueue.getMinOffsetInQueue();
......@@ -310,7 +325,8 @@ public class DefaultMessageStoreTest {
int queueId = 0;
String topic = "FooBar";
AppendMessageResult[] appendMessageResults = putMessages(totalCount, topic, queueId, false);
Thread.sleep(10);
//Thread.sleep(10);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
ConsumeQueue consumeQueue = messageStore.getConsumeQueue(topic, queueId);
for (int i = 0; i < totalCount; i++) {
......@@ -412,6 +428,8 @@ public class DefaultMessageStoreTest {
master.putMessage(buildMessage());
}
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
assertThat(result).isNotNull();
......@@ -432,16 +450,21 @@ public class DefaultMessageStoreTest {
}
// wait for consume queue build
// the sleep time should be great than consume queue flush interval
Thread.sleep(100);
//Thread.sleep(100);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
String group = "simple";
GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null);
assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32);
getMessageResult32.release();
GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null);
assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20);
getMessageResult20.release();
GetMessageResult getMessageResult45 = messageStore.getMessage(group, topic, 0, 0, 10, null);
assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10);
getMessageResult45.release();
}
@Test
......@@ -455,7 +478,9 @@ public class DefaultMessageStoreTest {
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);//wait for build consumer queue
// Thread.sleep(100);//wait for build consumer queue
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long maxPhyOffset = messageStore.getMaxPhyOffset();
long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
......@@ -475,7 +500,8 @@ public class DefaultMessageStoreTest {
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);
//Thread.sleep(100);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
long secondLastPhyOffset = messageStore.getMaxPhyOffset();
long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
......@@ -504,7 +530,8 @@ public class DefaultMessageStoreTest {
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);
//Thread.sleep(100);
StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore);
secondLastPhyOffset = messageStore.getMaxPhyOffset();
secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.After;
public class StoreTestBase {
private int QUEUE_TOTAL = 100;
private AtomicInteger QueueId = new AtomicInteger(0);
private SocketAddress BornHost = new InetSocketAddress("127.0.0.1", 8123);
private SocketAddress StoreHost = BornHost;
private byte[] MessageBody = new byte[1024];
protected Set<String> baseDirs = new HashSet<>();
private static AtomicInteger port = new AtomicInteger(30000);
public static synchronized int nextPort() {
return port.addAndGet(5);
}
protected MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("StoreTest");
msg.setTags("TAG1");
msg.setKeys("Hello");
msg.setBody(MessageBody);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(StoreHost);
msg.setBornHost(BornHost);
return msg;
}
public static String createBaseDir() {
String baseDir = System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + UUID.randomUUID();
final File file = new File(baseDir);
if (file.exists()) {
System.exit(1);
}
return baseDir;
}
public static boolean makeSureFileExists(String fileName) throws Exception {
File file = new File(fileName);
MappedFile.ensureDirOK(file.getParent());
return file.createNewFile();
}
public static void deleteFile(String fileName) {
deleteFile(new File(fileName));
}
public static void deleteFile(File file) {
UtilAll.deleteFile(file);
}
@After
public void clear() {
for (String baseDir : baseDirs) {
deleteFile(baseDir);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.store.file.MmapFileList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.junit.Assert;
import org.junit.Test;
public class DLedgerCommitlogTest extends MessageStoreTestBase {
@Test
public void testTruncateCQ() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
String topic = UUID.randomUUID().toString();
{
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
DLedgerServer dLedgerServer = dLedgerCommitLog.getdLedgerServer();
DLedgerMmapFileStore dLedgerMmapFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
MmapFileList mmapFileList = dLedgerMmapFileStore.getDataFileList();
Thread.sleep(2000);
doPutMessages(messageStore, topic, 0, 2000, 0);
Thread.sleep(100);
Assert.assertEquals(24, mmapFileList.getMappedFiles().size());
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 2000, 0);
messageStore.shutdown();
}
{
//Abnormal recover, left some commitlogs
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 4);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
DLedgerServer dLedgerServer = dLedgerCommitLog.getdLedgerServer();
DLedgerMmapFileStore dLedgerMmapFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
MmapFileList mmapFileList = dLedgerMmapFileStore.getDataFileList();
Thread.sleep(1000);
Assert.assertEquals(20, mmapFileList.getMappedFiles().size());
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1700, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 1700, 0);
messageStore.shutdown();
}
{
//Abnormal recover, left none commitlogs
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 20);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
DLedgerServer dLedgerServer = dLedgerCommitLog.getdLedgerServer();
DLedgerMmapFileStore dLedgerMmapFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
MmapFileList mmapFileList = dLedgerMmapFileStore.getDataFileList();
Thread.sleep(1000);
Assert.assertEquals(0, mmapFileList.getMappedFiles().size());
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
messageStore.shutdown();
}
}
@Test
public void testRecover() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
String topic = UUID.randomUUID().toString();
{
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
Thread.sleep(1000);
doPutMessages(messageStore, topic, 0, 1000, 0);
Thread.sleep(100);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 1000, 0);
messageStore.shutdown();
}
{
//normal recover
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 1000, 0);
messageStore.shutdown();
}
{
//abnormal recover
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
doGetMessages(messageStore, topic, 0, 1000, 0);
messageStore.shutdown();
}
}
@Test
public void testPutAndGetMessage() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
Thread.sleep(1000);
String topic = UUID.randomUUID().toString();
List<PutMessageResult> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(0);
PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
results.add(putMessageResult);
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Assert.assertEquals(i, putMessageResult.getAppendMessageResult().getLogicsOffset());
}
Thread.sleep(100);
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
Assert.assertEquals(10, getMessageResult.getMessageBufferList().size());
Assert.assertEquals(10, getMessageResult.getMessageMapedList().size());
for (int i = 0; i < results.size(); i++) {
ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i);
MessageExt messageExt = MessageDecoder.decode(buffer);
Assert.assertEquals(i, messageExt.getQueueOffset());
Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId(), messageExt.getMsgId());
Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset());
}
messageStore.destroy();
messageStore.shutdown();
}
@Test
public void testCommittedPos() throws Exception {
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
String group = UUID.randomUUID().toString();
DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false, 0);
String topic = UUID.randomUUID().toString();
MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(0);
PutMessageResult putMessageResult = leaderStore.putMessage(msgInner);
Assert.assertEquals(PutMessageStatus.OS_PAGECACHE_BUSY, putMessageResult.getPutMessageStatus());
Thread.sleep(1000);
Assert.assertEquals(0, leaderStore.getCommitLog().getMaxOffset());
Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0));
DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false, 0);
Thread.sleep(2000);
Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(1, followerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertTrue(leaderStore.getCommitLog().getMaxOffset() > 0);
leaderStore.destroy();
followerStore.destroy();
leaderStore.shutdown();
followerStore.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerServer;
import java.io.File;
import java.util.Arrays;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.StoreTestBase;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Assert;
public class MessageStoreTestBase extends StoreTestBase {
protected DefaultMessageStore createDledgerMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort, int deleteFileNum) throws Exception {
System.setProperty("dledger.disk.ratio.check", "0.95");
System.setProperty("dledger.disk.ratio.clean", "0.95");
baseDirs.add(base);
MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setMapedFileSizeCommitLog(1024 * 100);
storeConfig.setMapedFileSizeConsumeQueue(1024);
storeConfig.setMaxHashSlotNum(100);
storeConfig.setMaxIndexNum(100 * 10);
storeConfig.setStorePathRootDir(base);
storeConfig.setStorePathCommitLog(base + File.separator + "commitlog");
storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
storeConfig.setEnableDLegerCommitLog(true);
storeConfig.setdLegerGroup(group);
storeConfig.setdLegerPeers(peers);
storeConfig.setdLegerSelfId(selfId);
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
}, new BrokerConfig());
DLedgerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer();
if (leaderId != null) {
dLegerServer.getdLedgerConfig().setEnableLeaderElector(false);
if (selfId.equals(leaderId)) {
dLegerServer.getMemberState().changeToLeader(-1);
} else {
dLegerServer.getMemberState().changeToFollower(-1, leaderId);
}
}
if (createAbort) {
String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir());
makeSureFileExists(fileName);
}
if (deleteFileNum > 0) {
DLedgerConfig config = dLegerServer.getdLedgerConfig();
if (deleteFileNum > 0) {
File dir = new File(config.getDataStorePath());
File[] files = dir.listFiles();
if (files != null) {
Arrays.sort(files);
for (int i = files.length - 1; i >= 0; i--) {
File file = files[i];
file.delete();
if (files.length - i >= deleteFileNum) {
break;
}
}
}
}
}
Assert.assertTrue(defaultMessageStore.load());
defaultMessageStore.start();
return defaultMessageStore;
}
protected DefaultMessageStore createMessageStore(String base, boolean createAbort) throws Exception {
baseDirs.add(base);
MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setMapedFileSizeCommitLog(1024 * 100);
storeConfig.setMapedFileSizeConsumeQueue(1024);
storeConfig.setMaxHashSlotNum(100);
storeConfig.setMaxIndexNum(100 * 10);
storeConfig.setStorePathRootDir(base);
storeConfig.setStorePathCommitLog(base + File.separator + "commitlog");
storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("CommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
}, new BrokerConfig());
if (createAbort) {
String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir());
makeSureFileExists(fileName);
}
Assert.assertTrue(defaultMessageStore.load());
defaultMessageStore.start();
return defaultMessageStore;
}
protected void doPutMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) {
for (int i = 0; i < num; i++) {
MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(queueId);
PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Assert.assertEquals(beginLogicsOffset + i, putMessageResult.getAppendMessageResult().getLogicsOffset());
}
}
protected void doGetMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) {
for (int i = 0; i < num; i++) {
GetMessageResult getMessageResult = messageStore.getMessage("group", topic, queueId, beginLogicsOffset + i, 3, null);
Assert.assertNotNull(getMessageResult);
Assert.assertTrue(!getMessageResult.getMessageBufferList().isEmpty());
MessageExt messageExt = MessageDecoder.decode(getMessageResult.getMessageBufferList().get(0));
Assert.assertEquals(beginLogicsOffset + i, messageExt.getQueueOffset());
getMessageResult.release();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.dledger;
import java.util.UUID;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.StoreTestBase;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.Assert;
import org.junit.Test;
public class MixCommitlogTest extends MessageStoreTestBase {
@Test
public void testFallBehindCQ() throws Exception {
String base = createBaseDir();
String topic = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
{
DefaultMessageStore originalStore = createMessageStore(base, false);
doPutMessages(originalStore, topic, 0, 1000, 0);
Assert.assertEquals(11, originalStore.getMaxPhyOffset()/originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog());
Thread.sleep(500);
Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, originalStore.dispatchBehindBytes());
doGetMessages(originalStore, topic, 0, 1000, 0);
originalStore.shutdown();
}
//delete the cq files
{
StoreTestBase.deleteFile(StorePathConfigHelper.getStorePathConsumeQueue(base));
}
{
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
Thread.sleep(2000);
Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, dledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
doGetMessages(dledgerStore, topic, 0, 1000, 0);
doPutMessages(dledgerStore, topic, 0, 1000, 1000);
Thread.sleep(500);
Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
doGetMessages(dledgerStore, topic, 0, 2000, 0);
dledgerStore.shutdown();
}
}
@Test
public void testPutAndGet() throws Exception {
String base = createBaseDir();
String topic = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
long dividedOffset;
{
DefaultMessageStore originalStore = createMessageStore(base, false);
doPutMessages(originalStore, topic, 0, 1000, 0);
Thread.sleep(500);
Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, originalStore.dispatchBehindBytes());
dividedOffset = originalStore.getCommitLog().getMaxOffset();
dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
doGetMessages(originalStore, topic, 0, 1000, 0);
originalStore.shutdown();
}
{
DefaultMessageStore recoverOriginalStore = createMessageStore(base, true);
Thread.sleep(500);
Assert.assertEquals(0, recoverOriginalStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, recoverOriginalStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, recoverOriginalStore.dispatchBehindBytes());
doGetMessages(recoverOriginalStore, topic, 0, 1000, 0);
recoverOriginalStore.shutdown();
}
{
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog();
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getMaxOffset());
Thread.sleep(2000);
doPutMessages(dledgerStore, topic, 0, 1000, 1000);
Thread.sleep(500);
Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
doGetMessages(dledgerStore, topic, 0, 2000, 0);
dledgerStore.shutdown();
}
{
DefaultMessageStore recoverDledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) recoverDledgerStore.getCommitLog();
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
Thread.sleep(2000);
doPutMessages(recoverDledgerStore, topic, 0, 1000, 2000);
Thread.sleep(500);
Assert.assertEquals(0, recoverDledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(3000, recoverDledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, recoverDledgerStore.dispatchBehindBytes());
doGetMessages(recoverDledgerStore, topic, 0, 3000, 0);
recoverDledgerStore.shutdown();
}
}
@Test
public void testDeleteExpiredFiles() throws Exception {
String base = createBaseDir();
String topic = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
long dividedOffset;
{
DefaultMessageStore originalStore = createMessageStore(base, false);
doPutMessages(originalStore, topic, 0, 1000, 0);
Thread.sleep(500);
Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, originalStore.dispatchBehindBytes());
dividedOffset = originalStore.getCommitLog().getMaxOffset();
dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
originalStore.shutdown();
}
long maxPhysicalOffset;
{
DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog();
Assert.assertTrue(dledgerStore.getMessageStoreConfig().isCleanFileForciblyEnable());
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset());
Thread.sleep(2000);
doPutMessages(dledgerStore, topic, 0, 1000, 1000);
Thread.sleep(500);
Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, dledgerStore.dispatchBehindBytes());
Assert.assertEquals(0, dledgerStore.getMinPhyOffset());
maxPhysicalOffset = dledgerStore.getMaxPhyOffset();
Assert.assertTrue(maxPhysicalOffset > 0);
doGetMessages(dledgerStore, topic, 0, 2000, 0);
for (int i = 0; i < 100; i++) {
dledgerStore.getCommitLog().deleteExpiredFile(System.currentTimeMillis(), 0, 0, true);
}
Assert.assertEquals(dividedOffset, dledgerStore.getMinPhyOffset());
Assert.assertEquals(maxPhysicalOffset, dledgerStore.getMaxPhyOffset());
for (int i = 0; i < 100; i++) {
Assert.assertEquals(Integer.MAX_VALUE, dledgerStore.getCommitLog().deleteExpiredFile(System.currentTimeMillis(), 0, 0, true));
}
Assert.assertEquals(dividedOffset, dledgerStore.getMinPhyOffset());
Assert.assertEquals(maxPhysicalOffset, dledgerStore.getMaxPhyOffset());
Assert.assertTrue(dledgerStore.getMessageStoreConfig().isCleanFileForciblyEnable());
Assert.assertTrue(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
//Test fresh
dledgerStore.getMessageStoreConfig().setCleanFileForciblyEnable(false);
for (int i = 0; i < 100; i++) {
Assert.assertEquals(Integer.MAX_VALUE, dledgerStore.getCommitLog().deleteExpiredFile(System.currentTimeMillis(), 0, 0, true));
}
Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean());
doGetMessages(dledgerStore, topic, 0, 1000, 1000);
dledgerStore.shutdown();
}
}
}
......@@ -28,7 +28,7 @@
<appender-ref ref="STDOUT"/>
</logger>
<root level="ERROR">
<root level="OFF">
<appender-ref ref="STDOUT"/>
</root>
......
......@@ -17,6 +17,8 @@
package org.apache.rocketmq.test.factory;
import java.util.UUID;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
......@@ -60,4 +62,12 @@ public class ConsumerFactory {
consumer.start();
return consumer;
}
public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup) throws Exception {
DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
defaultMQPullConsumer.setInstanceName(UUID.randomUUID().toString());
defaultMQPullConsumer.setNamesrvAddr(nsAddr);
defaultMQPullConsumer.start();
return defaultMQPullConsumer;
}
}
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.factory;
import java.util.UUID;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.test.util.RandomUtil;
......@@ -25,6 +26,7 @@ public class ProducerFactory {
public static DefaultMQProducer getRMQProducer(String ns) {
DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID());
producer.setInstanceName(UUID.randomUUID().toString());
producer.setNamesrvAddr(ns);
try {
producer.start();
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.test.util;
import java.util.HashMap;
import java.util.Set;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
......@@ -40,6 +41,7 @@ public class MQAdmin {
int queueNum, int waitTimeSec) {
boolean createResult = false;
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.setInstanceName(UUID.randomUUID().toString());
mqAdminExt.setNamesrvAddr(nameSrvAddr);
try {
mqAdminExt.start();
......
......@@ -36,13 +36,13 @@ import org.apache.rocketmq.test.util.MQAdmin;
import org.apache.rocketmq.test.util.MQRandomUtils;
public class BaseConf {
protected static String nsAddr;
public static String nsAddr;
protected static String broker1Name;
protected static String broker2Name;
protected static String clusterName;
protected static int brokerNum;
protected static int waitTime = 5;
protected static int consumeTime = 5 * 60 * 1000;
protected static int consumeTime = 2 * 60 * 1000;
protected static NamesrvController namesrvController;
protected static BrokerController brokerController1;
protected static BrokerController brokerController2;
......
......@@ -47,9 +47,14 @@ public class IntegrationTestBase {
protected static final List<BrokerController> BROKER_CONTROLLERS = new ArrayList<>();
protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<>();
protected static int topicCreateTime = 30 * 1000;
protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 256;
protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 100;
protected static final int INDEX_NUM = 1000;
private static final AtomicInteger port = new AtomicInteger(40000);
public static synchronized int nextPort() {
return port.addAndGet(random.nextInt(10) + 10);
}
protected static Random random = new Random();
static {
......@@ -87,7 +92,7 @@ public class IntegrationTestBase {
}
private static String createBaseDir() {
public static String createBaseDir() {
String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID();
final File file = new File(baseDir);
if (file.exists()) {
......@@ -105,14 +110,14 @@ public class IntegrationTestBase {
namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json");
namesrvConfig.setConfigStorePath(baseDir + SEP + "namesrv" + SEP + "namesrv.properties");
nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000));
nameServerNettyServerConfig.setListenPort(nextPort());
NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
try {
Assert.assertTrue(namesrvController.initialize());
logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort());
namesrvController.start();
} catch (Exception e) {
logger.info("Name Server start failed");
logger.info("Name Server start failed", e);
System.exit(1);
}
NAMESRV_CONTROLLERS.add(namesrvController);
......@@ -123,8 +128,6 @@ public class IntegrationTestBase {
public static BrokerController createAndStartBroker(String nsAddr) {
String baseDir = createBaseDir();
BrokerConfig brokerConfig = new BrokerConfig();
NettyServerConfig nettyServerConfig = new NettyServerConfig();
NettyClientConfig nettyClientConfig = new NettyClientConfig();
MessageStoreConfig storeConfig = new MessageStoreConfig();
brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
brokerConfig.setBrokerIP1("127.0.0.1");
......@@ -132,18 +135,25 @@ public class IntegrationTestBase {
brokerConfig.setEnablePropertyFilter(true);
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
storeConfig.setHaListenPort(8000 + random.nextInt(1000));
storeConfig.setMapedFileSizeCommitLog(COMMIT_LOG_SIZE);
storeConfig.setMaxIndexNum(INDEX_NUM);
storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
return createAndStartBroker(storeConfig, brokerConfig);
}
public static BrokerController createAndStartBroker(MessageStoreConfig storeConfig, BrokerConfig brokerConfig) {
NettyServerConfig nettyServerConfig = new NettyServerConfig();
NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyServerConfig.setListenPort(nextPort());
storeConfig.setHaListenPort(nextPort());
BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
try {
Assert.assertTrue(brokerController.initialize());
logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
brokerController.start();
} catch (Exception e) {
logger.info("Broker start failed");
} catch (Throwable t) {
logger.error("Broker start failed, will exit", t);
System.exit(1);
}
BROKER_CONTROLLERS.add(brokerController);
......@@ -179,15 +189,7 @@ public class IntegrationTestBase {
if (!file.exists()) {
return;
}
if (file.isFile()) {
file.delete();
} else if (file.isDirectory()) {
File[] files = file.listFiles();
for (File file1 : files) {
deleteFile(file1);
}
file.delete();
}
UtilAll.deleteFile(file);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.base.dledger;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.factory.ProducerFactory;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.rocketmq.test.base.IntegrationTestBase.nextPort;
import static sun.util.locale.BaseLocale.SEP;
public class DLedgerProduceAndConsumeIT {
public BrokerConfig buildBrokerConfig(String cluster, String brokerName) {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerClusterName(cluster);
brokerConfig.setBrokerName(brokerName);
brokerConfig.setBrokerIP1("127.0.0.1");
brokerConfig.setNamesrvAddr(BaseConf.nsAddr);
return brokerConfig;
}
public MessageStoreConfig buildStoreConfig(String brokerName, String peers, String selfId) {
MessageStoreConfig storeConfig = new MessageStoreConfig();
String baseDir = IntegrationTestBase.createBaseDir();
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
storeConfig.setHaListenPort(nextPort());
storeConfig.setMapedFileSizeCommitLog(10 * 1024 * 1024);
storeConfig.setEnableDLegerCommitLog(true);
storeConfig.setdLegerGroup(brokerName);
storeConfig.setdLegerSelfId(selfId);
storeConfig.setdLegerPeers(peers);
return storeConfig;
}
@Test
public void testProduceAndConsume() throws Exception {
String cluster = UUID.randomUUID().toString();
String brokerName = UUID.randomUUID().toString();
String selfId = "n0";
String peers = String.format("n0-localhost:%d", nextPort());
BrokerConfig brokerConfig = buildBrokerConfig(cluster, brokerName);
MessageStoreConfig storeConfig = buildStoreConfig(brokerName, peers, selfId);
BrokerController brokerController = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);
Thread.sleep(3000);
Assert.assertEquals(BrokerRole.SYNC_MASTER, storeConfig.getBrokerRole());
String topic = UUID.randomUUID().toString();
String consumerGroup = UUID.randomUUID().toString();
IntegrationTestBase.initTopic(topic, BaseConf.nsAddr, cluster, 1);
DefaultMQProducer producer = ProducerFactory.getRMQProducer(BaseConf.nsAddr);
DefaultMQPullConsumer consumer = ConsumerFactory.getRMQPullConsumer(BaseConf.nsAddr, consumerGroup);
for (int i = 0; i < 10; i++) {
Message message = new Message();
message.setTopic(topic);
message.setBody(("Hello" + i).getBytes());
SendResult sendResult = producer.send(message);
Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus());
Assert.assertEquals(0, sendResult.getMessageQueue().getQueueId());
Assert.assertEquals(brokerName, sendResult.getMessageQueue().getBrokerName());
Assert.assertEquals(i, sendResult.getQueueOffset());
Assert.assertNotNull(sendResult.getMsgId());
Assert.assertNotNull(sendResult.getOffsetMsgId());
}
Thread.sleep(500);
Assert.assertEquals(0, brokerController.getMessageStore().getMinOffsetInQueue(topic, 0));
Assert.assertEquals(10, brokerController.getMessageStore().getMaxOffsetInQueue(topic, 0));
MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
PullResult pullResult= consumer.pull(messageQueue, "*", 0, 32);
Assert.assertEquals(PullStatus.FOUND, pullResult.getPullStatus());
Assert.assertEquals(10, pullResult.getMsgFoundList().size());
for (int i = 0; i < 10; i++) {
MessageExt messageExt = pullResult.getMsgFoundList().get(i);
Assert.assertEquals(i, messageExt.getQueueOffset());
Assert.assertArrayEquals(("Hello" + i).getBytes(), messageExt.getBody());
}
producer.shutdown();
consumer.shutdown();
brokerController.shutdown();
}
}
......@@ -29,10 +29,15 @@ import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import static com.google.common.truth.Truth.assertThat;
/**
* Currently, dose not support the ordered broadcast message
*/
@Ignore
public class OrderMsgBroadCastIT extends BaseBroadCastIT {
private static Logger logger = Logger.getLogger(OrderMsgBroadCastIT.class);
private RMQNormalProducer producer = null;
......
......@@ -51,7 +51,8 @@ public class NormalMsgDelayIT extends DelayConf {
}
@Test
public void testDelayLevell() {
public void testDelayLevel1() throws Exception {
Thread.sleep(3000);
int delayLevel = 1;
List<Object> delayMsgs = MQMessageFactory.getDelayMsg(topic, delayLevel, msgSize);
producer.send(delayMsgs);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册