提交 12915b80 编写于 作者: D dongeforever

Add clean item logic for topic queue mapping

上级 527382e4
......@@ -163,6 +163,9 @@ public class BrokerController {
private final BrokerOuterAPI brokerOuterAPI;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerControllerScheduledThread"));
//the topic queue mapping is costly, so use an independent executor
private final ScheduledExecutorService scheduledForTopicQueueMapping = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerControllerScheduledThread-TopicQueueMapping"));
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> ackThreadPoolQueue;
......@@ -498,6 +501,22 @@ public class BrokerController {
}
}, 1, 5, TimeUnit.SECONDS);
this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
try {
this.topicQueueMappingManager.cleanItemListMoreThanSecondGen();
} catch (Throwable t) {
log.error("ScheduledTask cleanItemListMoreThanSecondGen failed", t);
}
}, 1, 5, TimeUnit.MINUTES);
this.scheduledForTopicQueueMapping.scheduleAtFixedRate( () -> {
try {
this.topicQueueMappingManager.cleanItemExpired();
} catch (Throwable t) {
log.error("ScheduledTask cleanItemExpired failed", t);
}
}, 1, 5, TimeUnit.MINUTES);
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
......@@ -892,6 +911,12 @@ public class BrokerController {
} catch (InterruptedException e) {
}
this.scheduledForTopicQueueMapping.shutdown();
try {
this.scheduledForTopicQueueMapping.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (Throwable ignored) {
}
this.unregisterBrokerAll();
if (this.sendMessageExecutor != null) {
......
......@@ -21,6 +21,14 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -32,10 +40,14 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.DefaultMessageStore;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
......@@ -272,4 +284,82 @@ public class TopicQueueMappingManager extends ConfigManager {
}
}
public void cleanItemExpired() {
String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
if (!UtilAll.isItTimeToDo(when)) {
return;
}
boolean changed = false;
long start = System.currentTimeMillis();
try {
for(String topic : topicQueueMappingTable.keySet()) {
TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
if (mappingDetail == null
|| mappingDetail.getHostedQueues().isEmpty()) {
continue;
}
if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
continue;
}
Set<String> brokers = new HashSet<>();
for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
brokers.add(earlistItem.getBname());
}
Map<String, TopicStatsTable> statsTable = new HashMap<>();
for (String broker: brokers) {
GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
header.setTopic(topic);
header.setBname(broker);
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
} catch (Throwable rt) {
log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
}
}
for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
if (topicStats == null) {
continue;
}
TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
if (topicOffset == null) {
//this may should not happen
log.warn("Get null topicOffset for {}", earlistItem);
continue;
}
if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
boolean result = items.remove(earlistItem);
changed = changed || result;
log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
}
}
UtilAll.sleep(10);
}
} catch (Throwable t) {
log.error("Try cleanItemExpired failed", t);
} finally {
if (changed) {
this.dataVersion.nextVersion();
this.persist();
log.info("CleanItemExpired changed");
}
log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start);
}
}
}
......@@ -44,4 +44,13 @@ public class TopicOffset {
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
@Override
public String toString() {
return "TopicOffset{" +
"minOffset=" + minOffset +
", maxOffset=" + maxOffset +
", lastUpdateTimestamp=" + lastUpdateTimestamp +
'}';
}
}
......@@ -17,11 +17,12 @@
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetTopicStatsInfoRequestHeader implements CommandCustomHeader {
public class GetTopicStatsInfoRequestHeader extends RpcRequestHeader {
@CFNotNull
private String topic;
......
package org.apache.rocketmq.common.rpc;
import com.alibaba.fastjson.JSON;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
......@@ -83,6 +83,9 @@ public class RpcClientImpl implements RpcClient {
case RequestCode.QUERY_CONSUMER_OFFSET:
rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
break;
case RequestCode.GET_TOPIC_STATS_INFO:
rpcResponsePromise = handleGetTopicStats(addr, request, timeoutMs);
break;
default:
throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
}
......@@ -209,6 +212,25 @@ public class RpcClientImpl implements RpcClient {
return rpcResponsePromise;
}
public Promise<RpcResponse> handleGetTopicStats(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
TopicStatsTable topicStatsTable = TopicStatsTable.decode(responseCommand.getBody(), TopicStatsTable.class);
rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable));
break;
}
default:{
rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
}
}
return rpcResponsePromise;
}
public Promise<RpcResponse> handleGetMinOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
......
......@@ -28,7 +28,7 @@ public class RpcResponse {
}
public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
public RpcResponse(int code, CommandCustomHeader header, Object body) {
this.code = code;
this.header = header;
this.body = body;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册