...
 
Commits (4)
    https://gitcode.net/apacherocketmq/rocketmq/-/commit/8b747f97f99bf40a6b0d6b813fb91b5f1f673c67 Try using the new style to handble get min offset 2021-12-08T10:58:47+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/48db31b4815af2d2a3c07c1944a69f572e4425d7 Finish the test for topicStats 2021-12-08T17:27:55+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/a9addc3c293d878112ae3eb45fa00f75adf1f6c3 Add tests for command 2021-12-08T20:12:04+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/ad90cc163c48e590a1aab16911f84fc704b72c97 Finish the slave sync logic for topic queue mapping 2021-12-08T20:38:31+08:00 dongeforever dongeforever@apache.org
...@@ -350,7 +350,7 @@ public class BrokerOuterAPI { ...@@ -350,7 +350,7 @@ public class BrokerOuterAPI {
return changedList; return changedList;
} }
public TopicConfigSerializeWrapper getAllTopicConfig( public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(
final String addr) throws RemotingConnectException, RemotingSendRequestException, final String addr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException { RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
...@@ -359,7 +359,7 @@ public class BrokerOuterAPI { ...@@ -359,7 +359,7 @@ public class BrokerOuterAPI {
assert response != null; assert response != null;
switch (response.getCode()) { switch (response.getCode()) {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class); return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigAndMappingSerializeWrapper.class);
} }
default: default:
break; break;
......
...@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON; ...@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.CompleteFuture;
import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator; import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
...@@ -29,6 +30,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData; ...@@ -29,6 +30,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
...@@ -62,6 +64,7 @@ import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; ...@@ -62,6 +64,7 @@ import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody; import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
...@@ -103,6 +106,9 @@ import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHead ...@@ -103,6 +106,9 @@ import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHead
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.rpc.RpcClient;
import org.apache.rocketmq.common.rpc.RpcClientUtils;
import org.apache.rocketmq.common.rpc.RpcException;
import org.apache.rocketmq.common.rpc.RpcRequest; import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
...@@ -146,16 +152,23 @@ import java.util.List; ...@@ -146,16 +152,23 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController; private final BrokerController brokerController;
private final RpcClient rpcClient;
private final BrokerConfig brokerConfig;
public AdminBrokerProcessor(final BrokerController brokerController) { public AdminBrokerProcessor(final BrokerController brokerController) {
this.brokerController = brokerController; this.brokerController = brokerController;
this.brokerConfig = brokerController.getBrokerConfig();
this.rpcClient = brokerController.getBrokerOuterAPI().getRpcClient();
} }
@Override @Override
...@@ -533,7 +546,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -533,7 +546,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
// final GetAllTopicConfigResponseHeader responseHeader = // final GetAllTopicConfigResponseHeader responseHeader =
// (GetAllTopicConfigResponseHeader) response.readCustomHeader(); // (GetAllTopicConfigResponseHeader) response.readCustomHeader();
String content = this.brokerController.getTopicConfigManager().encode(); TopicConfigAndMappingSerializeWrapper topicConfigAndMappingSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
topicConfigAndMappingSerializeWrapper.setDataVersion(this.brokerController.getTopicConfigManager().getDataVersion());
topicConfigAndMappingSerializeWrapper.setTopicConfigTable(this.brokerController.getTopicConfigManager().getTopicConfigTable());
topicConfigAndMappingSerializeWrapper.setMappingDataVersion(this.brokerController.getTopicQueueMappingManager().getDataVersion());
topicConfigAndMappingSerializeWrapper.setTopicQueueMappingDetailMap(this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable());
String content = topicConfigAndMappingSerializeWrapper.toJson();
if (content != null && content.length() > 0) { if (content != null && content.length() > 0) {
try { try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
...@@ -644,7 +666,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -644,7 +666,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
break; break;
} }
} else { } else {
requestHeader.setPhysical(true); requestHeader.setLo(false);
requestHeader.setTimestamp(timestamp); requestHeader.setTimestamp(timestamp);
requestHeader.setQueueId(item.getQueueId()); requestHeader.setQueueId(item.getQueueId());
requestHeader.setBname(item.getBname()); requestHeader.setBname(item.getBname());
...@@ -714,7 +736,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -714,7 +736,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
assert maxItem != null; assert maxItem != null;
assert maxItem.getLogicOffset() >= 0; assert maxItem.getLogicOffset() >= 0;
requestHeader.setBname(maxItem.getBname()); requestHeader.setBname(maxItem.getBname());
requestHeader.setPhysical(true); requestHeader.setLo(false);
requestHeader.setQueueId(mappingItem.getQueueId()); requestHeader.setQueueId(mappingItem.getQueueId());
long maxPhysicalOffset = Long.MAX_VALUE; long maxPhysicalOffset = Long.MAX_VALUE;
...@@ -764,22 +786,22 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -764,22 +786,22 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response; return response;
} }
private RemotingCommand rewriteRequestForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext) {
if (mappingContext.getMappingDetail() == null) { if (mappingContext.getMappingDetail() == null) {
return null; return null;
} }
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
if (!mappingContext.isLeader()) { if (!mappingContext.isLeader()) {
//this may not //this may not
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d is not leader in broker %s, request code %d", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname(), request.getCode()))));
}; };
GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true); LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
assert mappingItem != null; assert mappingItem != null;
try { try {
requestHeader.setBname(mappingItem.getBname()); requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true); requestHeader.setLo(false);
requestHeader.setQueueId(mappingItem.getQueueId()); requestHeader.setQueueId(mappingItem.getQueueId());
long physicalOffset; long physicalOffset;
//run in local //run in local
...@@ -796,38 +818,40 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -796,38 +818,40 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
} }
long offset = mappingItem.computeStaticQueueOffsetLoosely(physicalOffset); long offset = mappingItem.computeStaticQueueOffsetLoosely(physicalOffset);
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); final GetMinOffsetResponseHeader responseHeader = new GetMinOffsetResponseHeader();
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
responseHeader.setOffset(offset); responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS); return CompletableFuture.completedFuture(new RpcResponse(ResponseCode.SUCCESS, responseHeader, null));
response.setRemark(null);
return response;
} catch (Throwable t) { } catch (Throwable t) {
log.error("rewriteRequestForStaticTopic failed", t); log.error("rewriteRequestForStaticTopic failed", t);
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t)));
} }
} }
private RemotingCommand getMinOffset(ChannelHandlerContext ctx, private CompletableFuture<RpcResponse> handleGetMinOffset(RpcRequest request) {
RemotingCommand request) throws RemotingCommandException { assert request.getCode() == RequestCode.GET_MIN_OFFSET;
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
final GetMinOffsetRequestHeader requestHeader =
(GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); CompletableFuture<RpcResponse> rewriteResult = handleGetMinOffsetForStaticTopic(request, mappingContext);
if (rewriteResult != null) { if (rewriteResult != null) {
return rewriteResult; return rewriteResult;
} }
final GetMinOffsetResponseHeader responseHeader = new GetMinOffsetResponseHeader();
long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
responseHeader.setOffset(offset); responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS); return CompletableFuture.completedFuture(new RpcResponse(ResponseCode.SUCCESS, responseHeader, null));
response.setRemark(null); }
return response;
private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final GetMinOffsetRequestHeader requestHeader =
(GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
try {
CompletableFuture<RpcResponse> responseFuture = handleGetMinOffset(new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null));
RpcResponse rpcResponse = responseFuture.get();
return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
} }
private RemotingCommand rewriteRequestForStaticTopic(GetEarliestMsgStoretimeRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { private RemotingCommand rewriteRequestForStaticTopic(GetEarliestMsgStoretimeRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
...@@ -843,7 +867,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -843,7 +867,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
assert mappingItem != null; assert mappingItem != null;
try { try {
requestHeader.setBname(mappingItem.getBname()); requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true); requestHeader.setLo(false);
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null);
//TODO check if it is in current broker //TODO check if it is in current broker
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
...@@ -998,6 +1022,70 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -998,6 +1022,70 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response; return response;
} }
private RpcResponse handleGetTopicStatsInfoForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext) {
try {
assert request.getCode() == RequestCode.GET_TOPIC_STATS_INFO;
if (mappingContext.getMappingDetail() == null) {
return null;
}
final GetTopicStatsInfoRequestHeader requestHeader = (GetTopicStatsInfoRequestHeader) request.getHeader();
String topic = requestHeader.getTopic();
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
Map<Integer, LogicQueueMappingItem[]> qidItemMap = new HashMap<>();
Set<String> brokers = new HashSet<>();
mappingDetail.getHostedQueues().forEach((qid, items) -> {
if (TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
LogicQueueMappingItem[] itemPair = new LogicQueueMappingItem[2];
itemPair[0] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, 0, true);
itemPair[1] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, Long.MAX_VALUE, true);
assert itemPair[0] != null && itemPair[1] != null;
qidItemMap.put(qid, itemPair);
brokers.add(itemPair[0].getBname());
brokers.add(itemPair[1].getBname());
}
});
Map<String, TopicStatsTable> statsTable = new HashMap<>();
for (String broker: brokers) {
GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
header.setTopic(topic);
header.setBname(broker);
header.setLo(false);
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
}
TopicStatsTable topicStatsTable = new TopicStatsTable();
qidItemMap.forEach( (qid, itemPair) -> {
LogicQueueMappingItem minItem = itemPair[0];
LogicQueueMappingItem maxItem = itemPair[1];
TopicOffset minTopicOffset = statsTable.get(minItem.getBname()).getOffsetTable().get(new MessageQueue(topic, minItem.getBname(), minItem.getQueueId()));
TopicOffset maxTopicOffset = statsTable.get(maxItem.getBname()).getOffsetTable().get(new MessageQueue(topic, maxItem.getBname(), maxItem.getQueueId()));
assert minTopicOffset != null && maxTopicOffset != null;
long min = minItem.computeStaticQueueOffsetLoosely(minTopicOffset.getMinOffset());
if (min < 0)
min = 0;
long max = maxItem.computeStaticQueueOffsetStrictly(maxTopicOffset.getMaxOffset());
if (max < 0)
max = 0;
long timestamp = maxTopicOffset.getLastUpdateTimestamp();
TopicOffset topicOffset = new TopicOffset();
topicOffset.setMinOffset(min);
topicOffset.setMaxOffset(max);
topicOffset.setLastUpdateTimestamp(timestamp);
topicStatsTable.getOffsetTable().put(new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, qid), topicOffset);
});
return new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable);
} catch (Throwable t) {
return new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t));
}
}
private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException { RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RemotingCommand response = RemotingCommand.createResponseCommand(null);
...@@ -1011,8 +1099,17 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -1011,8 +1099,17 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
response.setRemark("topic[" + topic + "] not exist"); response.setRemark("topic[" + topic + "] not exist");
return response; return response;
} }
TopicStatsTable topicStatsTable = new TopicStatsTable(); TopicStatsTable topicStatsTable = new TopicStatsTable();
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
RpcResponse rpcResponse = handleGetTopicStatsInfoForStaticTopic(new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, requestHeader, null), mappingContext);
if (rpcResponse != null) {
if (rpcResponse.getException() != null) {
return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
} else {
topicStatsTable.getOffsetTable().putAll(((TopicStatsTable)rpcResponse.getBody()).getOffsetTable());
}
}
for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(); MessageQueue mq = new MessageQueue();
mq.setTopic(topic); mq.setTopic(topic);
...@@ -1909,7 +2006,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -1909,7 +2006,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response; return response;
} }
TopicQueueMappingDetail topicQueueMappingDetail = null; TopicQueueMappingDetail topicQueueMappingDetail = null;
if (Boolean.TRUE.equals(requestHeader.getWithMapping())) { if (Boolean.TRUE.equals(requestHeader.getLo())) {
topicQueueMappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic()); topicQueueMappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
} }
String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingDetail)); String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingDetail));
......
...@@ -153,11 +153,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen ...@@ -153,11 +153,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
//by default, it is -1 //by default, it is -1
long offset = -1; long offset = -1;
//double read, first from leader, then from second leader //double read, first from leader, then from second leader
for (int i = 1; i <= 2; i++) { for (int i = itemList.size() - 1; i >= 0; i--) {
if (itemList.size() - i < 0) { LogicQueueMappingItem mappingItem = itemList.get(i);
break;
}
LogicQueueMappingItem mappingItem = itemList.get(itemList.size() - i);
if (mappingItem.getBname().equals(mappingDetail.getBname())) { if (mappingItem.getBname().equals(mappingDetail.getBname())) {
offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId()); offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId());
if (offset >= 0) { if (offset >= 0) {
...@@ -170,7 +167,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen ...@@ -170,7 +167,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
//maybe we need to reconstruct an object //maybe we need to reconstruct an object
requestHeader.setBname(mappingItem.getBname()); requestHeader.setBname(mappingItem.getBname());
requestHeader.setQueueId(mappingItem.getQueueId()); requestHeader.setQueueId(mappingItem.getQueueId());
requestHeader.setPhysical(true); requestHeader.setLo(false);
requestHeader.setSetZeroIfNotFound(false); requestHeader.setSetZeroIfNotFound(false);
RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
...@@ -179,7 +176,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen ...@@ -179,7 +176,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
} }
if (rpcResponse.getCode() == ResponseCode.SUCCESS) { if (rpcResponse.getCode() == ResponseCode.SUCCESS) {
offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset(); offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset();
} else if (rpcResponse.getCode() == ResponseCode.PULL_NOT_FOUND){ break;
} else if (rpcResponse.getCode() == ResponseCode.QUERY_NOT_FOUND){
continue; continue;
} else { } else {
//this should not happen //this should not happen
......
...@@ -134,18 +134,14 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -134,18 +134,14 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
&& requestHeader.getMaxMsgNums() != null) { && requestHeader.getMaxMsgNums() != null) {
requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums())); requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
} }
int sysFlag = requestHeader.getSysFlag();
if (!mappingContext.isLeader()) {
sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
requestHeader.setSysFlag(sysFlag);
}
if (mappingDetail.getBname().equals(bname)) { if (mappingDetail.getBname().equals(bname)) {
//just let it go, do the local pull process //just let it go, do the local pull process
return null; return null;
} }
requestHeader.setPhysical(true); int sysFlag = requestHeader.getSysFlag();
requestHeader.setLo(false);
requestHeader.setBname(bname); requestHeader.setBname(bname);
sysFlag = PullSysFlag.clearSuspendFlag(sysFlag); sysFlag = PullSysFlag.clearSuspendFlag(sysFlag);
sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag); sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
...@@ -189,11 +185,13 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -189,11 +185,13 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
long minOffset = responseHeader.getMinOffset(); long minOffset = responseHeader.getMinOffset();
long maxOffset = responseHeader.getMaxOffset(); long maxOffset = responseHeader.getMaxOffset();
int responseCode = code; int responseCode = code;
//consider the following situations //consider the following situations
// 1. read from slave, currently not supported // 1. read from slave, currently not supported
// 2. the middle queue is truncated because of deleting commitlog // 2. the middle queue is truncated because of deleting commitlog
if (code != ResponseCode.SUCCESS) { if (code != ResponseCode.SUCCESS) {
//note the currentItem maybe both the leader and the earliest //note the currentItem maybe both the leader and the earliest
boolean isRevised = false;
if (leaderItem.getGen() == currentItem.getGen()) { if (leaderItem.getGen() == currentItem.getGen()) {
//read the leader //read the leader
if (requestOffset > maxOffset) { if (requestOffset > maxOffset) {
...@@ -228,6 +226,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -228,6 +226,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
//just move to another item //just move to another item
LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true); LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
if (nextItem != null) { if (nextItem != null) {
isRevised = true;
currentItem = nextItem; currentItem = nextItem;
nextBeginOffset = currentItem.getStartOffset(); nextBeginOffset = currentItem.getStartOffset();
minOffset = currentItem.getStartOffset(); minOffset = currentItem.getStartOffset();
...@@ -244,7 +243,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -244,7 +243,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
} }
//read from the middle item, ignore the PULL_OFFSET_MOVED //read from the middle item, ignore the PULL_OFFSET_MOVED
if (leaderItem.getGen() != currentItem.getGen() if (!isRevised
&& leaderItem.getGen() != currentItem.getGen()
&& earlistItem.getGen() != currentItem.getGen()) { && earlistItem.getGen() != currentItem.getGen()) {
if (requestOffset < minOffset) { if (requestOffset < minOffset) {
nextBeginOffset = minOffset; nextBeginOffset = minOffset;
...@@ -289,7 +289,6 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -289,7 +289,6 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return null; return null;
} }
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace();
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
} }
} }
...@@ -443,6 +442,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -443,6 +442,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return response; return response;
} }
MessageFilter messageFilter; MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) { if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
......
...@@ -21,6 +21,7 @@ import org.apache.rocketmq.broker.BrokerController; ...@@ -21,6 +21,7 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper; import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
...@@ -56,7 +57,7 @@ public class SlaveSynchronize { ...@@ -56,7 +57,7 @@ public class SlaveSynchronize {
String masterAddrBak = this.masterAddr; String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) { if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try { try {
TopicConfigSerializeWrapper topicWrapper = TopicConfigAndMappingSerializeWrapper topicWrapper =
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak); this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
if (!this.brokerController.getTopicConfigManager().getDataVersion() if (!this.brokerController.getTopicConfigManager().getDataVersion()
.equals(topicWrapper.getDataVersion())) { .equals(topicWrapper.getDataVersion())) {
...@@ -67,9 +68,17 @@ public class SlaveSynchronize { ...@@ -67,9 +68,17 @@ public class SlaveSynchronize {
this.brokerController.getTopicConfigManager().getTopicConfigTable() this.brokerController.getTopicConfigManager().getTopicConfigTable()
.putAll(topicWrapper.getTopicConfigTable()); .putAll(topicWrapper.getTopicConfigTable());
this.brokerController.getTopicConfigManager().persist(); this.brokerController.getTopicConfigManager().persist();
log.info("Update slave topic config from master, {}", masterAddrBak);
} }
if (topicWrapper.getTopicQueueMappingDetailMap() != null
&& !topicWrapper.getMappingDataVersion().equals(this.brokerController.getTopicQueueMappingManager().getDataVersion())) {
this.brokerController.getTopicQueueMappingManager().getDataVersion()
.assignNewOne(topicWrapper.getMappingDataVersion());
this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable().clear();
this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable()
.putAll(topicWrapper.getTopicQueueMappingDetailMap());
this.brokerController.getTopicQueueMappingManager().persist();
}
log.info("Update slave topic config from master, {}", masterAddrBak);
} catch (Exception e) { } catch (Exception e) {
log.error("SyncTopicConfig Exception, {}", masterAddrBak, e); log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
} }
......
...@@ -134,6 +134,7 @@ public class TopicQueueMappingCleanService extends ServiceThread { ...@@ -134,6 +134,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader(); GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
header.setTopic(topic); header.setTopic(topic);
header.setBname(broker); header.setBname(broker);
header.setLo(false);
try { try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get(); RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
...@@ -261,7 +262,7 @@ public class TopicQueueMappingCleanService extends ServiceThread { ...@@ -261,7 +262,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader(); GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic); header.setTopic(topic);
header.setBname(broker); header.setBname(broker);
header.setWithMapping(true); header.setLo(true);
try { try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get(); RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
......
...@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader; ...@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.common.rpc.RpcRequest; import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.common.rpc.TopicRequestHeader;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
...@@ -190,28 +191,37 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -190,28 +191,37 @@ public class TopicQueueMappingManager extends ConfigManager {
return dataVersion; return dataVersion;
} }
public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) { public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader) {
return buildTopicQueueMappingContext(requestHeader, false); return buildTopicQueueMappingContext(requestHeader, false);
} }
//Do not return a null context //Do not return a null context
public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss) { public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader, boolean selectOneWhenMiss) {
if (requestHeader.getPhysical() != null //should disable logic queue explicitly, otherwise the old client may cause dirty data to newly created static topic
&& Boolean.TRUE.equals(requestHeader.getPhysical())) { if (requestHeader.getLo() != null
return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null); && Boolean.FALSE.equals(requestHeader.getLo())) {
return new TopicQueueMappingContext(requestHeader.getTopic(), null, null, null, null);
} }
TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(requestHeader.getTopic()); String topic = requestHeader.getTopic();
Integer globalId = null;
if (requestHeader instanceof TopicQueueRequestHeader) {
globalId = ((TopicQueueRequestHeader) requestHeader).getQueueId();
}
TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(topic);
if (mappingDetail == null) { if (mappingDetail == null) {
//it is not static topic //it is not static topic
return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null); return new TopicQueueMappingContext(topic, null, null, null, null);
} }
assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName()); assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName());
if (globalId == null) {
return new TopicQueueMappingContext(topic, null, mappingDetail, null, null);
}
//If not find mappingItem, it encounters some errors //If not find mappingItem, it encounters some errors
Integer globalId = requestHeader.getQueueId();
if (globalId < 0 && !selectOneWhenMiss) { if (globalId < 0 && !selectOneWhenMiss) {
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null); return new TopicQueueMappingContext(topic, globalId, mappingDetail, null, null);
} }
if (globalId < 0) { if (globalId < 0) {
...@@ -224,7 +234,7 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -224,7 +234,7 @@ public class TopicQueueMappingManager extends ConfigManager {
} }
} }
if (globalId < 0) { if (globalId < 0) {
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null); return new TopicQueueMappingContext(topic, globalId, mappingDetail, null, null);
} }
List<LogicQueueMappingItem> mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId); List<LogicQueueMappingItem> mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
...@@ -233,7 +243,7 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -233,7 +243,7 @@ public class TopicQueueMappingManager extends ConfigManager {
&& mappingItemList.size() > 0) { && mappingItemList.size() > 0) {
leaderItem = mappingItemList.get(mappingItemList.size() - 1); leaderItem = mappingItemList.get(mappingItemList.size() - 1);
} }
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, mappingItemList, leaderItem); return new TopicQueueMappingContext(topic, globalId, mappingDetail, mappingItemList, leaderItem);
} }
......
...@@ -2581,7 +2581,7 @@ public class MQClientAPIImpl { ...@@ -2581,7 +2581,7 @@ public class MQClientAPIImpl {
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader(); GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic); header.setTopic(topic);
header.setWithMapping(true); header.setLo(true);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, header); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, header);
RemotingCommand response = this.remotingClient RemotingCommand response = this.remotingClient
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis); .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
package org.apache.rocketmq.common.protocol.body; package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo; import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
import java.util.Map; import java.util.Map;
...@@ -25,6 +27,11 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -25,6 +27,11 @@ import java.util.concurrent.ConcurrentHashMap;
public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeWrapper { public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeWrapper {
private Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap = new ConcurrentHashMap<String, TopicQueueMappingInfo>(); private Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap = new ConcurrentHashMap<String, TopicQueueMappingInfo>();
private Map<String/* topic */, TopicQueueMappingDetail> topicQueueMappingDetailMap = new ConcurrentHashMap<String, TopicQueueMappingDetail>();
private DataVersion mappingDataVersion = new DataVersion();
public Map<String, TopicQueueMappingInfo> getTopicQueueMappingInfoMap() { public Map<String, TopicQueueMappingInfo> getTopicQueueMappingInfoMap() {
return topicQueueMappingInfoMap; return topicQueueMappingInfoMap;
} }
...@@ -33,6 +40,22 @@ public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeW ...@@ -33,6 +40,22 @@ public class TopicConfigAndMappingSerializeWrapper extends TopicConfigSerializeW
this.topicQueueMappingInfoMap = topicQueueMappingInfoMap; this.topicQueueMappingInfoMap = topicQueueMappingInfoMap;
} }
public Map<String, TopicQueueMappingDetail> getTopicQueueMappingDetailMap() {
return topicQueueMappingDetailMap;
}
public void setTopicQueueMappingDetailMap(Map<String, TopicQueueMappingDetail> topicQueueMappingDetailMap) {
this.topicQueueMappingDetailMap = topicQueueMappingDetailMap;
}
public DataVersion getMappingDataVersion() {
return mappingDataVersion;
}
public void setMappingDataVersion(DataVersion mappingDataVersion) {
this.mappingDataVersion = mappingDataVersion;
}
public static TopicConfigAndMappingSerializeWrapper from(TopicConfigSerializeWrapper wrapper) { public static TopicConfigAndMappingSerializeWrapper from(TopicConfigSerializeWrapper wrapper) {
if (wrapper instanceof TopicConfigAndMappingSerializeWrapper) { if (wrapper instanceof TopicConfigAndMappingSerializeWrapper) {
return (TopicConfigAndMappingSerializeWrapper) wrapper; return (TopicConfigAndMappingSerializeWrapper) wrapper;
......
...@@ -18,11 +18,12 @@ ...@@ -18,11 +18,12 @@
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.common.rpc.TopicRequestHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetTopicConfigRequestHeader extends RpcRequestHeader { public class GetTopicConfigRequestHeader extends TopicRequestHeader {
@Override @Override
public void checkFields() throws RemotingCommandException { public void checkFields() throws RemotingCommandException {
} }
...@@ -30,7 +31,6 @@ public class GetTopicConfigRequestHeader extends RpcRequestHeader { ...@@ -30,7 +31,6 @@ public class GetTopicConfigRequestHeader extends RpcRequestHeader {
@CFNotNull @CFNotNull
private String topic; private String topic;
private Boolean withMapping;
/** /**
* @return the topic * @return the topic
...@@ -45,12 +45,4 @@ public class GetTopicConfigRequestHeader extends RpcRequestHeader { ...@@ -45,12 +45,4 @@ public class GetTopicConfigRequestHeader extends RpcRequestHeader {
public void setTopic(String topic) { public void setTopic(String topic) {
this.topic = topic; this.topic = topic;
} }
public Boolean getWithMapping() {
return withMapping;
}
public void setWithMapping(Boolean withMapping) {
this.withMapping = withMapping;
}
} }
\ No newline at end of file
...@@ -17,12 +17,11 @@ ...@@ -17,12 +17,11 @@
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.common.rpc.TopicRequestHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetTopicStatsInfoRequestHeader extends RpcRequestHeader { public class GetTopicStatsInfoRequestHeader extends TopicRequestHeader {
@CFNotNull @CFNotNull
private String topic; private String topic;
......
...@@ -25,7 +25,7 @@ public class RequestBuilder { ...@@ -25,7 +25,7 @@ public class RequestBuilder {
} }
try { try {
RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance(); RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance();
requestHeader.setOneway(oneway); requestHeader.setOway(oneway);
requestHeader.setBname(destBrokerName); requestHeader.setBname(destBrokerName);
return requestHeader; return requestHeader;
} catch (Throwable t) { } catch (Throwable t) {
...@@ -37,26 +37,26 @@ public class RequestBuilder { ...@@ -37,26 +37,26 @@ public class RequestBuilder {
return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), null); return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), null);
} }
public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq, Boolean physical) { public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq, Boolean logic) {
return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical); return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), logic);
} }
public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, MessageQueue mq, Boolean physical) { public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, MessageQueue mq, Boolean logic) {
return buildTopicQueueRequestHeader(requestCode, oneway, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical); return buildTopicQueueRequestHeader(requestCode, oneway, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), logic);
} }
public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, String destBrokerName, String topic, int queueId, Boolean physical) { public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, String destBrokerName, String topic, int queueId, Boolean logic) {
Class requestHeaderClass = requestCodeMap.get(requestCode); Class requestHeaderClass = requestCodeMap.get(requestCode);
if (requestHeaderClass == null) { if (requestHeaderClass == null) {
throw new UnsupportedOperationException("unknown " + requestCode); throw new UnsupportedOperationException("unknown " + requestCode);
} }
try { try {
TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance(); TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance();
requestHeader.setOneway(oneway); requestHeader.setOway(oneway);
requestHeader.setBname(destBrokerName); requestHeader.setBname(destBrokerName);
requestHeader.setTopic(topic); requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId); requestHeader.setQueueId(queueId);
requestHeader.setPhysical(physical); requestHeader.setLo(logic);
return requestHeader; return requestHeader;
} catch (Throwable t) { } catch (Throwable t) {
throw new RuntimeException(t); throw new RuntimeException(t);
......
...@@ -214,6 +214,7 @@ public class RpcClientImpl implements RpcClient { ...@@ -214,6 +214,7 @@ public class RpcClientImpl implements RpcClient {
} }
case ResponseCode.QUERY_NOT_FOUND: { case ResponseCode.QUERY_NOT_FOUND: {
rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null)); rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null));
break;
} }
default:{ default:{
rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
......
...@@ -20,13 +20,13 @@ import org.apache.rocketmq.remoting.CommandCustomHeader; ...@@ -20,13 +20,13 @@ import org.apache.rocketmq.remoting.CommandCustomHeader;
public abstract class RpcRequestHeader implements CommandCustomHeader { public abstract class RpcRequestHeader implements CommandCustomHeader {
//the namespace name //the namespace name
protected String namespace; protected String ns;
//if the data has been namespaced //if the data has been namespaced
protected Boolean namespaced; protected Boolean nsd;
//the abstract remote addr name, usually the physical broker name //the abstract remote addr name, usually the physical broker name
protected String bname; protected String bname;
//oneway
protected Boolean oneway; protected Boolean oway;
public String getBname() { public String getBname() {
return bname; return bname;
...@@ -36,27 +36,27 @@ public abstract class RpcRequestHeader implements CommandCustomHeader { ...@@ -36,27 +36,27 @@ public abstract class RpcRequestHeader implements CommandCustomHeader {
this.bname = bname; this.bname = bname;
} }
public Boolean getOneway() { public Boolean getOway() {
return oneway; return oway;
} }
public void setOneway(Boolean oneway) { public void setOway(Boolean oway) {
this.oneway = oneway; this.oway = oway;
} }
public String getNamespace() { public String getNs() {
return namespace; return ns;
} }
public void setNamespace(String namespace) { public void setNs(String ns) {
this.namespace = namespace; this.ns = ns;
} }
public Boolean getNamespaced() { public Boolean getNsd() {
return namespaced; return nsd;
} }
public void setNamespaced(Boolean namespaced) { public void setNsd(Boolean nsd) {
this.namespaced = namespaced; this.nsd = nsd;
} }
} }
...@@ -34,7 +34,7 @@ public class RpcResponse { ...@@ -34,7 +34,7 @@ public class RpcResponse {
this.body = body; this.body = body;
} }
RpcResponse(RpcException rpcException) { public RpcResponse(RpcException rpcException) {
this.code = rpcException.getErrorCode(); this.code = rpcException.getErrorCode();
this.exception = rpcException; this.exception = rpcException;
} }
......
...@@ -16,20 +16,8 @@ ...@@ -16,20 +16,8 @@
*/ */
package org.apache.rocketmq.common.rpc; package org.apache.rocketmq.common.rpc;
public abstract class TopicQueueRequestHeader extends RpcRequestHeader { public abstract class TopicQueueRequestHeader extends TopicRequestHeader {
//Physical or Logical
protected Boolean physical;
public Boolean getPhysical() {
return physical;
}
public void setPhysical(Boolean physical) {
this.physical = physical;
}
public abstract String getTopic();
public abstract void setTopic(String topic);
public abstract Integer getQueueId(); public abstract Integer getQueueId();
public abstract void setQueueId(Integer queueId); public abstract void setQueueId(Integer queueId);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.rpc;
public abstract class TopicRequestHeader extends RpcRequestHeader {
//logical
protected Boolean lo;
public abstract String getTopic();
public abstract void setTopic(String topic);
public Boolean getLo() {
return lo;
}
public void setLo(Boolean lo) {
this.lo = lo;
}
}
...@@ -22,9 +22,16 @@ import java.util.Map; ...@@ -22,9 +22,16 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.BrokerData;
...@@ -34,9 +41,16 @@ import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; ...@@ -34,9 +41,16 @@ import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils; import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.MQAdminStartup;
import org.apache.rocketmq.tools.command.topic.RemappingStaticTopicSubCommand;
import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand;
public class MQAdminTestUtils { public class MQAdminTestUtils {
private static Logger log = Logger.getLogger(MQAdminTestUtils.class); private static Logger log = Logger.getLogger(MQAdminTestUtils.class);
...@@ -231,5 +245,65 @@ public class MQAdminTestUtils { ...@@ -231,5 +245,65 @@ public class MQAdminTestUtils {
} }
public static void createStaticTopicWithCommand(String topic, int queueNum, Set<String> brokers, String cluster, String nameservers) throws Exception {
UpdateStaticTopicSubCommand cmd = new UpdateStaticTopicSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] args;
if (cluster != null) {
args = new String[] {
"-c", cluster,
"-t", topic,
"-qn", String.valueOf(queueNum),
"-n", nameservers
};
} else {
String brokerStr = String.join(",", brokers);
args = new String[] {
"-b", brokerStr,
"-t", topic,
"-qn", String.valueOf(queueNum),
"-n", nameservers
};
}
final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, cmd.buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
return;
}
if (commandLine.hasOption('n')) {
String namesrvAddr = commandLine.getOptionValue('n');
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
cmd.execute(commandLine, options, null);
}
public static void remappingStaticTopicWithCommand(String topic, Set<String> brokers, String cluster, String nameservers) throws Exception {
RemappingStaticTopicSubCommand cmd = new RemappingStaticTopicSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] args;
if (cluster != null) {
args = new String[] {
"-c", cluster,
"-t", topic,
"-n", nameservers
};
} else {
String brokerStr = String.join(",", brokers);
args = new String[] {
"-b", brokerStr,
"-t", topic,
"-n", nameservers
};
}
final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, cmd.buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
return;
}
if (commandLine.hasOption('n')) {
String namesrvAddr = commandLine.getOptionValue('n');
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
cmd.execute(commandLine, options, null);
}
} }
...@@ -268,6 +268,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { ...@@ -268,6 +268,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
} }
} }
if (topicStatsTable.getOffsetTable().isEmpty()) { if (topicStatsTable.getOffsetTable().isEmpty()) {
throw new MQClientException("Not found the topic stats info", null); throw new MQClientException("Not found the topic stats info", null);
} }
......
...@@ -20,13 +20,12 @@ import org.apache.commons.cli.CommandLine; ...@@ -20,13 +20,12 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
...@@ -91,6 +90,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -91,6 +90,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try { try {
defaultMQAdminExt.start();
String topic = commandLine.getOptionValue('t').trim(); String topic = commandLine.getOptionValue('t').trim();
String mapFileName = commandLine.getOptionValue('f').trim(); String mapFileName = commandLine.getOptionValue('f').trim();
...@@ -137,6 +137,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -137,6 +137,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = new HashSet<>();
try { try {
defaultMQAdminExt.start();
if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))) { if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return; return;
...@@ -148,6 +149,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { ...@@ -148,6 +149,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
|| clusterInfo.getClusterAddrTable().isEmpty()) { || clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty"); throw new RuntimeException("The Cluster info is empty");
} }
clientMetadata.refreshClusterInfo(clusterInfo);
{ {
if (commandLine.hasOption("b")) { if (commandLine.hasOption("b")) {
String brokerStrs = commandLine.getOptionValue("b").trim(); String brokerStrs = commandLine.getOptionValue("b").trim();
......
...@@ -92,9 +92,9 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -92,9 +92,9 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
RPCHook rpcHook) throws SubCommandException { RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
try { try {
defaultMQAdminExt.start();
String topic = commandLine.getOptionValue('t').trim(); String topic = commandLine.getOptionValue('t').trim();
String mapFileName = commandLine.getOptionValue('f').trim(); String mapFileName = commandLine.getOptionValue('f').trim();
String mapData = MixAll.file2String(mapFileName); String mapData = MixAll.file2String(mapFileName);
...@@ -139,6 +139,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -139,6 +139,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = new HashSet<>();
try { try {
defaultMQAdminExt.start();
if ((!commandLine.hasOption("b") && !commandLine.hasOption('c')) if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
|| !commandLine.hasOption("qn")) { || !commandLine.hasOption("qn")) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
......