提交 cbbd6ff0 编写于 作者: D dongeforever

Polish role change logic

上级 5020f325
......@@ -1060,6 +1060,8 @@ public class BrokerController {
}
public void changeToSlave(int brokerId) {
log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
//change the role
brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
......@@ -1078,6 +1080,7 @@ public class BrokerController {
} catch (Throwable ignored) {
}
log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}
......@@ -1086,11 +1089,10 @@ public class BrokerController {
if (role == BrokerRole.SLAVE) {
return;
}
log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());
//handle the slave synchronise
handleSlaveSynchronize(role);
//change the role
brokerConfig.setBrokerId(0); //TO DO check
messageStoreConfig.setBrokerRole(role);
//handle the scheduled service
this.messageStore.handleScheduleMessageService(role);
......@@ -1098,11 +1100,16 @@ public class BrokerController {
//handle the transactional service
this.startProcessorByHa();
//if the operations above are totally successful, we change to master
brokerConfig.setBrokerId(0); //TO DO check
messageStoreConfig.setBrokerRole(role);
try {
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {
}
log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
}
private void startProcessorByHa() {
......
......@@ -39,7 +39,8 @@ public class TransactionalMessageCheckService extends ServiceThread {
public void start() {
if (started.compareAndSet(false, true)) {
super.start();
this.brokerController.getTransactionalMessageService().open();
//no need to do this
//this.brokerController.getTransactionalMessageService().open();
}
}
......@@ -47,8 +48,9 @@ public class TransactionalMessageCheckService extends ServiceThread {
public void shutdown(boolean interrupt) {
if (started.compareAndSet(true, false)) {
super.shutdown(interrupt);
this.brokerController.getTransactionalMessageService().close();
this.brokerController.getTransactionalMessageCheckListener().shutDown();
//no need to do this
//this.brokerController.getTransactionalMessageService().close();
//this.brokerController.getTransactionalMessageCheckListener().shutDown();
}
}
......
......@@ -19,11 +19,11 @@ package org.apache.rocketmq.store.schedule;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -38,6 +38,7 @@ import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
......@@ -56,15 +57,15 @@ public class ScheduleMessageService extends ConfigManager {
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
private final Timer timer = new Timer("ScheduleMessageTimerThread", true);
private final DefaultMessageStore defaultMessageStore;
private final AtomicBoolean started = new AtomicBoolean(false);
private Timer timer;
private MessageStore writeMessageStore;
private int maxDelayLevel;
public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
this.defaultMessageStore = defaultMessageStore;
this.writeMessageStore = defaultMessageStore;
}
public static int queueId2DelayLevel(final int queueId) {
......@@ -75,10 +76,18 @@ public class ScheduleMessageService extends ConfigManager {
return delayLevel - 1;
}
/**
* @param writeMessageStore
* the writeMessageStore to set
*/
public void setWriteMessageStore(MessageStore writeMessageStore) {
this.writeMessageStore = writeMessageStore;
}
public void buildRunningStats(HashMap<String, String> stats) {
Iterator<Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator();
Iterator<Map.Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, Long> next = it.next();
Map.Entry<Integer, Long> next = it.next();
int queueId = delayLevel2QueueId(next.getKey());
long delayOffset = next.getValue();
long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(SCHEDULE_TOPIC, queueId);
......@@ -102,35 +111,45 @@ public class ScheduleMessageService extends ConfigManager {
}
public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
public void shutdown() {
this.timer.cancel();
if (this.started.compareAndSet(true, false)) {
if (null != this.timer)
this.timer.cancel();
}
}
public boolean isStarted() {
return started.get();
}
public int getMaxDelayLevel() {
......@@ -214,7 +233,9 @@ public class ScheduleMessageService extends ConfigManager {
@Override
public void run() {
try {
this.executeOnTimeup();
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
......@@ -285,7 +306,7 @@ public class ScheduleMessageService extends ConfigManager {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册