diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index e0a3b69923053cb2ef651fccf3ad92b73cc890f0..94ebe4f12684543b13fa91c9640294d7cf3454fa 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -54,7 +54,9 @@ public class BrokerStartup { public static BrokerController start(BrokerController controller) { try { + controller.start(); + String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", " + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); @@ -242,7 +244,7 @@ public class BrokerStartup { System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup); } - public static Options buildCommandlineOptions(final Options options) { + private static Options buildCommandlineOptions(final Options options) { Option opt = new Option("c", "configFile", true, "Broker config properties file"); opt.setRequired(false); options.addOption(opt); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java index a5ad3ac39e2f3cb68ce835086f110bf80c2237fa..c8da08d28dfb041a145b56a0854d9497eef2b107 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java @@ -25,6 +25,8 @@ import org.junit.Test; public class BrokerStartupTest { + private String storePathRootDir = "."; + @Test public void testProperties2SystemEnv() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { @@ -36,5 +38,4 @@ public class BrokerStartupTest { method.invoke(null, properties); Assert.assertEquals("value", System.getProperty("rocketmq.namesrv.domain")); } - } \ No newline at end of file diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index ffa8dbc2a133325835d7b0e879d2c1a164f8483a..59ef49045a20f5f032aefdbed0a436574abb1620 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -16,6 +16,25 @@ */ package org.apache.rocketmq.store; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.FileLock; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceThread; @@ -39,24 +58,6 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - import static org.apache.rocketmq.store.config.BrokerRole.SLAVE; public class DefaultMessageStore implements MessageStore { @@ -105,6 +106,10 @@ public class DefaultMessageStore implements MessageStore { private final LinkedList dispatcherList; + private RandomAccessFile lockFile; + + private FileLock lock; + public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { this.messageArrivingListener = messageArrivingListener; @@ -139,6 +144,10 @@ public class DefaultMessageStore implements MessageStore { this.dispatcherList = new LinkedList<>(); this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); + + File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir())); + MappedFile.ensureDirOK(file.getParent()); + lockFile = new RandomAccessFile(file, "rw"); } public void truncateDirtyLogicFiles(long phyOffset) { @@ -197,6 +206,15 @@ public class DefaultMessageStore implements MessageStore { * @throws Exception */ public void start() throws Exception { + + lock = lockFile.getChannel().tryLock(0, 1, false); + if (lock == null || lock.isShared() || !lock.isValid()) { + throw new RuntimeException("Lock failed,MQ already started"); + } + + lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes())); + lockFile.getChannel().force(true); + this.flushConsumeQueueService.start(); this.commitLog.start(); this.storeStatsService.start(); @@ -255,6 +273,14 @@ public class DefaultMessageStore implements MessageStore { } this.transientStorePool.destroy(); + + if (lockFile != null && lock != null) { + try { + lock.release(); + lockFile.close(); + } catch (IOException e) { + } + } } public void destroy() { diff --git a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java index ef1d670a2b8d868bcd7eb02e1c0a0a5fcc0f12d2..ccd76c4f0925fc7ed173e989e53aa0ce26fbea52 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java @@ -40,6 +40,10 @@ public class StorePathConfigHelper { return rootDir + File.separator + "abort"; } + public static String getLockFile(final String rootDir) { + return rootDir + File.separator + "lock"; + } + public static String getDelayOffsetStorePath(final String rootDir) { return rootDir + File.separator + "config" + File.separator + "delayOffset.json"; } diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 6e37b705f7435003edae4eeac3654566f7a6cacc..9269cdfa77268b90574270e99f34d440191d89b8 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -21,6 +21,7 @@ import java.io.File; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.channels.OverlappingFileLockException; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -56,6 +57,31 @@ public class DefaultMessageStoreTest { messageStore.start(); } + @Test(expected = OverlappingFileLockException.class) + public void test_repate_restart() throws Exception { + long totalMsgs = 100; + QUEUE_TOTAL = 1; + MessageBody = StoreMessage.getBytes(); + + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8); + messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); + MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig()); + + boolean load = master.load(); + assertTrue(load); + + try { + master.start(); + master.start(); + } finally { + master.shutdown(); + master.destroy(); + } + } + @After public void destory() { messageStore.shutdown(); @@ -164,7 +190,7 @@ public class DefaultMessageStoreTest { private class MyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, - byte[] filterBitMap, Map properties) { + byte[] filterBitMap, Map properties) { } } }