提交 c06564f6 编写于 作者: D dongeforever

Try polishing the clear logic, need more polishment

上级 9cae8c1b
...@@ -26,10 +26,12 @@ import org.apache.rocketmq.common.admin.TopicOffset; ...@@ -26,10 +26,12 @@ import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
import org.apache.rocketmq.common.rpc.RpcRequest; import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
...@@ -258,45 +260,13 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -258,45 +260,13 @@ public class TopicQueueMappingManager extends ConfigManager {
public void cleanItemListMoreThanSecondGen() { public void cleanItemListMoreThanSecondGen() {
for(String topic : topicQueueMappingTable.keySet()) {
TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
if (mappingDetail == null
|| mappingDetail.getHostedQueues().isEmpty()) {
continue;
}
if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
continue;
}
Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
Integer queueId = entry.getKey();
List<LogicQueueMappingItem> items = entry.getValue();
if (items.size() <= 2) {
continue;
}
LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
LogicQueueMappingItem secLeaderItem = items.get(items.size() - 2);
if (!leaderItem.getBname().equals(mappingDetail.getBname())
&& !secLeaderItem.getBname().equals(mappingDetail.getBname())) {
it.remove();
log.info("The topic queue {} {} is expired with items {}", mappingDetail.getTopic(), queueId, items);
}
}
}
}
public void cleanItemExpired() {
String when = this.brokerController.getMessageStoreConfig().getDeleteWhen(); String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
if (!UtilAll.isItTimeToDo(when)) { if (!UtilAll.isItTimeToDo(when)) {
return; return;
} }
boolean changed = false;
long start = System.currentTimeMillis(); for(String topic : topicQueueMappingTable.keySet()) {
try { try {
for(String topic : topicQueueMappingTable.keySet()) {
TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic); TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
if (mappingDetail == null if (mappingDetail == null
|| mappingDetail.getHostedQueues().isEmpty()) { || mappingDetail.getHostedQueues().isEmpty()) {
...@@ -307,51 +277,132 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -307,51 +277,132 @@ public class TopicQueueMappingManager extends ConfigManager {
continue; continue;
} }
Set<String> brokers = new HashSet<>(); Set<String> brokers = new HashSet<>();
for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) { for (List<LogicQueueMappingItem> items : mappingDetail.getHostedQueues().values()) {
if (items.size() < 2) { if (items.size() < 2) {
continue; continue;
} }
LogicQueueMappingItem earlistItem = items.get(0); LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
brokers.add(earlistItem.getBname()); if (!leaderItem.equals(mappingDetail.getBname())) {
brokers.add(leaderItem.getBname());
}
} }
Map<String, TopicStatsTable> statsTable = new HashMap<>(); if (brokers.isEmpty()) {
continue;
}
Map<String, TopicConfigAndQueueMapping> configAndQueueMappingMap = new HashMap<>();
for (String broker: brokers) { for (String broker: brokers) {
GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader(); GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic); header.setTopic(topic);
header.setBname(broker); header.setBname(broker);
try { try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) { if (rpcResponse.getException() != null) {
throw rpcResponse.getException(); throw rpcResponse.getException();
} }
statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody()); configAndQueueMappingMap.put(broker, (TopicConfigAndQueueMapping) rpcResponse.getBody());
} catch (Throwable rt) { } catch (Throwable rt) {
log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt); log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
} }
} }
for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
Iterator<Map.Entry<Integer, List<LogicQueueMappingItem>>> it = mappingDetail.getHostedQueues().entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Integer, List<LogicQueueMappingItem>> entry = it.next();
Integer queueId = entry.getKey();
List<LogicQueueMappingItem> items = entry.getValue();
if (items.size() < 2) { if (items.size() < 2) {
continue; continue;
} }
LogicQueueMappingItem earlistItem = items.get(0); LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
if (topicStats == null) { TopicConfigAndQueueMapping configAndQueueMapping = configAndQueueMappingMap.get(leaderItem.getBname());
if (configAndQueueMapping == null) {
continue; continue;
} }
TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId())); List<LogicQueueMappingItem> itemsRemote = configAndQueueMapping.getMappingDetail().getHostedQueues().get(queueId);
if (topicOffset == null) { //TODO
//this may should not happen }
log.warn("Get null topicOffset for {}", earlistItem); } catch (Throwable tt) {
log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt);
} finally {
UtilAll.sleep(10);
}
}
}
public void cleanItemExpired() {
String when = this.brokerController.getMessageStoreConfig().getDeleteWhen();
if (!UtilAll.isItTimeToDo(when)) {
return;
}
boolean changed = false;
long start = System.currentTimeMillis();
try {
for(String topic : topicQueueMappingTable.keySet()) {
try {
TopicQueueMappingDetail mappingDetail = topicQueueMappingTable.get(topic);
if (mappingDetail == null
|| mappingDetail.getHostedQueues().isEmpty()) {
continue;
}
if (!mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName())) {
log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
continue; continue;
} }
if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) { Set<String> brokers = new HashSet<>();
boolean result = items.remove(earlistItem); for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
changed = changed || result; if (items.size() < 2) {
log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset); continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
brokers.add(earlistItem.getBname());
}
Map<String, TopicStatsTable> statsTable = new HashMap<>();
for (String broker: brokers) {
GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
header.setTopic(topic);
header.setBname(broker);
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
} catch (Throwable rt) {
log.warn("Get remote topic {} state info failed from broker {}", topic, broker, rt);
}
}
for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
if (topicStats == null) {
continue;
}
TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
if (topicOffset == null) {
//this may should not happen
log.warn("Get null topicOffset for {}", earlistItem);
continue;
}
if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()) {
//TODO be careful of the concurrent problem
//Should use the lock
boolean result = items.remove(earlistItem);
changed = changed || result;
log.info("The logic queue item {} is removed {} because of {}", result, earlistItem, topicOffset);
}
} }
} catch (Throwable tt) {
log.error("Try CleanItemExpired failed for {}", topic, tt);
} finally {
UtilAll.sleep(10);
} }
UtilAll.sleep(10);
} }
} catch (Throwable t) { } catch (Throwable t) {
log.error("Try cleanItemExpired failed", t); log.error("Try cleanItemExpired failed", t);
......
...@@ -17,11 +17,12 @@ ...@@ -17,11 +17,12 @@
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetTopicConfigRequestHeader implements CommandCustomHeader { public class GetTopicConfigRequestHeader extends RpcRequestHeader {
@Override @Override
public void checkFields() throws RemotingCommandException { public void checkFields() throws RemotingCommandException {
} }
......
...@@ -2,6 +2,7 @@ package org.apache.rocketmq.common.rpc; ...@@ -2,6 +2,7 @@ package org.apache.rocketmq.common.rpc;
import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
...@@ -9,13 +10,19 @@ import org.apache.rocketmq.common.protocol.ResponseCode; ...@@ -9,13 +10,19 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
...@@ -84,7 +91,10 @@ public class RpcClientImpl implements RpcClient { ...@@ -84,7 +91,10 @@ public class RpcClientImpl implements RpcClient {
rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs); rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
break; break;
case RequestCode.GET_TOPIC_STATS_INFO: case RequestCode.GET_TOPIC_STATS_INFO:
rpcResponsePromise = handleGetTopicStats(addr, request, timeoutMs); rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicStatsTable.class);
break;
case RequestCode.GET_TOPIC_CONFIG:
rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicConfigAndQueueMapping.class);
break; break;
default: default:
throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode()); throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
...@@ -212,16 +222,14 @@ public class RpcClientImpl implements RpcClient { ...@@ -212,16 +222,14 @@ public class RpcClientImpl implements RpcClient {
return rpcResponsePromise; return rpcResponsePromise;
} }
public Promise<RpcResponse> handleGetTopicStats(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { public Promise<RpcResponse> handleCommonBodyRequest(final String addr, RpcRequest rpcRequest, long timeoutMillis, Class bodyClass) throws Exception {
final Promise<RpcResponse> rpcResponsePromise = createResponseFuture(); final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest); RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null; assert responseCommand != null;
switch (responseCommand.getCode()) { switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
TopicStatsTable topicStatsTable = TopicStatsTable.decode(responseCommand.getBody(), TopicStatsTable.class); rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(requestCommand.getBody(), bodyClass)));
rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable));
break; break;
} }
default:{ default:{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册