diff --git a/.gitignore b/.gitignore index 8abdfd8fd6ee453c7abf967a941fe00b50fee5cf..264f48d0dd75ab35d8f83b0ba5affae1905a0db5 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,6 @@ devenv *.versionsBackup !NOTICE-BIN !LICENSE-BIN -.DS_Store -localbin -nohup.out +.DS_Store +localbin +nohup.out diff --git a/README.md b/README.md index 478f32fa6bfc4510e84544ec714d9c171e836ee3..f964f46fc99fcc2cb56e8121441e14d95c0edb99 100644 --- a/README.md +++ b/README.md @@ -48,3 +48,5 @@ We always welcome new contributions, whether for trivial cleanups, big new featu ---------- ## License [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation + + diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index c3431ca41e519420c0b544ba774b081da69c5f92..427f86118905e5eb18d0cbd0a1bced777706e050 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -39,6 +40,7 @@ import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.broker.client.net.Broker2Client; import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager; +import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler; import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap; import org.apache.rocketmq.broker.filter.ConsumerFilterManager; import org.apache.rocketmq.broker.filtersrv.FilterServerManager; @@ -99,6 +101,7 @@ import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.dledger.DLedgerCommitLog; import org.apache.rocketmq.store.stats.BrokerStats; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -158,6 +161,7 @@ public class BrokerController { private TransactionalMessageCheckService transactionalMessageCheckService; private TransactionalMessageService transactionalMessageService; private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener; + private Future slaveSyncFuture; public BrokerController( @@ -234,6 +238,10 @@ public class BrokerController { this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); + if (messageStoreConfig.isEnableDLegerCommitLog()) { + DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); + ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); + } this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); @@ -396,37 +404,26 @@ public class BrokerController { }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } - if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { - if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { - this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); - this.updateMasterHAServerAddrPeriodically = false; - } else { - this.updateMasterHAServerAddrPeriodically = true; - } - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - try { - BrokerController.this.slaveSynchronize.syncAll(); - } catch (Throwable e) { - log.error("ScheduledTask syncAll slave exception", e); - } + if (!messageStoreConfig.isEnableDLegerCommitLog()) { + if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { + if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { + this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); + this.updateMasterHAServerAddrPeriodically = false; + } else { + this.updateMasterHAServerAddrPeriodically = true; } - }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); - } else { - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - try { - BrokerController.this.printMasterAndSlaveDiff(); - } catch (Throwable e) { - log.error("schedule printMasterAndSlaveDiff error.", e); + } else { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + BrokerController.this.printMasterAndSlaveDiff(); + } catch (Throwable e) { + log.error("schedule printMasterAndSlaveDiff error.", e); + } } - } - }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); + }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); + } } if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { @@ -855,6 +852,13 @@ public class BrokerController { this.filterServerManager.start(); } + if (!messageStoreConfig.isEnableDLegerCommitLog()) { + startProcessorByHa(messageStoreConfig.getBrokerRole()); + handleSlaveSynchronize(messageStoreConfig.getBrokerRole()); + } + + + this.registerBrokerAll(true, false, true); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @@ -877,12 +881,7 @@ public class BrokerController { this.brokerFastFailure.start(); } - if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) { - if (this.transactionalMessageCheckService != null) { - log.info("Start transaction service!"); - this.transactionalMessageCheckService.start(); - } - } + } public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) { @@ -1101,4 +1100,118 @@ public class BrokerController { return endTransactionThreadPoolQueue; } + + + + private void handleSlaveSynchronize(BrokerRole role) { + if (role == BrokerRole.SLAVE) { + if (null != slaveSyncFuture) { + slaveSyncFuture.cancel(false); + } + this.slaveSynchronize.setMasterAddr(null); + slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + BrokerController.this.slaveSynchronize.syncAll(); + } + catch (Throwable e) { + log.error("ScheduledTask SlaveSynchronize syncAll error.", e); + } + } + }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS); + } else { + //handle the slave synchronise + if (null != slaveSyncFuture) { + slaveSyncFuture.cancel(false); + } + this.slaveSynchronize.setMasterAddr(null); + } + } + + public void changeToSlave(int brokerId) { + log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId); + + //change the role + brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check + messageStoreConfig.setBrokerRole(BrokerRole.SLAVE); + + //handle the scheduled service + try { + this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE); + } catch (Throwable t) { + log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t); + } + + //handle the transactional service + try { + this.shutdownProcessorByHa(); + } catch (Throwable t) { + log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t); + } + + //handle the slave synchronise + handleSlaveSynchronize(BrokerRole.SLAVE); + + try { + this.registerBrokerAll(true, true, brokerConfig.isForceRegister()); + } catch (Throwable ignored) { + + } + log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId); + } + + + + public void changeToMaster(BrokerRole role) { + if (role == BrokerRole.SLAVE) { + return; + } + log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName()); + + //handle the slave synchronise + handleSlaveSynchronize(role); + + //handle the scheduled service + try { + this.messageStore.handleScheduleMessageService(role); + } catch (Throwable t) { + log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t); + } + + //handle the transactional service + try { + this.startProcessorByHa(BrokerRole.SYNC_MASTER); + } catch (Throwable t) { + log.error("[MONITOR] startProcessorByHa failed when changing to master", t); + } + + //if the operations above are totally successful, we change to master + brokerConfig.setBrokerId(0); //TO DO check + messageStoreConfig.setBrokerRole(role); + + try { + this.registerBrokerAll(true, true, brokerConfig.isForceRegister()); + } catch (Throwable ignored) { + + } + log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName()); + } + + private void startProcessorByHa(BrokerRole role) { + if (BrokerRole.SLAVE != role) { + if (this.transactionalMessageCheckService != null) { + this.transactionalMessageCheckService.start(); + } + } + } + + private void shutdownProcessorByHa() { + if (this.transactionalMessageCheckService != null) { + this.transactionalMessageCheckService.shutdown(true); + } + } + + + } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..09bf10c327de1b9a0158ea7b32cc9a5c4d866305 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.dledger; + +import io.openmessaging.storage.dledger.DLedgerLeaderElector; +import io.openmessaging.storage.dledger.DLedgerServer; +import io.openmessaging.storage.dledger.MemberState; +import io.openmessaging.storage.dledger.utils.DLedgerUtils; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.dledger.DLedgerCommitLog; + +public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler { + + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_")); + private BrokerController brokerController; + private DefaultMessageStore messageStore; + private DLedgerCommitLog dLedgerCommitLog; + private DLedgerServer dLegerServer; + public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) { + this.brokerController = brokerController; + this.messageStore = messageStore; + this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); + this.dLegerServer = dLedgerCommitLog.getdLedgerServer(); + } + + @Override public void handle(long term, MemberState.Role role) { + Runnable runnable = new Runnable() { + @Override public void run() { + long start = System.currentTimeMillis(); + try { + boolean succ = true; + log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); + switch (role) { + case CANDIDATE: + if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { + brokerController.changeToSlave(dLedgerCommitLog.getId()); + } + break; + case FOLLOWER: + brokerController.changeToSlave(dLedgerCommitLog.getId()); + break; + case LEADER: + while (true) { + if (!dLegerServer.getMemberState().isLeader()) { + succ = false; + break; + } + if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) { + break; + } + if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex() + && messageStore.dispatchBehindBytes() == 0) { + break; + } + Thread.sleep(100); + } + if (succ) { + messageStore.recoverTopicQueueTable(); + brokerController.changeToMaster(BrokerRole.SYNC_MASTER); + } + break; + default: + break; + } + log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start)); + } catch (Throwable t) { + log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t); + } + } + }; + executorService.submit(runnable); + } + + @Override public void startup() { + + } + + @Override public void shutdown() { + executorService.shutdown(); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 4dee01cbff0dcd3b21020ee0b7ff81c031649d44..6caa2358e32eb9ece9be1e14f3fc158b3406d034 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -152,7 +152,7 @@ public class BrokerOuterAPI { registerBrokerResultList.add(result); } - log.info("register broker to name server {} OK", namesrvAddr); + log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java index b5e608558cbfc148388f28f44c76f4e1cb315fd1..971237929ab0bbcca01237677881becee84d3c4c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java @@ -23,7 +23,6 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.filter.ExpressionType; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody; @@ -36,6 +35,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.filter.FilterFactory; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 643a812fa02b8d64f62279b2c1cc814c1d6b6a7d..7b5e5645a0b0f383863116892efd27e7e22c0604 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -54,7 +54,7 @@ public class SlaveSynchronize { private void syncTopicConfig() { String masterAddrBak = this.masterAddr; - if (masterAddrBak != null) { + if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) { try { TopicConfigSerializeWrapper topicWrapper = this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak); @@ -78,7 +78,7 @@ public class SlaveSynchronize { private void syncConsumerOffset() { String masterAddrBak = this.masterAddr; - if (masterAddrBak != null) { + if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) { try { ConsumerOffsetSerializeWrapper offsetWrapper = this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak); @@ -94,7 +94,7 @@ public class SlaveSynchronize { private void syncDelayOffset() { String masterAddrBak = this.masterAddr; - if (masterAddrBak != null) { + if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) { try { String delayOffset = this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak); @@ -118,7 +118,7 @@ public class SlaveSynchronize { private void syncSubscriptionGroupConfig() { String masterAddrBak = this.masterAddr; - if (masterAddrBak != null) { + if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) { try { SubscriptionGroupWrapper subscriptionWrapper = this.brokerController.getBrokerOuterAPI() diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageCheckService.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageCheckService.java index 5d515d6f11bbcc5945d87a2183b3cbba76802c60..a2cc0baa9e715366a695b1980925bfb40be84dbc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageCheckService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageCheckService.java @@ -22,36 +22,15 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; -import java.util.concurrent.atomic.AtomicBoolean; - public class TransactionalMessageCheckService extends ServiceThread { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); private BrokerController brokerController; - private final AtomicBoolean started = new AtomicBoolean(false); - public TransactionalMessageCheckService(BrokerController brokerController) { this.brokerController = brokerController; } - @Override - public void start() { - if (started.compareAndSet(false, true)) { - super.start(); - this.brokerController.getTransactionalMessageService().open(); - } - } - - @Override - public void shutdown(boolean interrupt) { - if (started.compareAndSet(true, false)) { - super.shutdown(interrupt); - this.brokerController.getTransactionalMessageService().close(); - this.brokerController.getTransactionalMessageCheckListener().shutDown(); - } - } - @Override public String getServiceName() { return TransactionalMessageCheckService.class.getSimpleName(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 80347d1052ed31f19abe8bb743d4b37ceb4a534d..57d2eda18697c7347d53362500eff59985ee9273 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -548,10 +548,10 @@ public class MQClientInstance { } } catch (Exception e) { if (this.isBrokerInNameServer(addr)) { - log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr); + log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e); } else { log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, - id, addr); + id, addr, e); } } } @@ -1046,20 +1046,8 @@ public class MQClientInstance { if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) { return this.brokerVersionTable.get(brokerName).get(brokerAddr); } - } else { - HeartbeatData heartbeatData = prepareHeartbeatData(); - try { - int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData, 3000); - return version; - } catch (Exception e) { - if (this.isBrokerInNameServer(brokerAddr)) { - log.info("send heart beat to broker[{} {}] failed", brokerName, brokerAddr); - } else { - log.info("send heart beat to broker[{} {}] exception, because the broker not up, forget it", brokerName, - brokerAddr); - } - } } + //To do need to fresh the version return 0; } diff --git a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java index 53ca32908267a24ab0a878e03dbf3301b98b4506..ae46e78fa6289a6fd8b500b76fc8cc775ed3f28b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java +++ b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java @@ -27,18 +27,29 @@ public abstract class ServiceThread implements Runnable { private static final long JOIN_TIME = 90 * 1000; - protected final Thread thread; + private Thread thread; protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false); protected volatile boolean stopped = false; + protected boolean isDaemon = false; + + //Make it able to restart the thread + private final AtomicBoolean started = new AtomicBoolean(false); public ServiceThread() { - this.thread = new Thread(this, this.getServiceName()); + } public abstract String getServiceName(); public void start() { + log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread); + if (!started.compareAndSet(false, true)) { + return; + } + stopped = false; + this.thread = new Thread(this, getServiceName()); + this.thread.setDaemon(isDaemon); this.thread.start(); } @@ -47,6 +58,10 @@ public abstract class ServiceThread implements Runnable { } public void shutdown(final boolean interrupt) { + log.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread); + if (!started.compareAndSet(true, false)) { + return; + } this.stopped = true; log.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt); @@ -75,11 +90,16 @@ public abstract class ServiceThread implements Runnable { return JOIN_TIME; } + @Deprecated public void stop() { this.stop(false); } + @Deprecated public void stop(final boolean interrupt) { + if (!started.get()) { + return; + } this.stopped = true; log.info("stop thread " + this.getServiceName() + " interrupt " + interrupt); @@ -93,6 +113,9 @@ public abstract class ServiceThread implements Runnable { } public void makeStop() { + if (!started.get()) { + return; + } this.stopped = true; log.info("makestop thread " + this.getServiceName()); } @@ -128,4 +151,12 @@ public abstract class ServiceThread implements Runnable { public boolean isStopped() { return stopped; } + + public boolean isDaemon() { + return isDaemon; + } + + public void setDaemon(boolean daemon) { + isDaemon = daemon; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index 1eb9f4d9948dde797218908789a249bc478bb5f8..af0b638d3d0ff62bed2718708255e52b26d8c85f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -41,6 +41,7 @@ public class MessageDecoder { public final static int MESSAGE_MAGIC_CODE = -626843481; public static final char NAME_VALUE_SEPARATOR = 1; public static final char PROPERTY_SEPARATOR = 2; + public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8; public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE + 4 // 2 MAGICCODE + 4 // 3 BODYCRC diff --git a/distribution/bin/dledger/fast-try.sh b/distribution/bin/dledger/fast-try.sh new file mode 100644 index 0000000000000000000000000000000000000000..ff8a96036d1a4b759fd72ae5979cf56cbe9f8534 --- /dev/null +++ b/distribution/bin/dledger/fast-try.sh @@ -0,0 +1,102 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## Revise the base dir +CURRENT_DIR="$(cd "$(dirname "$0")"; pwd)" +RMQ_DIR=$CURRENT_DIR/../.. +cd $RMQ_DIR + +function startNameserver() { + export JAVA_OPT_EXT=" -Xms512m -Xmx512m " + nohup bin/mqnamesrv & +} + +function startBroker() { + export JAVA_OPT_EXT=" -Xms1g -Xmx1g " + conf_name=$1 + nohup bin/mqbroker -c $conf_name & +} + +function stopNameserver() { + PIDS=$(ps -ef|grep java|grep NamesrvStartup|grep -v grep|awk '{print $2}') + if [ ! -z "$PIDS" ]; then + kill -s TERM $PIDS + fi +} + +function stopBroker() { + conf_name=$1 + PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}') + i=1 + while [ ! -z "$PIDS" -a $i -lt 5 ] + do + echo "Waiting to kill ..." + kill -s TERM $PIDS + ((i=$i+1)) + sleep 2 + PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}') + done + PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}') + if [ ! -z "$PIDS" ]; then + kill -9 $PIDS + fi +} + +function stopAll() { + ps -ef|grep java|grep BrokerStartup|grep -v grep|awk '{print $2}'|xargs kill + stopNameserver + stopBroker ./conf/dledger/broker-n0.conf + stopBroker ./conf/dledger/broker-n1.conf + stopBroker ./conf/dledger/broker-n2.conf +} + +function startAll() { + startNameserver + startBroker ./conf/dledger/broker-n0.conf + startBroker ./conf/dledger/broker-n1.conf + startBroker ./conf/dledger/broker-n2.conf +} + +function checkConf() { + if [ ! -f ./conf/dledger/broker-n0.conf -o ! -f ./conf/dledger/broker-n1.conf -o ! -f ./conf/dledger/broker-n2.conf ]; then + echo "Make sure the ./conf/dledger/broker-n0.conf, ./conf/dledger/broker-n1.conf, ./conf/dledger/broker-n2.conf exists" + exit -1 + fi +} + + + +## Main +if [ $# -lt 1 ]; then + echo "Usage: sh $0 start|stop" + exit -1 +fi +action=$1 +checkConf +case $action in + "start") + startAll + exit + ;; + "stop") + stopAll + ;; + *) + echo "Usage: sh $0 start|stop" + ;; +esac + diff --git a/distribution/conf/dledger/broker-n0.conf b/distribution/conf/dledger/broker-n0.conf new file mode 100644 index 0000000000000000000000000000000000000000..5351e497dc8147560d4f7ed371ab30d63df738cd --- /dev/null +++ b/distribution/conf/dledger/broker-n0.conf @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +brokerClusterName = RaftCluster +brokerName=RaftNode00 +listenPort=30911 +namesrvAddr=127.0.0.1:9876 +storePathRootDir=/tmp/rmqstore/node00 +storePathCommitLog=/tmp/rmqstore/node00/commitlog +enableDLegerCommitLog=true +dLegerGroup=RaftNode00 +dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 +## must be unique +dLegerSelfId=n0 +sendMessageThreadPoolNums=16 diff --git a/distribution/conf/dledger/broker-n1.conf b/distribution/conf/dledger/broker-n1.conf new file mode 100644 index 0000000000000000000000000000000000000000..6aaf8f9309c82fc2eb273260afb9f781e9e2d608 --- /dev/null +++ b/distribution/conf/dledger/broker-n1.conf @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +brokerClusterName = RaftCluster +brokerName=RaftNode00 +listenPort=30921 +namesrvAddr=127.0.0.1:9876 +storePathRootDir=/tmp/rmqstore/node01 +storePathCommitLog=/tmp/rmqstore/node01/commitlog +enableDLegerCommitLog=true +dLegerGroup=RaftNode00 +dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 +## must be unique +dLegerSelfId=n1 +sendMessageThreadPoolNums=16 diff --git a/distribution/conf/dledger/broker-n2.conf b/distribution/conf/dledger/broker-n2.conf new file mode 100644 index 0000000000000000000000000000000000000000..c863d89ee820c6fcabadde8f76afcf451761ddab --- /dev/null +++ b/distribution/conf/dledger/broker-n2.conf @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +brokerClusterName = RaftCluster +brokerName=RaftNode00 +listenPort=30931 +namesrvAddr=127.0.0.1:9876 +storePathRootDir=/tmp/rmqstore/node02 +storePathCommitLog=/tmp/rmqstore/node02/commitlog +enableDLegerCommitLog=true +dLegerGroup=RaftNode00 +dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 +## must be unique +dLegerSelfId=n2 +sendMessageThreadPoolNums=16 diff --git a/distribution/release.xml b/distribution/release.xml index 76bf5fbb873c38780ba6d15211d7cc43d773f7f5..ae9eb081875a4a2d807559685c8e1faf0359f72c 100644 --- a/distribution/release.xml +++ b/distribution/release.xml @@ -40,7 +40,7 @@ - bin/* + bin/** 0755 diff --git a/docs/cn/dledger/deploy_guide.md b/docs/cn/dledger/deploy_guide.md new file mode 100644 index 0000000000000000000000000000000000000000..faebb96e59856b98b77dabfe59cb79740fa36dd8 --- /dev/null +++ b/docs/cn/dledger/deploy_guide.md @@ -0,0 +1,76 @@ +## 前言 +该文档主要介绍如何部署自动容灾切换的 RocketMQ-on-DLedger Group。 + +RocketMQ-on-DLedger Group 是指一组**相同名称**的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。 +RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致。 +RocketMQ-on-DLedger Group 是可以水平扩展的,也即可以部署任意多个 RocketMQ-on-DLedger Group 同时对外提供服务。 + +## 1. 新集群部署 + +#### 1.1 编写配置 +每个 RocketMQ-on-DLedger Group 至少准备三台机器(本文假设为 3)。 +编写 3 个配置文件,建议参考 conf/dledger 目录下的配置文件样例。 +关键配置介绍: + +| name | 含义 | 举例 | +| --- | --- | --- | +| enableDLegerCommitLog | 是否启动 DLedger  | true | +| dLegerGroup | DLedger Raft Group的名字,建议和 brokerName 保持一致 | RaftNode00 | +| dLegerPeers | DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致 | n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 | +| dLegerSelfId | 节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 | n0 | +| sendMessageThreadPoolNums | 发送线程个数,建议配置成 Cpu 核数 | 16 | + +这里贴出 conf/dledger/broker-n0.conf 的配置举例。 + +``` +brokerClusterName = RaftCluster +brokerName=RaftNode00 +listenPort=30911 +namesrvAddr=127.0.0.1:9876 +storePathRootDir=/tmp/rmqstore/node00 +storePathCommitLog=/tmp/rmqstore/node00/commitlog +enableDLegerCommitLog=true +dLegerGroup=RaftNode00 +dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 +## must be unique +dLegerSelfId=n0 +sendMessageThreadPoolNums=16 +``` + +### 1.2 启动 Broker + +与老版本的启动方式一致。 + +`nohup sh bin/mqbroker -c conf/dledger/xxx-n0.conf & ` +`nohup sh bin/mqbroker -c conf/dledger/xxx-n1.conf & ` +`nohup sh bin/mqbroker -c conf/dledger/xxx-n2.conf & ` + + +## 2. 旧集群升级 + +如果旧集群采用 Master 方式部署,则每个 Master 都需要转换成一个 RocketMQ-on-DLedger Group。 +如果旧集群采用 Master-Slave 方式部署,则每个 Master-Slave 组都需要转换成一个 RocketMQ-on-DLedger Group。 + +### 2.1 杀掉旧的 Broker + +可以通过 kill 命令来完成,也可以调用 `bin/mqshutdown broker`。 + +### 2.2 检查旧的 Commitlog + +RocketMQ-on-DLedger 组中的每个节点,可以兼容旧的 Commitlog ,但其 Raft 复制过程,只能针对新增加的消息。因此,为了避免出现异常,需要保证 旧的 Commitlog 是一致的。 +如果旧的集群是采用 Master-Slave 方式部署,有可能在shutdown时,其数据并不是一致的,建议通过md5sum 的方式,检查最近的最少 2 个 Commmitlog 文件,如果发现不一致,则通过拷贝的方式进行对齐。 + +虽然 RocketMQ-on-DLedger Group 也可以以 2 节点方式部署,但其会丧失容灾切换能力(2n + 1 原则,至少需要3个节点才能容忍其中 1 个宕机)。 +所以在对齐了 Master 和 Slave 的 Commitlog 之后,还需要准备第 3 台机器,并把旧的 Commitlog 从 Master 拷贝到 第 3 台机器(记得同时拷贝一下 config 文件夹)。 + +在 3 台机器准备好了之后,旧 Commitlog 文件也保证一致之后,就可以开始走下一步修改配置了。 + +### 2.3 修改配置 + +参考新集群部署。 + +### 2.4 重新启动 Broker + +参考新集群部署。 + + diff --git a/docs/cn/dledger/quick_start.md b/docs/cn/dledger/quick_start.md new file mode 100644 index 0000000000000000000000000000000000000000..3d1989a52cd8a3b2d606f8934059fcdb69e87539 --- /dev/null +++ b/docs/cn/dledger/quick_start.md @@ -0,0 +1,61 @@ +### 前言 +该文档主要介绍如何快速构建和部署基于 DLedger 的可以自动容灾切换的 RocketMQ 集群。 + +详细的新集群部署和旧集群升级指南请参考 [部署指南](deploy_guide.md)。 + +### 1. 源码构建 +构建分为两个部分,需要先构建 DLedger,然后 构建 RocketMQ + +#### 1.1 构建 DLedger + +`git clone https://github.com/openmessaging/openmessaging-storage-dledger.git` + +`cd openmessaging-storage-dledger` + +`mvn clean install -DskipTests` + +#### 1.2 构建 RocketMQ + +`git clone https://github.com/apache/rocketmq.git` + +`cd rocketmq` + +`git checkout -b store_with_dledger origin/store_with_dledger` + +`mvn -Prelease-all -DskipTests clean install -U` + +### 2. 快速部署 + +在构建成功后 + +`cd distribution/target/apache-rocketmq` + +`sh bin/dledger/fast-try.sh start` + +如果上面的步骤执行成功,可以通过 mqadmin 运维命令查看集群状态。 + +`sh bin/mqadmin clusterList -n 127.0.0.1:9876` + +顺利的话,会看到如下内容: + +![ClusterList](https://img.alicdn.com/5476e8b07b923/TB11Z.ZyCzqK1RjSZFLXXcn2XXa) + +(BID 为 0 的表示 Master,其余都是 Follower) + +启动成功,现在可以向集群收发消息,并进行容灾切换测试了。 + +关闭快速集群,可以执行: + +`sh bin/dledger/fast-try.sh stop` + +快速部署,默认配置在 conf/dledger 里面,默认的存储路径在 /tmp/rmqstore。 + + +### 3. 容灾切换 + +部署成功,杀掉 Leader 之后(在上面的例子中,杀掉端口 30931 所在的进程),等待约 10s 左右,用 clusterList 命令查看集群,就会发现 Leader 切换到另一个节点了。 + + + + + diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 00962ef246ed8859863add3b29b1a58292d3946c..ecd057a29ac28a9cf736ea7bea51d469afe5e1ff 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -128,6 +128,17 @@ public class RouteInfoManager { brokerData = new BrokerData(clusterName, brokerName, new HashMap()); this.brokerAddrTable.put(brokerName, brokerData); } + Map brokerAddrsMap = brokerData.getBrokerAddrs(); + //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT> + //The same IP:PORT must only have one record in brokerAddrTable + Iterator> it = brokerAddrsMap.entrySet().iterator(); + while (it.hasNext()) { + Entry item = it.next(); + if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { + it.remove(); + } + } + String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); diff --git a/pom.xml b/pom.xml index ec8949cf5ae7b04a09a20ab1c421c4d4cd85fb8d..7a349369c0ef5d770e869d3d30a45b4c17996667 100644 --- a/pom.xml +++ b/pom.xml @@ -100,8 +100,8 @@ false true - 1.7 - 1.7 + 1.8 + 1.8 jacoco ${project.basedir}/../test/target/jacoco-it.exec @@ -259,6 +259,7 @@ */target/** */*.iml docs/** + localbin/** diff --git a/store/pom.xml b/store/pom.xml index 727a198c8870efedf5bccafa1de532cd797564df..a1aaf39a419f5cce09cdd2076a19fa8d440b5035 100644 --- a/store/pom.xml +++ b/store/pom.xml @@ -28,6 +28,21 @@ rocketmq-store ${project.version} + + io.openmessaging.storage + dledger + 0.1 + + + org.apache.rocketmq + rocketmq-remoting + + + org.slf4j + slf4j-log4j12 + + + ${project.groupId} rocketmq-common diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index dbb27cd744c5e98ef4ffa87de26eb5593f5dc849..5e86b8a85fc466477b16af02adad224a44822bdf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -120,16 +120,9 @@ public class AllocateMappedFileService extends ServiceThread { return AllocateMappedFileService.class.getSimpleName(); } + @Override public void shutdown() { - this.stopped = true; - this.thread.interrupt(); - - try { - this.thread.join(this.getJointime()); - } catch (InterruptedException e) { - log.error("Interrupted", e); - } - + super.shutdown(true); for (AllocateRequest req : this.requestTable.values()) { if (req.mappedFile != null) { log.info("delete pre allocated maped file, {}", req.mappedFile.getFileName()); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 03b1151643ff7e750902e3ed1f7a79f6a9da1895..cbcc1a7b728d1be62a43db41973a77090724abc2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -45,11 +45,11 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService; public class CommitLog { // Message's MAGIC CODE daa320a7 public final static int MESSAGE_MAGIC_CODE = -626843481; - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); // End of file empty MAGIC CODE cbd43194 - private final static int BLANK_MAGIC_CODE = -875286124; - private final MappedFileQueue mappedFileQueue; - private final DefaultMessageStore defaultMessageStore; + protected final static int BLANK_MAGIC_CODE = -875286124; + protected final MappedFileQueue mappedFileQueue; + protected final DefaultMessageStore defaultMessageStore; private final FlushCommitLogService flushCommitLogService; //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods @@ -57,11 +57,11 @@ public class CommitLog { private final AppendMessageCallback appendMessageCallback; private final ThreadLocal batchEncoderThreadLocal; - private HashMap topicQueueTable = new HashMap(1024); - private volatile long confirmOffset = -1L; + protected HashMap topicQueueTable = new HashMap(1024); + protected volatile long confirmOffset = -1L; private volatile long beginTimeInLock = 0; - private final PutMessageLock putMessageLock; + protected final PutMessageLock putMessageLock; public CommitLog(final DefaultMessageStore defaultMessageStore) { this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), @@ -212,6 +212,12 @@ public class CommitLog { log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset); this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); } + } else { + // Commitlog case files are deleted + log.warn("The commitlog files are deleted, and delete the consume queue files"); + this.mappedFileQueue.setFlushedWhere(0); + this.mappedFileQueue.setCommittedWhere(0); + this.defaultMessageStore.destroyLogics(); } } @@ -366,7 +372,7 @@ public class CommitLog { return new DispatchRequest(-1, false /* success */); } - private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { + protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { final int msgLen = 4 //TOTALSIZE + 4 //MAGICCODE + 4 //BODYCRC @@ -396,6 +402,7 @@ public class CommitLog { this.confirmOffset = phyOffset; } + @Deprecated public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { // recover by the minimum time stamp boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); @@ -456,7 +463,7 @@ public class CommitLog { } } } else { - log.info("recover physics file end, " + mappedFile.getFileName()); + log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position()); break; } } @@ -474,6 +481,7 @@ public class CommitLog { } // Commitlog case files are deleted else { + log.warn("The commitlog files are deleted, and delete the consume queue files"); this.mappedFileQueue.setFlushedWhere(0); this.mappedFileQueue.setCommittedWhere(0); this.defaultMessageStore.destroyLogics(); diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 08c7f99906962f20c748ba29bf12f58c81e8035c..a63d3746b30663e562778201f6c2ee4dd1365b17 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -106,7 +106,7 @@ public class ConsumeQueue { if (offset >= 0 && size > 0) { mappedFileOffset = i + CQ_STORE_UNIT_SIZE; - this.maxPhysicOffset = offset; + this.maxPhysicOffset = offset + size; if (isExtAddr(tagsCode)) { maxExtAddr = tagsCode; } @@ -224,7 +224,7 @@ public class ConsumeQueue { int logicFileSize = this.mappedFileSize; - this.maxPhysicOffset = phyOffet - 1; + this.maxPhysicOffset = phyOffet; long maxExtAddr = 1; while (true) { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); @@ -249,7 +249,7 @@ public class ConsumeQueue { mappedFile.setWrotePosition(pos); mappedFile.setCommittedPosition(pos); mappedFile.setFlushedPosition(pos); - this.maxPhysicOffset = offset; + this.maxPhysicOffset = offset + size; // This maybe not take effect, when not every consume queue has extend file. if (isExtAddr(tagsCode)) { maxExtAddr = tagsCode; @@ -267,7 +267,7 @@ public class ConsumeQueue { mappedFile.setWrotePosition(pos); mappedFile.setCommittedPosition(pos); mappedFile.setFlushedPosition(pos); - this.maxPhysicOffset = offset; + this.maxPhysicOffset = offset + size; if (isExtAddr(tagsCode)) { maxExtAddr = tagsCode; } @@ -348,7 +348,7 @@ public class ConsumeQueue { long tagsCode = result.getByteBuffer().getLong(); if (offsetPy >= phyMinOffset) { - this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i; + this.minLogicOffset = mappedFile.getFileFromOffset() + i; log.info("Compute logical min offset: {}, topic: {}, queueId: {}", this.getMinOffsetInQueue(), this.topic, this.queueId); // This maybe not take effect, when not every consume queue has extend file. @@ -420,7 +420,8 @@ public class ConsumeQueue { private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) { - if (offset <= this.maxPhysicOffset) { + if (offset + size <= this.maxPhysicOffset) { + log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset); return true; } @@ -464,7 +465,7 @@ public class ConsumeQueue { ); } } - this.maxPhysicOffset = offset; + this.maxPhysicOffset = offset + size; return mappedFile.appendMessage(this.byteBufferIndex.array()); } return false; diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index e0aef4f37450dd546d49687d2af96ec212bbe3e3..6cec58d861637826856f04324325646eccc3d14f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -42,24 +42,23 @@ import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.running.RunningStats; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.StorePathConfigHelper; +import org.apache.rocketmq.store.dledger.DLedgerCommitLog; import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.index.IndexService; import org.apache.rocketmq.store.index.QueryOffsetResult; import org.apache.rocketmq.store.schedule.ScheduleMessageService; import org.apache.rocketmq.store.stats.BrokerStatsManager; -import static org.apache.rocketmq.store.config.BrokerRole.SLAVE; - public class DefaultMessageStore implements MessageStore { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -119,7 +118,11 @@ public class DefaultMessageStore implements MessageStore { this.messageStoreConfig = messageStoreConfig; this.brokerStatsManager = brokerStatsManager; this.allocateMappedFileService = new AllocateMappedFileService(this); - this.commitLog = new CommitLog(this); + if (messageStoreConfig.isEnableDLegerCommitLog()) { + this.commitLog = new DLedgerCommitLog(this); + } else { + this.commitLog = new CommitLog(this); + } this.consumeQueueTable = new ConcurrentHashMap<>(32); this.flushConsumeQueueService = new FlushConsumeQueueService(); @@ -127,8 +130,11 @@ public class DefaultMessageStore implements MessageStore { this.cleanConsumeQueueService = new CleanConsumeQueueService(); this.storeStatsService = new StoreStatsService(); this.indexService = new IndexService(this); - this.haService = new HAService(this); - + if (!messageStoreConfig.isEnableDLegerCommitLog()) { + this.haService = new HAService(this); + } else { + this.haService = null; + } this.reputMessageService = new ReputMessageService(); this.scheduleMessageService = new ScheduleMessageService(this); @@ -216,29 +222,71 @@ public class DefaultMessageStore implements MessageStore { lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes())); lockFile.getChannel().force(true); - - this.flushConsumeQueueService.start(); - this.commitLog.start(); - this.storeStatsService.start(); - - if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) { - this.scheduleMessageService.start(); + { + /** + * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog; + * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go; + * 3. Calculate the reput offset according to the consume queue; + * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed. + */ + long maxPhysicalPosInLogicQueue = commitLog.getMinOffset(); + for (ConcurrentMap maps : this.consumeQueueTable.values()) { + for (ConsumeQueue logic : maps.values()) { + if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) { + maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset(); + } + } + } + if (maxPhysicalPosInLogicQueue < 0) { + maxPhysicalPosInLogicQueue = 0; + } + if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) { + maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset(); + /** + * This happens in following conditions: + * 1. If someone removes all the consumequeue files or the disk get damaged. + * 2. Launch a new broker, and copy the commitlog from other brokers. + * + * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0. + * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong. + */ + log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset()); + } + log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}", + maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset()); + this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue); + this.reputMessageService.start(); + + /** + * 1. Finish dispatching the messages fall behind, then to start other services. + * 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0 + */ + while (true) { + if (dispatchBehindBytes() <= 0) { + break; + } + Thread.sleep(1000); + log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes()); + } + this.recoverTopicQueueTable(); } - if (this.getMessageStoreConfig().isDuplicationEnable()) { - this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); - } else { - this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset()); + if (!messageStoreConfig.isEnableDLegerCommitLog()) { + this.haService.start(); + this.handleScheduleMessageService(messageStoreConfig.getBrokerRole()); } - this.reputMessageService.start(); - this.haService.start(); + this.flushConsumeQueueService.start(); + this.commitLog.start(); + this.storeStatsService.start(); this.createTempFile(); this.addScheduleTask(); this.shutdown = false; } + + public void shutdown() { if (!this.shutdown) { this.shutdown = true; @@ -255,8 +303,9 @@ public class DefaultMessageStore implements MessageStore { if (this.scheduleMessageService != null) { this.scheduleMessageService.shutdown(); } - - this.haService.shutdown(); + if (this.haService != null) { + this.haService.shutdown(); + } this.storeStatsService.shutdown(); this.indexService.shutdown(); @@ -1039,6 +1088,7 @@ public class DefaultMessageStore implements MessageStore { return false; } + @Override public long dispatchBehindBytes() { return this.reputMessageService.behind(); } @@ -1320,7 +1370,7 @@ public class DefaultMessageStore implements MessageStore { return maxPhysicOffset; } - private void recoverTopicQueueTable() { + public void recoverTopicQueueTable() { HashMap table = new HashMap(1024); long minPhyOffset = this.commitLog.getMinOffset(); for (ConcurrentMap maps : this.consumeQueueTable.values()) { @@ -1382,6 +1432,18 @@ public class DefaultMessageStore implements MessageStore { return brokerStatsManager; } + @Override + public void handleScheduleMessageService(final BrokerRole brokerRole) { + if (this.scheduleMessageService != null) { + if (brokerRole == BrokerRole.SLAVE) { + this.scheduleMessageService.shutdown(); + } else { + this.scheduleMessageService.start(); + } + } + + } + public int remainTransientStoreBufferNumbs() { return this.transientStorePool.remainBufferNumbs(); } @@ -1748,6 +1810,11 @@ public class DefaultMessageStore implements MessageStore { } private void doReput() { + if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { + log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.", + this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset()); + this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset(); + } for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() @@ -1763,7 +1830,7 @@ public class DefaultMessageStore implements MessageStore { for (int readSize = 0; readSize < result.getSize() && doNext; ) { DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); - int size = dispatchRequest.getMsgSize(); + int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { @@ -1797,12 +1864,10 @@ public class DefaultMessageStore implements MessageStore { this.reputFromOffset += size; } else { doNext = false; - if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { - log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}", - this.reputFromOffset); + log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}", + this.reputFromOffset); - this.reputFromOffset += result.getSize() - readSize; - } + this.reputFromOffset += result.getSize() - readSize; } } } diff --git a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java index 819bb948c758d7ecfab5a982c8b5454b5907daad..89d47ced5ba55b344c5376b0bcd6aa7871b11940 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java +++ b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java @@ -22,7 +22,7 @@ public class DispatchRequest { private final String topic; private final int queueId; private final long commitLogOffset; - private final int msgSize; + private int msgSize; private final long tagsCode; private final long storeTimestamp; private final long consumeQueueOffset; @@ -35,6 +35,8 @@ public class DispatchRequest { private final Map propertiesMap; private byte[] bitMap; + private int bufferSize = -1;//the buffer size maybe larger than the msg size if the message is wrapped by something + public DispatchRequest( final String topic, final int queueId, @@ -156,4 +158,16 @@ public class DispatchRequest { public void setBitMap(byte[] bitMap) { this.bitMap = bitMap; } + + public void setMsgSize(int msgSize) { + this.msgSize = msgSize; + } + + public int getBufferSize() { + return bufferSize; + } + + public void setBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index 86de3d2c1deed6db6b78615b9a07b90efcaed127..cc145921cef90239ca13dc5ff9b3714e2f4a27c2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -154,8 +154,8 @@ public class MappedFileQueue { if (file.length() != this.mappedFileSize) { log.warn(file + "\t" + file.length() - + " length not matched message store config value, ignore it"); - return true; + + " length not matched message store config value, please check it manually"); + return false; } try { diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 0f9b4f0ae6e35ef552c521855078b5638b705313..5a046ff181f747624f1bb7c427541dfba28509e3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -21,6 +21,7 @@ import java.util.LinkedList; import java.util.Set; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.stats.BrokerStatsManager; /** @@ -366,4 +367,10 @@ public interface MessageStore { * @return BrokerStatsManager. */ BrokerStatsManager getBrokerStatsManager(); + + /** + * handle + * @param brokerRole + */ + void handleScheduleMessageService(BrokerRole brokerRole); } diff --git a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java index 7a17114c8f3627bec4d3bc6086118547ac78052c..1f826fedd891b6ff9c55ec56f2b48b2bbdbbe558 100644 --- a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java @@ -48,9 +48,9 @@ public class SelectMappedBufferResult { this.byteBuffer.limit(this.size); } - public MappedFile getMappedFile() { + /* public MappedFile getMappedFile() { return mappedFile; - } + }*/ // @Override // protected void finalize() { diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 8d60321ed8f2529699e07e8b96f845bd81785665..cb17345c3a1b6e4b9efccc15022d28f4d98553e3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -143,6 +143,11 @@ public class MessageStoreConfig { private int transientStorePoolSize = 5; private boolean fastFailIfNoBufferInStorePool = false; + private boolean enableDLegerCommitLog = false; + private String dLegerGroup; + private String dLegerPeers; + private String dLegerSelfId; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -666,4 +671,35 @@ public class MessageStoreConfig { this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval; } + public String getdLegerGroup() { + return dLegerGroup; + } + + public void setdLegerGroup(String dLegerGroup) { + this.dLegerGroup = dLegerGroup; + } + + public String getdLegerPeers() { + return dLegerPeers; + } + + public void setdLegerPeers(String dLegerPeers) { + this.dLegerPeers = dLegerPeers; + } + + public String getdLegerSelfId() { + return dLegerSelfId; + } + + public void setdLegerSelfId(String dLegerSelfId) { + this.dLegerSelfId = dLegerSelfId; + } + + public boolean isEnableDLegerCommitLog() { + return enableDLegerCommitLog; + } + + public void setEnableDLegerCommitLog(boolean enableDLegerCommitLog) { + this.enableDLegerCommitLog = enableDLegerCommitLog; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java new file mode 100644 index 0000000000000000000000000000000000000000..a48c1bed8bacbd84cc6ca0442087190d63b8a6a4 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -0,0 +1,747 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.dledger; + +import io.openmessaging.storage.dledger.AppendFuture; +import io.openmessaging.storage.dledger.DLedgerConfig; +import io.openmessaging.storage.dledger.DLedgerServer; +import io.openmessaging.storage.dledger.entry.DLedgerEntry; +import io.openmessaging.storage.dledger.protocol.AppendEntryRequest; +import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; +import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; +import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore; +import io.openmessaging.storage.dledger.store.file.MmapFile; +import io.openmessaging.storage.dledger.store.file.MmapFileList; +import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult; +import io.openmessaging.storage.dledger.utils.DLedgerUtils; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.store.AppendMessageResult; +import org.apache.rocketmq.store.AppendMessageStatus; +import org.apache.rocketmq.store.CommitLog; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.DispatchRequest; +import org.apache.rocketmq.store.MappedFile; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.store.StoreStatsService; +import org.apache.rocketmq.store.schedule.ScheduleMessageService; + +/** + * Store all metadata downtime for recovery, data protection reliability + */ +public class DLedgerCommitLog extends CommitLog { + private final DLedgerServer dLedgerServer; + private final DLedgerConfig dLedgerConfig; + private final DLedgerMmapFileStore dLedgerFileStore; + private final MmapFileList dLedgerFileList; + + //The id identifies the broker role, 0 means master, others means slave + private final int id; + + private final MessageSerializer messageSerializer; + private volatile long beginTimeInDledgerLock = 0; + + //This offset separate the old commitlog from dledger commitlog + private long dividedCommitlogOffset = -1; + + + private boolean isInrecoveringOldCommitlog = false; + + public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { + super(defaultMessageStore); + dLedgerConfig = new DLedgerConfig(); + dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable()); + dLedgerConfig.setStoreType(DLedgerConfig.FILE); + dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId()); + dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup()); + dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers()); + dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); + dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); + dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); + dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); + id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; + dLedgerServer = new DLedgerServer(dLedgerConfig); + dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); + DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> { + assert bodyOffset == DLedgerEntry.BODY_OFFSET; + buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION); + buffer.putLong(entry.getPos() + bodyOffset); + }; + dLedgerFileStore.addAppendHook(appendHook); + dLedgerFileList = dLedgerFileStore.getDataFileList(); + this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + + } + + @Override + public boolean load() { + boolean result = super.load(); + if (!result) { + return false; + } + + return true; + } + + private void refreshConfig() { + dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable()); + dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); + dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); + } + + private void disableDeleteDledger() { + dLedgerConfig.setEnableDiskForceClean(false); + dLedgerConfig.setFileReservedHours(24 * 365 * 10); + } + + @Override + public void start() { + dLedgerServer.startup(); + } + + @Override + public void shutdown() { + dLedgerServer.shutdown(); + } + + @Override + public long flush() { + dLedgerFileStore.flush(); + return dLedgerFileList.getFlushedWhere(); + } + + @Override + public long getMaxOffset() { + if (dLedgerFileStore.getCommittedPos() > 0) { + return dLedgerFileStore.getCommittedPos(); + } + if (dLedgerFileList.getMinOffset() > 0) { + return dLedgerFileList.getMinOffset(); + } + return 0; + } + + @Override + public long getMinOffset() { + if (!mappedFileQueue.getMappedFiles().isEmpty()) { + return mappedFileQueue.getMinOffset(); + } + return dLedgerFileList.getMinOffset(); + } + + @Override + public long getConfirmOffset() { + return this.getMaxOffset(); + } + + @Override + public void setConfirmOffset(long phyOffset) { + log.warn("Should not set confirm offset {} for dleger commitlog", phyOffset); + } + + + + @Override + public long remainHowManyDataToCommit() { + return dLedgerFileList.remainHowManyDataToCommit(); + } + + @Override + public long remainHowManyDataToFlush() { + return dLedgerFileList.remainHowManyDataToFlush(); + } + + @Override + public int deleteExpiredFile( + final long expiredTime, + final int deleteFilesInterval, + final long intervalForcibly, + final boolean cleanImmediately + ) { + if (mappedFileQueue.getMappedFiles().isEmpty()) { + refreshConfig(); + //To prevent too much log in defaultMessageStore + return Integer.MAX_VALUE; + } else { + disableDeleteDledger(); + } + int count = super.deleteExpiredFile(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately); + if (count > 0 || mappedFileQueue.getMappedFiles().size() != 1) { + return count; + } + //the old logic will keep the last file, here to delete it + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(); + log.info("Try to delete the last old commitlog file {}", mappedFile.getFileName()); + long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; + if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { + while (!mappedFile.destroy(10 * 1000)) { + DLedgerUtils.sleep(1000); + } + mappedFileQueue.getMappedFiles().remove(mappedFile); + } + return 1; + } + + + public SelectMappedBufferResult convertSbr(SelectMmapBufferResult sbr) { + if (sbr == null) { + return null; + } else { + return new DLedgerSelectMappedBufferResult(sbr); + } + + } + + public SelectMmapBufferResult truncate(SelectMmapBufferResult sbr) { + long committedPos = dLedgerFileStore.getCommittedPos(); + if (sbr == null || sbr.getStartOffset() == committedPos) { + return null; + } + if (sbr.getStartOffset() + sbr.getSize() <= committedPos) { + return sbr; + } else { + sbr.setSize((int) (committedPos - sbr.getStartOffset())); + return sbr; + } + } + + @Override + public SelectMappedBufferResult getData(final long offset) { + if (offset < dividedCommitlogOffset) { + return super.getData(offset); + } + return this.getData(offset, offset == 0); + } + + + @Override + public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) { + if (offset < dividedCommitlogOffset) { + return super.getData(offset, returnFirstOnNotFound); + } + if (offset >= dLedgerFileStore.getCommittedPos()) { + return null; + } + int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); + MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, returnFirstOnNotFound); + if (mappedFile != null) { + int pos = (int) (offset % mappedFileSize); + SelectMmapBufferResult sbr = mappedFile.selectMappedBuffer(pos); + return convertSbr(truncate(sbr)); + } + + return null; + } + + private void recover(long maxPhyOffsetOfConsumeQueue) { + dLedgerFileStore.load(); + if (dLedgerFileList.getMappedFiles().size() > 0) { + dLedgerFileStore.recover(); + dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset(); + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + if (mappedFile != null) { + disableDeleteDledger(); + } + long maxPhyOffset = dLedgerFileList.getMaxWrotePosition(); + // Clear ConsumeQueue redundant data + if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) { + log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset); + this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset); + } + return; + } + //Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog + isInrecoveringOldCommitlog = true; + //No need the abnormal recover + super.recoverNormally(maxPhyOffsetOfConsumeQueue); + isInrecoveringOldCommitlog = false; + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + if (mappedFile == null) { + return; + } + ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); + byteBuffer.position(mappedFile.getWrotePosition()); + boolean needWriteMagicCode = true; + // 1 TOTAL SIZE + byteBuffer.getInt(); //size + int magicCode = byteBuffer.getInt(); + if (magicCode == CommitLog.BLANK_MAGIC_CODE) { + needWriteMagicCode = false; + } else { + log.info("Recover old commitlog found a illegal magic code={}", magicCode); + } + dLedgerConfig.setEnableDiskForceClean(false); + dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize(); + log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset); + if (needWriteMagicCode) { + byteBuffer.position(mappedFile.getWrotePosition()); + byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition()); + byteBuffer.putInt(BLANK_MAGIC_CODE); + mappedFile.flush(0); + } + mappedFile.setWrotePosition(mappedFile.getFileSize()); + mappedFile.setCommittedPosition(mappedFile.getFileSize()); + mappedFile.setFlushedPosition(mappedFile.getFileSize()); + dLedgerFileList.getLastMappedFile(dividedCommitlogOffset); + log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset); + } + + @Override + public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { + recover(maxPhyOffsetOfConsumeQueue); + } + + @Override + public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { + recover(maxPhyOffsetOfConsumeQueue); + } + + @Override + public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC) { + return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true); + } + + @Override + public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, + final boolean readBody) { + if (isInrecoveringOldCommitlog) { + return super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody); + } + try { + int bodyOffset = DLedgerEntry.BODY_OFFSET; + int pos = byteBuffer.position(); + int magic = byteBuffer.getInt(); + //In dledger, this field is size, it must be gt 0, so it could prevent collision + int magicOld = byteBuffer.getInt(); + if (magicOld == CommitLog.BLANK_MAGIC_CODE || magicOld == CommitLog.MESSAGE_MAGIC_CODE) { + byteBuffer.position(pos); + return super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody); + } + if (magic == MmapFileList.BLANK_MAGIC_CODE) { + return new DispatchRequest(0, true); + } + byteBuffer.position(pos + bodyOffset); + DispatchRequest dispatchRequest = super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody); + if (dispatchRequest.isSuccess()) { + dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset); + } else if (dispatchRequest.getMsgSize() > 0) { + dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset); + } + return dispatchRequest; + } catch (Throwable ignored) { + } + + return new DispatchRequest(-1, false /* success */); + } + + @Override + public boolean resetOffset(long offset) { + //currently, it seems resetOffset has no use + return false; + } + + @Override + public long getBeginTimeInLock() { + return beginTimeInDledgerLock; + } + + @Override + public PutMessageResult putMessage(final MessageExtBrokerInner msg) { + // Set the storage time + msg.setStoreTimestamp(System.currentTimeMillis()); + // Set the message body BODY CRC (consider the most appropriate setting + // on the client) + msg.setBodyCRC(UtilAll.crc32(msg.getBody())); + + StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); + + String topic = msg.getTopic(); + int queueId = msg.getQueueId(); + + //should be consistent with the old version + final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); + if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE + || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { + // Delay Delivery + if (msg.getDelayTimeLevel() > 0) { + if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { + msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); + } + + topic = ScheduleMessageService.SCHEDULE_TOPIC; + queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); + + // Backup real topic, queueId + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); + msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); + + msg.setTopic(topic); + msg.setQueueId(queueId); + } + } + + // Back to Results + AppendMessageResult appendResult; + AppendFuture dledgerFuture; + EncodeResult encodeResult; + + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config + long eclipseTimeInLock; + long queueOffset; + try { + beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); + encodeResult = this.messageSerializer.serialize(msg); + queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); + if (encodeResult.status != AppendMessageStatus.PUT_OK) { + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)); + } + AppendEntryRequest request = new AppendEntryRequest(); + request.setGroup(dLedgerConfig.getGroup()); + request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); + request.setBody(encodeResult.data); + dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); + if (dledgerFuture.getPos() == -1) { + return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); + } + long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; + ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); + String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); + eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; + appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, eclipseTimeInLock); + switch (tranType) { + case MessageSysFlag.TRANSACTION_PREPARED_TYPE: + case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: + break; + case MessageSysFlag.TRANSACTION_NOT_TYPE: + case MessageSysFlag.TRANSACTION_COMMIT_TYPE: + // The next update ConsumeQueue information + DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1); + break; + default: + break; + } + } catch (Exception e) { + log.error("Put message error", e); + return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); + } finally { + beginTimeInDledgerLock = 0; + putMessageLock.unlock(); + } + + if (eclipseTimeInLock > 500) { + log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, appendResult); + } + + PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; + try { + AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS); + switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { + case SUCCESS: + putMessageStatus = PutMessageStatus.PUT_OK; + break; + case INCONSISTENT_LEADER: + case NOT_LEADER: + case LEADER_NOT_READY: + case DISK_FULL: + putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; + break; + case WAIT_QUORUM_ACK_TIMEOUT: + //Do not return flush_slave_timeout to the client, for the ons client will ignore it. + putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; + break; + case LEADER_PENDING_FULL: + putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; + break; + } + } catch (Throwable t) { + log.error("Failed to get dledger append result", t); + } + + PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); + if (putMessageStatus == PutMessageStatus.PUT_OK) { + // Statistics + storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); + storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(appendResult.getWroteBytes()); + } + return putMessageResult; + } + + @Override + public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + + + @Override + public SelectMappedBufferResult getMessage(final long offset, final int size) { + if (offset < dividedCommitlogOffset) { + return super.getMessage(offset, size); + } + int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); + MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0); + if (mappedFile != null) { + int pos = (int) (offset % mappedFileSize); + return convertSbr(mappedFile.selectMappedBuffer(pos, size)); + } + return null; + } + + @Override + public long rollNextFile(final long offset) { + int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); + return offset + mappedFileSize - offset % mappedFileSize; + } + + @Override + public HashMap getTopicQueueTable() { + return topicQueueTable; + } + + @Override + public void setTopicQueueTable(HashMap topicQueueTable) { + this.topicQueueTable = topicQueueTable; + } + + @Override + public void destroy() { + super.destroy(); + dLedgerFileList.destroy(); + } + + @Override + public boolean appendData(long startOffset, byte[] data) { + //the old ha service will invoke method, here to prevent it + return false; + } + + @Override + public void checkSelf() { + dLedgerFileList.checkSelf(); + } + + @Override + public long lockTimeMills() { + long diff = 0; + long begin = this.beginTimeInDledgerLock; + if (begin > 0) { + diff = this.defaultMessageStore.now() - begin; + } + + if (diff < 0) { + diff = 0; + } + + return diff; + } + + class EncodeResult { + private String queueOffsetKey; + private byte[] data; + private AppendMessageStatus status; + public EncodeResult(AppendMessageStatus status, byte[] data, String queueOffsetKey) { + this.data = data; + this.status = status; + this.queueOffsetKey = queueOffsetKey; + } + } + + class MessageSerializer { + // File at the end of the minimum fixed length empty + private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4; + private final ByteBuffer msgIdMemory; + // Store the message content + private final ByteBuffer msgStoreItemMemory; + // The maximum length of the message + private final int maxMessageSize; + // Build Message Key + private final StringBuilder keyBuilder = new StringBuilder(); + + private final StringBuilder msgIdBuilder = new StringBuilder(); + + private final ByteBuffer hostHolder = ByteBuffer.allocate(8); + + MessageSerializer(final int size) { + this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); + this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH); + this.maxMessageSize = size; + } + + public ByteBuffer getMsgStoreItemMemory() { + return msgStoreItemMemory; + } + + public EncodeResult serialize(final MessageExtBrokerInner msgInner) { + // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
+ + // PHY OFFSET + long wroteOffset = 0; + + this.resetByteBuffer(hostHolder, 8); + // Record ConsumeQueue information + keyBuilder.setLength(0); + keyBuilder.append(msgInner.getTopic()); + keyBuilder.append('-'); + keyBuilder.append(msgInner.getQueueId()); + String key = keyBuilder.toString(); + + Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key); + if (null == queueOffset) { + queueOffset = 0L; + DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset); + } + + // Transaction messages that require special handling + final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); + switch (tranType) { + // Prepared and Rollback message is not consumed, will not enter the + // consumer queuec + case MessageSysFlag.TRANSACTION_PREPARED_TYPE: + case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: + queueOffset = 0L; + break; + case MessageSysFlag.TRANSACTION_NOT_TYPE: + case MessageSysFlag.TRANSACTION_COMMIT_TYPE: + default: + break; + } + + /** + * Serialize message + */ + final byte[] propertiesData = + msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); + + final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; + + if (propertiesLength > Short.MAX_VALUE) { + log.warn("putMessage message properties length too long. length={}", propertiesData.length); + return new EncodeResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED, null, key); + } + + final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + final int topicLength = topicData.length; + + final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; + + final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength); + + // Exceeds the maximum message + if (msgLen > this.maxMessageSize) { + DLedgerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + + ", maxMessageSize: " + this.maxMessageSize); + return new EncodeResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED, null, key); + } + // Initialization of storage space + this.resetByteBuffer(msgStoreItemMemory, msgLen); + // 1 TOTALSIZE + this.msgStoreItemMemory.putInt(msgLen); + // 2 MAGICCODE + this.msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE); + // 3 BODYCRC + this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); + // 4 QUEUEID + this.msgStoreItemMemory.putInt(msgInner.getQueueId()); + // 5 FLAG + this.msgStoreItemMemory.putInt(msgInner.getFlag()); + // 6 QUEUEOFFSET + this.msgStoreItemMemory.putLong(queueOffset); + // 7 PHYSICALOFFSET + this.msgStoreItemMemory.putLong(wroteOffset); + // 8 SYSFLAG + this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); + // 9 BORNTIMESTAMP + this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); + // 10 BORNHOST + this.resetByteBuffer(hostHolder, 8); + this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder)); + // 11 STORETIMESTAMP + this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); + // 12 STOREHOSTADDRESS + this.resetByteBuffer(hostHolder, 8); + this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder)); + //this.msgBatchMemory.put(msgInner.getStoreHostBytes()); + // 13 RECONSUMETIMES + this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); + // 14 Prepared Transaction Offset + this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); + // 15 BODY + this.msgStoreItemMemory.putInt(bodyLength); + if (bodyLength > 0) { + this.msgStoreItemMemory.put(msgInner.getBody()); + } + // 16 TOPIC + this.msgStoreItemMemory.put((byte) topicLength); + this.msgStoreItemMemory.put(topicData); + // 17 PROPERTIES + this.msgStoreItemMemory.putShort((short) propertiesLength); + if (propertiesLength > 0) { + this.msgStoreItemMemory.put(propertiesData); + } + byte[] data = new byte[msgLen]; + this.msgStoreItemMemory.clear(); + this.msgStoreItemMemory.get(data); + return new EncodeResult(AppendMessageStatus.PUT_OK, data, key); + } + + private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { + byteBuffer.flip(); + byteBuffer.limit(limit); + } + + } + + public static class DLedgerSelectMappedBufferResult extends SelectMappedBufferResult { + + private SelectMmapBufferResult sbr; + public DLedgerSelectMappedBufferResult(SelectMmapBufferResult sbr) { + super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null); + this.sbr = sbr; + } + + public synchronized void release() { + super.release(); + if (sbr != null) { + sbr.release(); + } + } + + } + + public DLedgerServer getdLedgerServer() { + return dLedgerServer; + } + + public int getId() { + return id; + } + + public long getDividedCommitlogOffset() { + return dividedCommitlogOffset; + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java index 251dc9971f8613216e071d35f559053b1123f49a..c102881c1ca5e5c2f772b21ddf2175579553da92 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java @@ -90,7 +90,7 @@ public class HAConnection { this.selector = RemotingUtil.openSelector(); this.socketChannel = socketChannel; this.socketChannel.register(this.selector, SelectionKey.OP_READ); - this.thread.setDaemon(true); + this.setDaemon(true); } @Override @@ -205,7 +205,7 @@ public class HAConnection { this.selector = RemotingUtil.openSelector(); this.socketChannel = socketChannel; this.socketChannel.register(this.selector, SelectionKey.OP_WRITE); - this.thread.setDaemon(true); + this.setDaemon(true); } @Override diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index e707463c7959fc91fa7e498d75a9456cca91aae8..50a48d4de9c26be4144a9fc938a76a5e6dcef096 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -19,11 +19,11 @@ package org.apache.rocketmq.store.schedule; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.Map.Entry; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; @@ -38,6 +38,7 @@ import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; @@ -56,15 +57,15 @@ public class ScheduleMessageService extends ConfigManager { private final ConcurrentMap offsetTable = new ConcurrentHashMap(32); - - private final Timer timer = new Timer("ScheduleMessageTimerThread", true); - private final DefaultMessageStore defaultMessageStore; - + private final AtomicBoolean started = new AtomicBoolean(false); + private Timer timer; + private MessageStore writeMessageStore; private int maxDelayLevel; public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) { this.defaultMessageStore = defaultMessageStore; + this.writeMessageStore = defaultMessageStore; } public static int queueId2DelayLevel(final int queueId) { @@ -75,10 +76,18 @@ public class ScheduleMessageService extends ConfigManager { return delayLevel - 1; } + /** + * @param writeMessageStore + * the writeMessageStore to set + */ + public void setWriteMessageStore(MessageStore writeMessageStore) { + this.writeMessageStore = writeMessageStore; + } + public void buildRunningStats(HashMap stats) { - Iterator> it = this.offsetTable.entrySet().iterator(); + Iterator> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { - Entry next = it.next(); + Map.Entry next = it.next(); int queueId = delayLevel2QueueId(next.getKey()); long delayOffset = next.getValue(); long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(SCHEDULE_TOPIC, queueId); @@ -102,35 +111,45 @@ public class ScheduleMessageService extends ConfigManager { } public void start() { + if (started.compareAndSet(false, true)) { + this.timer = new Timer("ScheduleMessageTimerThread", true); + for (Map.Entry entry : this.delayLevelTable.entrySet()) { + Integer level = entry.getKey(); + Long timeDelay = entry.getValue(); + Long offset = this.offsetTable.get(level); + if (null == offset) { + offset = 0L; + } - for (Map.Entry entry : this.delayLevelTable.entrySet()) { - Integer level = entry.getKey(); - Long timeDelay = entry.getValue(); - Long offset = this.offsetTable.get(level); - if (null == offset) { - offset = 0L; - } - - if (timeDelay != null) { - this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); + if (timeDelay != null) { + this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); + } } - } - this.timer.scheduleAtFixedRate(new TimerTask() { + this.timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - try { - ScheduleMessageService.this.persist(); - } catch (Throwable e) { - log.error("scheduleAtFixedRate flush exception", e); + @Override + public void run() { + try { + if (started.get()) ScheduleMessageService.this.persist(); + } catch (Throwable e) { + log.error("scheduleAtFixedRate flush exception", e); + } } - } - }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); + }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); + } } public void shutdown() { - this.timer.cancel(); + if (this.started.compareAndSet(true, false)) { + if (null != this.timer) + this.timer.cancel(); + } + + } + + public boolean isStarted() { + return started.get(); } public int getMaxDelayLevel() { @@ -214,7 +233,9 @@ public class ScheduleMessageService extends ConfigManager { @Override public void run() { try { - this.executeOnTimeup(); + if (isStarted()) { + this.executeOnTimeup(); + } } catch (Exception e) { // XXX: warn and notify me log.error("ScheduleMessageService, executeOnTimeup exception", e); @@ -285,7 +306,7 @@ public class ScheduleMessageService extends ConfigManager { try { MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); PutMessageResult putMessageResult = - ScheduleMessageService.this.defaultMessageStore + ScheduleMessageService.this.writeMessageStore .putMessage(msgInner); if (putMessageResult != null diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index 59bd90405dcb6d99ce2d8377074970eb2817edc6..470d70b9534d4ee216c5ba0b7ea03844f333b072 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -30,9 +30,10 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; -import static org.assertj.core.api.Assertions.assertThat; import org.junit.Test; +import static org.assertj.core.api.Assertions.assertThat; + public class ConsumeQueueTest { private static final String msg = "Once, there was a chance for me!"; diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java index 66f7f5dc8f5fc4b2f545081315692951d612d34d..78457dbfe8a7e78052c2c2a65ac719edd9ad56b0 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java @@ -42,9 +42,9 @@ public class DefaultMessageStoreShutDownTest { public void init() throws Exception { messageStore = spy(buildMessageStore()); boolean load = messageStore.load(); - when(messageStore.dispatchBehindBytes()).thenReturn(100L); assertTrue(load); messageStore.start(); + when(messageStore.dispatchBehindBytes()).thenReturn(100L); } @Test diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java new file mode 100644 index 0000000000000000000000000000000000000000..0dcb3b254c0c949ad4f4186b8148dccb70332abf --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.store.config.StorePathConfigHelper; +import org.junit.After; + +public class StoreTestBase { + + private int QUEUE_TOTAL = 100; + private AtomicInteger QueueId = new AtomicInteger(0); + private SocketAddress BornHost = new InetSocketAddress("127.0.0.1", 8123); + private SocketAddress StoreHost = BornHost; + private byte[] MessageBody = new byte[1024]; + + protected Set baseDirs = new HashSet<>(); + + private static AtomicInteger port = new AtomicInteger(30000); + + public static synchronized int nextPort() { + return port.addAndGet(5); + } + + protected MessageExtBrokerInner buildMessage() { + MessageExtBrokerInner msg = new MessageExtBrokerInner(); + msg.setTopic("StoreTest"); + msg.setTags("TAG1"); + msg.setKeys("Hello"); + msg.setBody(MessageBody); + msg.setKeys(String.valueOf(System.currentTimeMillis())); + msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); + msg.setSysFlag(0); + msg.setBornTimestamp(System.currentTimeMillis()); + msg.setStoreHost(StoreHost); + msg.setBornHost(BornHost); + return msg; + } + + public static String createBaseDir() { + String baseDir = System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + UUID.randomUUID(); + final File file = new File(baseDir); + if (file.exists()) { + System.exit(1); + } + return baseDir; + } + + public static boolean makeSureFileExists(String fileName) throws Exception { + File file = new File(fileName); + MappedFile.ensureDirOK(file.getParent()); + return file.createNewFile(); + } + + + public static void deleteFile(String fileName) { + deleteFile(new File(fileName)); + } + + public static void deleteFile(File file) { + UtilAll.deleteFile(file); + } + + @After + public void clear() { + for (String baseDir : baseDirs) { + deleteFile(baseDir); + } + } + +} diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java new file mode 100644 index 0000000000000000000000000000000000000000..d0829d122a1ca1ad250053678f812eb1edac7c57 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.dledger; + +import io.openmessaging.storage.dledger.DLedgerServer; +import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore; +import io.openmessaging.storage.dledger.store.file.MmapFileList; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.GetMessageStatus; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; +import org.junit.Assert; +import org.junit.Test; + +public class DLedgerCommitlogTest extends MessageStoreTestBase { + + + @Test + public void testTruncateCQ() throws Exception { + String base = createBaseDir(); + String peers = String.format("n0-localhost:%d", nextPort()); + String group = UUID.randomUUID().toString(); + String topic = UUID.randomUUID().toString(); + { + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); + DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); + DLedgerServer dLedgerServer = dLedgerCommitLog.getdLedgerServer(); + DLedgerMmapFileStore dLedgerMmapFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); + MmapFileList mmapFileList = dLedgerMmapFileStore.getDataFileList(); + Thread.sleep(2000); + doPutMessages(messageStore, topic, 0, 2000, 0); + Thread.sleep(100); + Assert.assertEquals(24, mmapFileList.getMappedFiles().size()); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(2000, messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + doGetMessages(messageStore, topic, 0, 2000, 0); + messageStore.shutdown(); + } + + { + //Abnormal recover, left some commitlogs + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 4); + DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); + DLedgerServer dLedgerServer = dLedgerCommitLog.getdLedgerServer(); + DLedgerMmapFileStore dLedgerMmapFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); + MmapFileList mmapFileList = dLedgerMmapFileStore.getDataFileList(); + Thread.sleep(1000); + Assert.assertEquals(20, mmapFileList.getMappedFiles().size()); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(1700, messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + doGetMessages(messageStore, topic, 0, 1700, 0); + messageStore.shutdown(); + } + { + //Abnormal recover, left none commitlogs + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 20); + DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); + DLedgerServer dLedgerServer = dLedgerCommitLog.getdLedgerServer(); + DLedgerMmapFileStore dLedgerMmapFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); + MmapFileList mmapFileList = dLedgerMmapFileStore.getDataFileList(); + Thread.sleep(1000); + Assert.assertEquals(0, mmapFileList.getMappedFiles().size()); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + messageStore.shutdown(); + } + } + + + + @Test + public void testRecover() throws Exception { + String base = createBaseDir(); + String peers = String.format("n0-localhost:%d", nextPort()); + String group = UUID.randomUUID().toString(); + String topic = UUID.randomUUID().toString(); + { + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); + Thread.sleep(1000); + doPutMessages(messageStore, topic, 0, 1000, 0); + Thread.sleep(100); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + doGetMessages(messageStore, topic, 0, 1000, 0); + messageStore.shutdown(); + } + + { + //normal recover + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + doGetMessages(messageStore, topic, 0, 1000, 0); + messageStore.shutdown(); + } + + { + //abnormal recover + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(1000, messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + doGetMessages(messageStore, topic, 0, 1000, 0); + messageStore.shutdown(); + } + } + + + + @Test + public void testPutAndGetMessage() throws Exception { + String base = createBaseDir(); + String peers = String.format("n0-localhost:%d", nextPort()); + String group = UUID.randomUUID().toString(); + DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); + Thread.sleep(1000); + String topic = UUID.randomUUID().toString(); + + List results = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + MessageExtBrokerInner msgInner = buildMessage(); + msgInner.setTopic(topic); + msgInner.setQueueId(0); + PutMessageResult putMessageResult = messageStore.putMessage(msgInner); + results.add(putMessageResult); + Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); + Assert.assertEquals(i, putMessageResult.getAppendMessageResult().getLogicsOffset()); + } + Thread.sleep(100); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, messageStore.dispatchBehindBytes()); + GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null); + Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus()); + + Assert.assertEquals(10, getMessageResult.getMessageBufferList().size()); + Assert.assertEquals(10, getMessageResult.getMessageMapedList().size()); + + for (int i = 0; i < results.size(); i++) { + ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i); + MessageExt messageExt = MessageDecoder.decode(buffer); + Assert.assertEquals(i, messageExt.getQueueOffset()); + Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId(), messageExt.getMsgId()); + Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset()); + } + messageStore.destroy(); + messageStore.shutdown(); + } + + + @Test + public void testCommittedPos() throws Exception { + String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); + String group = UUID.randomUUID().toString(); + DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false, 0); + + String topic = UUID.randomUUID().toString(); + MessageExtBrokerInner msgInner = buildMessage(); + msgInner.setTopic(topic); + msgInner.setQueueId(0); + PutMessageResult putMessageResult = leaderStore.putMessage(msgInner); + Assert.assertEquals(PutMessageStatus.OS_PAGECACHE_BUSY, putMessageResult.getPutMessageStatus()); + + Thread.sleep(1000); + + Assert.assertEquals(0, leaderStore.getCommitLog().getMaxOffset()); + Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0)); + + + DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false, 0); + Thread.sleep(2000); + + Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(1, followerStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertTrue(leaderStore.getCommitLog().getMaxOffset() > 0); + + + leaderStore.destroy(); + followerStore.destroy(); + + leaderStore.shutdown(); + followerStore.shutdown(); + } + + +} diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java new file mode 100644 index 0000000000000000000000000000000000000000..e45178aaee9c50eba71ec355e48e6e6b589ef903 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.dledger; + +import io.openmessaging.storage.dledger.DLedgerConfig; +import io.openmessaging.storage.dledger.DLedgerServer; +import java.io.File; +import java.util.Arrays; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.StoreTestBase; +import org.apache.rocketmq.store.config.FlushDiskType; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.config.StorePathConfigHelper; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.Assert; + +public class MessageStoreTestBase extends StoreTestBase { + + protected DefaultMessageStore createDledgerMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort, int deleteFileNum) throws Exception { + System.setProperty("dledger.disk.ratio.check", "0.95"); + System.setProperty("dledger.disk.ratio.clean", "0.95"); + baseDirs.add(base); + MessageStoreConfig storeConfig = new MessageStoreConfig(); + storeConfig.setMapedFileSizeCommitLog(1024 * 100); + storeConfig.setMapedFileSizeConsumeQueue(1024); + storeConfig.setMaxHashSlotNum(100); + storeConfig.setMaxIndexNum(100 * 10); + storeConfig.setStorePathRootDir(base); + storeConfig.setStorePathCommitLog(base + File.separator + "commitlog"); + storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH); + + storeConfig.setEnableDLegerCommitLog(true); + storeConfig.setdLegerGroup(group); + storeConfig.setdLegerPeers(peers); + storeConfig.setdLegerSelfId(selfId); + DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { + + }, new BrokerConfig()); + DLedgerServer dLegerServer = ((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer(); + if (leaderId != null) { + dLegerServer.getdLedgerConfig().setEnableLeaderElector(false); + if (selfId.equals(leaderId)) { + dLegerServer.getMemberState().changeToLeader(-1); + } else { + dLegerServer.getMemberState().changeToFollower(-1, leaderId); + } + + } + if (createAbort) { + String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir()); + makeSureFileExists(fileName); + } + if (deleteFileNum > 0) { + DLedgerConfig config = dLegerServer.getdLedgerConfig(); + if (deleteFileNum > 0) { + File dir = new File(config.getDataStorePath()); + File[] files = dir.listFiles(); + if (files != null) { + Arrays.sort(files); + for (int i = files.length - 1; i >= 0; i--) { + File file = files[i]; + file.delete(); + if (files.length - i >= deleteFileNum) { + break; + } + } + } + } + } + Assert.assertTrue(defaultMessageStore.load()); + defaultMessageStore.start(); + return defaultMessageStore; + } + + + protected DefaultMessageStore createMessageStore(String base, boolean createAbort) throws Exception { + baseDirs.add(base); + MessageStoreConfig storeConfig = new MessageStoreConfig(); + storeConfig.setMapedFileSizeCommitLog(1024 * 100); + storeConfig.setMapedFileSizeConsumeQueue(1024); + storeConfig.setMaxHashSlotNum(100); + storeConfig.setMaxIndexNum(100 * 10); + storeConfig.setStorePathRootDir(base); + storeConfig.setStorePathCommitLog(base + File.separator + "commitlog"); + storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH); + DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("CommitlogTest"), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> { + + }, new BrokerConfig()); + + if (createAbort) { + String fileName = StorePathConfigHelper.getAbortFile(storeConfig.getStorePathRootDir()); + makeSureFileExists(fileName); + } + Assert.assertTrue(defaultMessageStore.load()); + defaultMessageStore.start(); + return defaultMessageStore; + } + + protected void doPutMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) { + for (int i = 0; i < num; i++) { + MessageExtBrokerInner msgInner = buildMessage(); + msgInner.setTopic(topic); + msgInner.setQueueId(queueId); + PutMessageResult putMessageResult = messageStore.putMessage(msgInner); + Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus()); + Assert.assertEquals(beginLogicsOffset + i, putMessageResult.getAppendMessageResult().getLogicsOffset()); + } + } + + protected void doGetMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) { + for (int i = 0; i < num; i++) { + GetMessageResult getMessageResult = messageStore.getMessage("group", topic, queueId, beginLogicsOffset + i, 3, null); + Assert.assertNotNull(getMessageResult); + Assert.assertTrue(!getMessageResult.getMessageBufferList().isEmpty()); + MessageExt messageExt = MessageDecoder.decode(getMessageResult.getMessageBufferList().get(0)); + Assert.assertEquals(beginLogicsOffset + i, messageExt.getQueueOffset()); + getMessageResult.release(); + } + } + +} diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java new file mode 100644 index 0000000000000000000000000000000000000000..a5242a35105649954be83ab668880018096b37b7 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.dledger; + +import java.util.UUID; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.StoreTestBase; +import org.apache.rocketmq.store.config.StorePathConfigHelper; +import org.junit.Assert; +import org.junit.Test; + +public class MixCommitlogTest extends MessageStoreTestBase { + + + + @Test + public void testFallBehindCQ() throws Exception { + String base = createBaseDir(); + String topic = UUID.randomUUID().toString(); + String peers = String.format("n0-localhost:%d", nextPort()); + String group = UUID.randomUUID().toString(); + { + DefaultMessageStore originalStore = createMessageStore(base, false); + doPutMessages(originalStore, topic, 0, 1000, 0); + Assert.assertEquals(11, originalStore.getMaxPhyOffset()/originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); + Thread.sleep(500); + Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, originalStore.dispatchBehindBytes()); + doGetMessages(originalStore, topic, 0, 1000, 0); + originalStore.shutdown(); + } + //delete the cq files + { + StoreTestBase.deleteFile(StorePathConfigHelper.getStorePathConsumeQueue(base)); + } + { + DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0); + Thread.sleep(2000); + Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(1000, dledgerStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, dledgerStore.dispatchBehindBytes()); + doGetMessages(dledgerStore, topic, 0, 1000, 0); + doPutMessages(dledgerStore, topic, 0, 1000, 1000); + Thread.sleep(500); + Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, dledgerStore.dispatchBehindBytes()); + doGetMessages(dledgerStore, topic, 0, 2000, 0); + dledgerStore.shutdown(); + } + } + + + + @Test + public void testPutAndGet() throws Exception { + String base = createBaseDir(); + String topic = UUID.randomUUID().toString(); + String peers = String.format("n0-localhost:%d", nextPort()); + String group = UUID.randomUUID().toString(); + + long dividedOffset; + { + DefaultMessageStore originalStore = createMessageStore(base, false); + doPutMessages(originalStore, topic, 0, 1000, 0); + Thread.sleep(500); + Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, originalStore.dispatchBehindBytes()); + dividedOffset = originalStore.getCommitLog().getMaxOffset(); + dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); + doGetMessages(originalStore, topic, 0, 1000, 0); + originalStore.shutdown(); + } + { + DefaultMessageStore recoverOriginalStore = createMessageStore(base, true); + Thread.sleep(500); + Assert.assertEquals(0, recoverOriginalStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(1000, recoverOriginalStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, recoverOriginalStore.dispatchBehindBytes()); + doGetMessages(recoverOriginalStore, topic, 0, 1000, 0); + recoverOriginalStore.shutdown(); + } + { + DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0); + DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog(); + Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean()); + Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset()); + Assert.assertEquals(0, dledgerStore.dispatchBehindBytes()); + Assert.assertEquals(dividedOffset, dLedgerCommitLog.getMaxOffset()); + Thread.sleep(2000); + doPutMessages(dledgerStore, topic, 0, 1000, 1000); + Thread.sleep(500); + Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, dledgerStore.dispatchBehindBytes()); + doGetMessages(dledgerStore, topic, 0, 2000, 0); + dledgerStore.shutdown(); + } + { + DefaultMessageStore recoverDledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0); + DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) recoverDledgerStore.getCommitLog(); + Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean()); + Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset()); + Thread.sleep(2000); + doPutMessages(recoverDledgerStore, topic, 0, 1000, 2000); + Thread.sleep(500); + Assert.assertEquals(0, recoverDledgerStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(3000, recoverDledgerStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, recoverDledgerStore.dispatchBehindBytes()); + doGetMessages(recoverDledgerStore, topic, 0, 3000, 0); + recoverDledgerStore.shutdown(); + } + } + + @Test + public void testDeleteExpiredFiles() throws Exception { + String base = createBaseDir(); + String topic = UUID.randomUUID().toString(); + String peers = String.format("n0-localhost:%d", nextPort()); + String group = UUID.randomUUID().toString(); + + long dividedOffset; + { + DefaultMessageStore originalStore = createMessageStore(base, false); + doPutMessages(originalStore, topic, 0, 1000, 0); + Thread.sleep(500); + Assert.assertEquals(0, originalStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(1000, originalStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, originalStore.dispatchBehindBytes()); + dividedOffset = originalStore.getCommitLog().getMaxOffset(); + dividedOffset = dividedOffset - dividedOffset % originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog() + originalStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); + originalStore.shutdown(); + } + long maxPhysicalOffset; + { + DefaultMessageStore dledgerStore = createDledgerMessageStore(base, group, "n0", peers, null, true, 0); + DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) dledgerStore.getCommitLog(); + Assert.assertTrue(dledgerStore.getMessageStoreConfig().isCleanFileForciblyEnable()); + Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean()); + Assert.assertEquals(dividedOffset, dLedgerCommitLog.getDividedCommitlogOffset()); + Thread.sleep(2000); + doPutMessages(dledgerStore, topic, 0, 1000, 1000); + Thread.sleep(500); + Assert.assertEquals(0, dledgerStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(2000, dledgerStore.getMaxOffsetInQueue(topic, 0)); + Assert.assertEquals(0, dledgerStore.dispatchBehindBytes()); + Assert.assertEquals(0, dledgerStore.getMinPhyOffset()); + maxPhysicalOffset = dledgerStore.getMaxPhyOffset(); + Assert.assertTrue(maxPhysicalOffset > 0); + + doGetMessages(dledgerStore, topic, 0, 2000, 0); + + for (int i = 0; i < 100; i++) { + dledgerStore.getCommitLog().deleteExpiredFile(System.currentTimeMillis(), 0, 0, true); + } + Assert.assertEquals(dividedOffset, dledgerStore.getMinPhyOffset()); + Assert.assertEquals(maxPhysicalOffset, dledgerStore.getMaxPhyOffset()); + for (int i = 0; i < 100; i++) { + Assert.assertEquals(Integer.MAX_VALUE, dledgerStore.getCommitLog().deleteExpiredFile(System.currentTimeMillis(), 0, 0, true)); + } + Assert.assertEquals(dividedOffset, dledgerStore.getMinPhyOffset()); + Assert.assertEquals(maxPhysicalOffset, dledgerStore.getMaxPhyOffset()); + + Assert.assertTrue(dledgerStore.getMessageStoreConfig().isCleanFileForciblyEnable()); + Assert.assertTrue(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean()); + + //Test fresh + dledgerStore.getMessageStoreConfig().setCleanFileForciblyEnable(false); + for (int i = 0; i < 100; i++) { + Assert.assertEquals(Integer.MAX_VALUE, dledgerStore.getCommitLog().deleteExpiredFile(System.currentTimeMillis(), 0, 0, true)); + } + Assert.assertFalse(dLedgerCommitLog.getdLedgerServer().getdLedgerConfig().isEnableDiskForceClean()); + doGetMessages(dledgerStore, topic, 0, 1000, 1000); + dledgerStore.shutdown(); + } + } +} diff --git a/store/src/test/resources/logback-test.xml b/store/src/test/resources/logback-test.xml index 875b6715ac993f55f30946d038231aa3a85cd17d..a033816ddad7359e1edcdb5a4ae5a0cd9aa001a1 100644 --- a/store/src/test/resources/logback-test.xml +++ b/store/src/test/resources/logback-test.xml @@ -28,7 +28,7 @@ - + diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java index 0f94f3606c547c9b5f4c3cf529ffb4e63f208cb0..48508462668e8f523d5e5ab6e389c93d9a3e49ef 100644 --- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java +++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.test.factory; +import java.util.UUID; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; @@ -60,4 +62,12 @@ public class ConsumerFactory { consumer.start(); return consumer; } + + public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup) throws Exception { + DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup); + defaultMQPullConsumer.setInstanceName(UUID.randomUUID().toString()); + defaultMQPullConsumer.setNamesrvAddr(nsAddr); + defaultMQPullConsumer.start(); + return defaultMQPullConsumer; + } } diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java index 66767cc9f0d682da8112ff2c989186b745aecd3c..76e6e09d0bf7f88899960b4197d24ea1e48cbd0c 100644 --- a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java +++ b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.test.factory; +import java.util.UUID; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.test.util.RandomUtil; @@ -25,6 +26,7 @@ public class ProducerFactory { public static DefaultMQProducer getRMQProducer(String ns) { DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID()); + producer.setInstanceName(UUID.randomUUID().toString()); producer.setNamesrvAddr(ns); try { producer.start(); diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java index bd151d0561ef6e37732857ce75aca599184a996e..8863ee3e52df1911385031b6645a87ef3918bd04 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.test.util; import java.util.HashMap; import java.util.Set; +import java.util.UUID; import org.apache.log4j.Logger; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.protocol.body.ClusterInfo; @@ -40,6 +41,7 @@ public class MQAdmin { int queueNum, int waitTimeSec) { boolean createResult = false; DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); + mqAdminExt.setInstanceName(UUID.randomUUID().toString()); mqAdminExt.setNamesrvAddr(nameSrvAddr); try { mqAdminExt.start(); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index 780bd47506d254c0366bcaeb77ef489013a25742..45c6750dfdf98a370913f910ca523ea02122b201 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -36,13 +36,13 @@ import org.apache.rocketmq.test.util.MQAdmin; import org.apache.rocketmq.test.util.MQRandomUtils; public class BaseConf { - protected static String nsAddr; + public static String nsAddr; protected static String broker1Name; protected static String broker2Name; protected static String clusterName; protected static int brokerNum; protected static int waitTime = 5; - protected static int consumeTime = 5 * 60 * 1000; + protected static int consumeTime = 2 * 60 * 1000; protected static NamesrvController namesrvController; protected static BrokerController brokerController1; protected static BrokerController brokerController2; diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 9dcb2d8b755cb1e521276ea05b437034a6ebf846..0eacd584fadd0843129911ac33e1891f2b64d1e1 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -47,9 +47,14 @@ public class IntegrationTestBase { protected static final List BROKER_CONTROLLERS = new ArrayList<>(); protected static final List NAMESRV_CONTROLLERS = new ArrayList<>(); protected static int topicCreateTime = 30 * 1000; - protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 256; + protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 100; protected static final int INDEX_NUM = 1000; + private static final AtomicInteger port = new AtomicInteger(40000); + + public static synchronized int nextPort() { + return port.addAndGet(random.nextInt(10) + 10); + } protected static Random random = new Random(); static { @@ -87,7 +92,7 @@ public class IntegrationTestBase { } - private static String createBaseDir() { + public static String createBaseDir() { String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID(); final File file = new File(baseDir); if (file.exists()) { @@ -105,14 +110,14 @@ public class IntegrationTestBase { namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json"); namesrvConfig.setConfigStorePath(baseDir + SEP + "namesrv" + SEP + "namesrv.properties"); - nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000)); + nameServerNettyServerConfig.setListenPort(nextPort()); NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig); try { Assert.assertTrue(namesrvController.initialize()); logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort()); namesrvController.start(); } catch (Exception e) { - logger.info("Name Server start failed"); + logger.info("Name Server start failed", e); System.exit(1); } NAMESRV_CONTROLLERS.add(namesrvController); @@ -123,8 +128,6 @@ public class IntegrationTestBase { public static BrokerController createAndStartBroker(String nsAddr) { String baseDir = createBaseDir(); BrokerConfig brokerConfig = new BrokerConfig(); - NettyServerConfig nettyServerConfig = new NettyServerConfig(); - NettyClientConfig nettyClientConfig = new NettyClientConfig(); MessageStoreConfig storeConfig = new MessageStoreConfig(); brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement()); brokerConfig.setBrokerIP1("127.0.0.1"); @@ -132,18 +135,25 @@ public class IntegrationTestBase { brokerConfig.setEnablePropertyFilter(true); storeConfig.setStorePathRootDir(baseDir); storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); - storeConfig.setHaListenPort(8000 + random.nextInt(1000)); storeConfig.setMapedFileSizeCommitLog(COMMIT_LOG_SIZE); storeConfig.setMaxIndexNum(INDEX_NUM); storeConfig.setMaxHashSlotNum(INDEX_NUM * 4); - nettyServerConfig.setListenPort(10000 + random.nextInt(1000)); + return createAndStartBroker(storeConfig, brokerConfig); + + } + + public static BrokerController createAndStartBroker(MessageStoreConfig storeConfig, BrokerConfig brokerConfig) { + NettyServerConfig nettyServerConfig = new NettyServerConfig(); + NettyClientConfig nettyClientConfig = new NettyClientConfig(); + nettyServerConfig.setListenPort(nextPort()); + storeConfig.setHaListenPort(nextPort()); BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); try { Assert.assertTrue(brokerController.initialize()); logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr()); brokerController.start(); - } catch (Exception e) { - logger.info("Broker start failed"); + } catch (Throwable t) { + logger.error("Broker start failed, will exit", t); System.exit(1); } BROKER_CONTROLLERS.add(brokerController); @@ -179,15 +189,7 @@ public class IntegrationTestBase { if (!file.exists()) { return; } - if (file.isFile()) { - file.delete(); - } else if (file.isDirectory()) { - File[] files = file.listFiles(); - for (File file1 : files) { - deleteFile(file1); - } - file.delete(); - } + UtilAll.deleteFile(file); } } diff --git a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java b/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java new file mode 100644 index 0000000000000000000000000000000000000000..e2d9db1983e7a4254ffaa6835e90e66905ac59b8 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.test.base.dledger; + +import java.util.UUID; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.base.IntegrationTestBase; +import org.apache.rocketmq.test.factory.ConsumerFactory; +import org.apache.rocketmq.test.factory.ProducerFactory; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import static org.apache.rocketmq.test.base.IntegrationTestBase.nextPort; +import static sun.util.locale.BaseLocale.SEP; + +public class DLedgerProduceAndConsumeIT { + + public BrokerConfig buildBrokerConfig(String cluster, String brokerName) { + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setBrokerClusterName(cluster); + brokerConfig.setBrokerName(brokerName); + brokerConfig.setBrokerIP1("127.0.0.1"); + brokerConfig.setNamesrvAddr(BaseConf.nsAddr); + return brokerConfig; + } + + public MessageStoreConfig buildStoreConfig(String brokerName, String peers, String selfId) { + MessageStoreConfig storeConfig = new MessageStoreConfig(); + String baseDir = IntegrationTestBase.createBaseDir(); + storeConfig.setStorePathRootDir(baseDir); + storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); + storeConfig.setHaListenPort(nextPort()); + storeConfig.setMapedFileSizeCommitLog(10 * 1024 * 1024); + storeConfig.setEnableDLegerCommitLog(true); + storeConfig.setdLegerGroup(brokerName); + storeConfig.setdLegerSelfId(selfId); + storeConfig.setdLegerPeers(peers); + return storeConfig; + } + + @Test + public void testProduceAndConsume() throws Exception { + String cluster = UUID.randomUUID().toString(); + String brokerName = UUID.randomUUID().toString(); + String selfId = "n0"; + String peers = String.format("n0-localhost:%d", nextPort()); + BrokerConfig brokerConfig = buildBrokerConfig(cluster, brokerName); + MessageStoreConfig storeConfig = buildStoreConfig(brokerName, peers, selfId); + BrokerController brokerController = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig); + Thread.sleep(3000); + + Assert.assertEquals(BrokerRole.SYNC_MASTER, storeConfig.getBrokerRole()); + + + String topic = UUID.randomUUID().toString(); + String consumerGroup = UUID.randomUUID().toString(); + IntegrationTestBase.initTopic(topic, BaseConf.nsAddr, cluster, 1); + DefaultMQProducer producer = ProducerFactory.getRMQProducer(BaseConf.nsAddr); + DefaultMQPullConsumer consumer = ConsumerFactory.getRMQPullConsumer(BaseConf.nsAddr, consumerGroup); + + for (int i = 0; i < 10; i++) { + Message message = new Message(); + message.setTopic(topic); + message.setBody(("Hello" + i).getBytes()); + SendResult sendResult = producer.send(message); + Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus()); + Assert.assertEquals(0, sendResult.getMessageQueue().getQueueId()); + Assert.assertEquals(brokerName, sendResult.getMessageQueue().getBrokerName()); + Assert.assertEquals(i, sendResult.getQueueOffset()); + Assert.assertNotNull(sendResult.getMsgId()); + Assert.assertNotNull(sendResult.getOffsetMsgId()); + } + + Thread.sleep(500); + Assert.assertEquals(0, brokerController.getMessageStore().getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(10, brokerController.getMessageStore().getMaxOffsetInQueue(topic, 0)); + + MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0); + PullResult pullResult= consumer.pull(messageQueue, "*", 0, 32); + Assert.assertEquals(PullStatus.FOUND, pullResult.getPullStatus()); + Assert.assertEquals(10, pullResult.getMsgFoundList().size()); + + for (int i = 0; i < 10; i++) { + MessageExt messageExt = pullResult.getMsgFoundList().get(i); + Assert.assertEquals(i, messageExt.getQueueOffset()); + Assert.assertArrayEquals(("Hello" + i).getBytes(), messageExt.getBody()); + } + + producer.shutdown(); + consumer.shutdown(); + brokerController.shutdown(); + } +} diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java index 9294c3fd93411e0226c4e6a09fe2f17a33dabd1c..1d7ea2033acc039a05b028bb5a29f4c04c3550f8 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java @@ -29,10 +29,15 @@ import org.apache.rocketmq.test.util.TestUtils; import org.apache.rocketmq.test.util.VerifyUtils; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import static com.google.common.truth.Truth.assertThat; +/** + * Currently, dose not support the ordered broadcast message + */ +@Ignore public class OrderMsgBroadCastIT extends BaseBroadCastIT { private static Logger logger = Logger.getLogger(OrderMsgBroadCastIT.class); private RMQNormalProducer producer = null; diff --git a/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java index 8cb0f41c1c01fa33f4118147c8fce4d63a027d26..b97b0637b043b243f791188d5a1b10c981fe436c 100644 --- a/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java @@ -51,7 +51,8 @@ public class NormalMsgDelayIT extends DelayConf { } @Test - public void testDelayLevell() { + public void testDelayLevel1() throws Exception { + Thread.sleep(3000); int delayLevel = 1; List delayMsgs = MQMessageFactory.getDelayMsg(topic, delayLevel, msgSize); producer.send(delayMsgs);