提交 6a97d288 编写于 作者: 傅冲 提交者: yukon

[ROCKETMQ-281] Add check policy for preventing repeat start mq

Author: 傅冲 <yubao.fyb@alibaba-inc.com>
Author: fuyou001 <fuyou001@gmail.com>

Closes #158 from fuyou001/ROCKETMQ-281.
上级 254d4324
......@@ -54,7 +54,9 @@ public class BrokerStartup {
public static BrokerController start(BrokerController controller) {
try {
controller.start();
String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
......@@ -242,7 +244,7 @@ public class BrokerStartup {
System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
}
public static Options buildCommandlineOptions(final Options options) {
private static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("c", "configFile", true, "Broker config properties file");
opt.setRequired(false);
options.addOption(opt);
......
......@@ -25,6 +25,8 @@ import org.junit.Test;
public class BrokerStartupTest {
private String storePathRootDir = ".";
@Test
public void testProperties2SystemEnv() throws NoSuchMethodException, InvocationTargetException,
IllegalAccessException {
......@@ -36,5 +38,4 @@ public class BrokerStartupTest {
method.invoke(null, properties);
Assert.assertEquals("value", System.getProperty("rocketmq.namesrv.domain"));
}
}
\ No newline at end of file
......@@ -16,6 +16,25 @@
*/
package org.apache.rocketmq.store;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
......@@ -39,24 +58,6 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.rocketmq.store.config.BrokerRole.SLAVE;
public class DefaultMessageStore implements MessageStore {
......@@ -105,6 +106,10 @@ public class DefaultMessageStore implements MessageStore {
private final LinkedList<CommitLogDispatcher> dispatcherList;
private RandomAccessFile lockFile;
private FileLock lock;
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
......@@ -139,6 +144,10 @@ public class DefaultMessageStore implements MessageStore {
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
MappedFile.ensureDirOK(file.getParent());
lockFile = new RandomAccessFile(file, "rw");
}
public void truncateDirtyLogicFiles(long phyOffset) {
......@@ -197,6 +206,15 @@ public class DefaultMessageStore implements MessageStore {
* @throws Exception
*/
public void start() throws Exception {
lock = lockFile.getChannel().tryLock(0, 1, false);
if (lock == null || lock.isShared() || !lock.isValid()) {
throw new RuntimeException("Lock failed,MQ already started");
}
lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true);
this.flushConsumeQueueService.start();
this.commitLog.start();
this.storeStatsService.start();
......@@ -255,6 +273,14 @@ public class DefaultMessageStore implements MessageStore {
}
this.transientStorePool.destroy();
if (lockFile != null && lock != null) {
try {
lock.release();
lockFile.close();
} catch (IOException e) {
}
}
}
public void destroy() {
......
......@@ -40,6 +40,10 @@ public class StorePathConfigHelper {
return rootDir + File.separator + "abort";
}
public static String getLockFile(final String rootDir) {
return rootDir + File.separator + "lock";
}
public static String getDelayOffsetStorePath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "delayOffset.json";
}
......
......@@ -21,6 +21,7 @@ import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.OverlappingFileLockException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -56,6 +57,31 @@ public class DefaultMessageStoreTest {
messageStore.start();
}
@Test(expected = OverlappingFileLockException.class)
public void test_repate_restart() throws Exception {
long totalMsgs = 100;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
messageStoreConfig.setMaxHashSlotNum(100);
messageStoreConfig.setMaxIndexNum(100 * 10);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());
boolean load = master.load();
assertTrue(load);
try {
master.start();
master.start();
} finally {
master.shutdown();
master.destroy();
}
}
@After
public void destory() {
messageStore.shutdown();
......@@ -164,7 +190,7 @@ public class DefaultMessageStoreTest {
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
byte[] filterBitMap, Map<String, String> properties) {
byte[] filterBitMap, Map<String, String> properties) {
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册