提交 f7f32e70 编写于 作者: D dongeforever

Add the test for remapping static topic

上级 c2c56eab
...@@ -492,11 +492,11 @@ public class BrokerController { ...@@ -492,11 +492,11 @@ public class BrokerController {
this.scheduledExecutorService.scheduleAtFixedRate(() -> { this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try { try {
BrokerController.this.refreshBrokerNameMapping(); BrokerController.this.brokerOuterAPI.refreshMetadata();
} catch (Exception e) { } catch (Exception e) {
log.error("ScheduledTask examineBrokerClusterInfo exception", e); log.error("ScheduledTask refresh metadata exception", e);
} }
}, 10, 10, TimeUnit.SECONDS); }, 1, 5, TimeUnit.SECONDS);
if (!messageStoreConfig.isEnableDLegerCommitLog()) { if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
...@@ -624,13 +624,6 @@ public class BrokerController { ...@@ -624,13 +624,6 @@ public class BrokerController {
} }
} }
private void refreshBrokerNameMapping() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
ClusterInfo brokerClusterInfo = this.brokerOuterAPI.getBrokerClusterInfo();
brokerClusterInfo.getBrokerAddrTable().forEach((brokerName, data) -> {
String masterBrokerAddr = data.getBrokerAddrs().get(MixAll.MASTER_ID);
this.brokerName2AddrMap.put(brokerName, masterBrokerAddr);
});
}
public String getBrokerAddrByName(String brokerName) { public String getBrokerAddrByName(String brokerName) {
return this.brokerName2AddrMap.get(brokerName); return this.brokerName2AddrMap.get(brokerName);
......
...@@ -24,6 +24,7 @@ import java.util.concurrent.CopyOnWriteArrayList; ...@@ -24,6 +24,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
...@@ -477,6 +478,12 @@ public class BrokerOuterAPI { ...@@ -477,6 +478,12 @@ public class BrokerOuterAPI {
} }
public void refreshMetadata() throws Exception {
ClusterInfo brokerClusterInfo = getBrokerClusterInfo();
clientMetadata.refreshClusterInfo(brokerClusterInfo);
}
public ClientMetadata getClientMetadata() { public ClientMetadata getClientMetadata() {
return clientMetadata; return clientMetadata;
} }
......
...@@ -621,13 +621,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -621,13 +621,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (mappingContext.getMappingDetail() == null) { if (mappingContext.getMappingDetail() == null) {
return null; return null;
} }
if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); List<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList();
if (mappingItem == null if (mappingItems == null
|| !mappingDetail.getBname().equals(mappingItem.getBname())) { || mappingItems.isEmpty()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
} }
List<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList();
//TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp //TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp
Long timestamp = requestHeader.getTimestamp(); Long timestamp = requestHeader.getTimestamp();
long offset = -1; long offset = -1;
...@@ -699,6 +702,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -699,6 +702,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (mappingContext.getMappingDetail() == null) { if (mappingContext.getMappingDetail() == null) {
return null; return null;
} }
if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null if (mappingItem == null
...@@ -743,10 +750,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -743,10 +750,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (mappingContext.getMappingDetail() == null) { if (mappingContext.getMappingDetail() == null) {
return null; return null;
} }
if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null if (mappingItem == null) {
|| !mappingDetail.getBname().equals(mappingItem.getBname())) { //this may not
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}; };
try { try {
...@@ -774,7 +785,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -774,7 +785,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
response.setCode(ResponseCode.SUCCESS); response.setCode(ResponseCode.SUCCESS);
response.setRemark(null); response.setRemark(null);
return response; return response;
}catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace();
log.error("rewriteRequestForStaticTopic failed", t);
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
} }
} }
...@@ -786,6 +799,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -786,6 +799,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
final GetMinOffsetRequestHeader requestHeader = final GetMinOffsetRequestHeader requestHeader =
(GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, 0L); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, 0L);
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) { if (rewriteResult != null) {
...@@ -804,10 +818,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -804,10 +818,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (mappingContext.getMappingDetail() == null) { if (mappingContext.getMappingDetail() == null) {
return null; return null;
} }
if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null if (mappingItem == null) {
|| !mappingDetail.getBname().equals(mappingItem.getBname())) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}; };
try { try {
......
...@@ -68,6 +68,7 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; ...@@ -68,6 +68,7 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
...@@ -78,6 +79,7 @@ import java.nio.ByteBuffer; ...@@ -78,6 +79,7 @@ import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.decode;
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
...@@ -134,6 +136,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -134,6 +136,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
&& requestHeader.getMaxMsgNums() != null) { && requestHeader.getMaxMsgNums() != null) {
requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums())); requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
} }
int sysFlag = requestHeader.getSysFlag();
if (!mappingContext.isLeader()) {
sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
requestHeader.setSysFlag(sysFlag);
}
if (mappingDetail.getBname().equals(bname)) { if (mappingDetail.getBname().equals(bname)) {
//just let it go, do the local pull process //just let it go, do the local pull process
...@@ -142,6 +149,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -142,6 +149,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
requestHeader.setPhysical(true); requestHeader.setPhysical(true);
requestHeader.setBname(bname); requestHeader.setBname(bname);
sysFlag = PullSysFlag.clearSuspendFlag(sysFlag);
sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
requestHeader.setSysFlag(sysFlag);
RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) { if (rpcResponse.getException() != null) {
...@@ -150,7 +160,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -150,7 +160,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) rpcResponse.getHeader(); PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) rpcResponse.getHeader();
{ {
RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext); RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, rpcResponse.getCode());
if (rewriteResult != null) { if (rewriteResult != null) {
return rewriteResult; return rewriteResult;
} }
...@@ -161,35 +171,73 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -161,35 +171,73 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
} }
} }
private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) { private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader,
TopicQueueMappingContext mappingContext, final int code) {
try { try {
if (mappingContext == null) { if (mappingContext == null) {
return null; return null;
} }
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
//handle nextBeginOffset
{ long requestOffset = requestHeader.getQueueOffset();
long nextBeginOffset = responseHeader.getNextBeginOffset(); long nextBeginOffset = responseHeader.getNextBeginOffset();
assert nextBeginOffset >= requestHeader.getQueueOffset(); long minOffset = responseHeader.getMinOffset();
//the next begin offset should no more than the end offset long maxOffset = responseHeader.getMaxOffset();
if (mappingItem.checkIfEndOffsetDecided() int responseCode = code;
&& nextBeginOffset >= mappingItem.getEndOffset()) { if (responseCode != ResponseCode.SUCCESS
nextBeginOffset = mappingItem.getEndOffset(); && responseCode != ResponseCode.PULL_RETRY_IMMEDIATELY) {
if (mappingContext.isLeader()) {
if (requestOffset < minOffset) {
nextBeginOffset = minOffset;
responseCode = ResponseCode.PULL_NOT_FOUND;
} else if (requestOffset > maxOffset) {
responseCode = ResponseCode.PULL_OFFSET_MOVED;
} else if (requestOffset == maxOffset) {
responseCode = ResponseCode.PULL_NOT_FOUND;
} else {
//let it go
}
} else {
if (requestOffset < minOffset) {
nextBeginOffset = minOffset;
responseCode = ResponseCode.PULL_NOT_FOUND;
} else if (requestOffset >= maxOffset) {
responseCode = ResponseCode.PULL_NOT_FOUND;
//just move to another item
mappingItem = mappingContext.findNext();
assert mappingItem != null;
nextBeginOffset = mappingItem.getStartOffset();
minOffset = mappingItem.getStartOffset();
maxOffset = minOffset;
}
} }
responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
} }
//handle nextBeginOffset
//the next begin offset should no more than the end offset
if (mappingItem.checkIfEndOffsetDecided()
&& nextBeginOffset >= mappingItem.getEndOffset()) {
nextBeginOffset = mappingItem.getEndOffset();
}
responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
//handle min offset //handle min offset
responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset()))); responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), minOffset)));
//handle max offset //handle max offset
responseHeader.setMaxOffset(Math.max(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset()), responseHeader.setMaxOffset(Math.max(mappingItem.computeStaticQueueOffsetUpToEnd(maxOffset),
TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId()))); TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId())));
//set the offsetDelta //set the offsetDelta
responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta()); responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta());
if (code != ResponseCode.SUCCESS
&& code != ResponseCode.PULL_RETRY_IMMEDIATELY) {
return RemotingCommand.createResponseCommandWithHeader(responseCode, responseHeader);
} else {
return null;
}
} catch (Throwable t) { } catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
} }
return null;
} }
...@@ -440,13 +488,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -440,13 +488,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
break; break;
} }
//rewrite the response for the
{
RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
}
if (this.hasConsumeMessageHook()) { if (this.hasConsumeMessageHook()) {
...@@ -491,6 +533,12 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -491,6 +533,12 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
this.executeConsumeMessageHookBefore(context); this.executeConsumeMessageHookBefore(context);
} }
//rewrite the response for the
RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode());
if (rewriteResult != null) {
response = rewriteResult;
}
switch (response.getCode()) { switch (response.getCode()) {
case ResponseCode.SUCCESS: case ResponseCode.SUCCESS:
......
...@@ -82,6 +82,7 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -82,6 +82,7 @@ public class TopicQueueMappingManager extends ConfigManager {
return; return;
} }
if (force) { if (force) {
//bakeup the old items
oldDetail.getHostedQueues().forEach( (queueId, items) -> { oldDetail.getHostedQueues().forEach( (queueId, items) -> {
newDetail.getHostedQueues().putIfAbsent(queueId, items); newDetail.getHostedQueues().putIfAbsent(queueId, items);
}); });
...@@ -90,17 +91,21 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -90,17 +91,21 @@ public class TopicQueueMappingManager extends ConfigManager {
return; return;
} }
//do more check //do more check
if (newDetail.getEpoch() <= oldDetail.getEpoch()) { if (newDetail.getEpoch() < oldDetail.getEpoch()) {
throw new RuntimeException(String.format("Can't accept data with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch())); throw new RuntimeException(String.format("Can't accept data with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch()));
} }
boolean epochEqual = newDetail.getEpoch() == oldDetail.getEpoch();
for (Integer globalId : oldDetail.getHostedQueues().keySet()) { for (Integer globalId : oldDetail.getHostedQueues().keySet()) {
List<LogicQueueMappingItem> oldItems = oldDetail.getHostedQueues().get(globalId); List<LogicQueueMappingItem> oldItems = oldDetail.getHostedQueues().get(globalId);
List<LogicQueueMappingItem> newItems = newDetail.getHostedQueues().get(globalId); List<LogicQueueMappingItem> newItems = newDetail.getHostedQueues().get(globalId);
if (newItems == null) { if (newItems == null) {
//keep the old if (epochEqual) {
newDetail.getHostedQueues().put(globalId, oldItems); throw new RuntimeException("Cannot accept equal epoch with null data");
} else {
newDetail.getHostedQueues().put(globalId, oldItems);
}
} else { } else {
TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems); TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems, epochEqual);
} }
} }
topicQueueMappingTable.put(newDetail.getTopic(), newDetail); topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
......
...@@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit; ...@@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.admin.MQAdminExtInner;
......
package org.apache.rocketmq.common.rpc; package org.apache.rocketmq.common.rpc;
import com.alibaba.fastjson.JSON;
import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
...@@ -65,6 +66,15 @@ public class RpcClientImpl implements RpcClient { ...@@ -65,6 +66,15 @@ public class RpcClientImpl implements RpcClient {
case RequestCode.PULL_MESSAGE: case RequestCode.PULL_MESSAGE:
rpcResponsePromise = handlePullMessage(addr, request, timeoutMs); rpcResponsePromise = handlePullMessage(addr, request, timeoutMs);
break; break;
case RequestCode.GET_MIN_OFFSET:
rpcResponsePromise = handleGetMinOffset(addr, request, timeoutMs);
break;
case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
rpcResponsePromise = handleSearchOffset(addr, request, timeoutMs);
break;
case RequestCode.GET_EARLIEST_MSG_STORETIME:
rpcResponsePromise = handleGetEarliestMsgStoretime(addr, request, timeoutMs);
break;
default: default:
throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode()); throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
} }
...@@ -146,8 +156,9 @@ public class RpcClientImpl implements RpcClient { ...@@ -146,8 +156,9 @@ public class RpcClientImpl implements RpcClient {
return rpcResponsePromise; return rpcResponsePromise;
} }
public RpcResponse handleSearchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { public Promise<RpcResponse> handleSearchOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = getBrokerAddrByNameOrException(bname); final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest); RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null; assert responseCommand != null;
...@@ -155,17 +166,18 @@ public class RpcClientImpl implements RpcClient { ...@@ -155,17 +166,18 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
SearchOffsetResponseHeader responseHeader = SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
break;
} }
default:{ default:{
RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
return rpcResponse;
} }
} }
return rpcResponsePromise;
} }
public RpcResponse handleGetMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { public Promise<RpcResponse> handleGetMinOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = getBrokerAddrByNameOrException(bname); final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest); RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
...@@ -175,17 +187,18 @@ public class RpcClientImpl implements RpcClient { ...@@ -175,17 +187,18 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
GetMinOffsetResponseHeader responseHeader = GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
break;
} }
default:{ default:{
RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
return rpcResponse;
} }
} }
return rpcResponsePromise;
} }
public RpcResponse handleGetEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { public Promise<RpcResponse> handleGetEarliestMsgStoretime(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = getBrokerAddrByNameOrException(bname); final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest); RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
...@@ -195,14 +208,14 @@ public class RpcClientImpl implements RpcClient { ...@@ -195,14 +208,14 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
GetEarliestMsgStoretimeResponseHeader responseHeader = GetEarliestMsgStoretimeResponseHeader responseHeader =
(GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class); (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
break;
} }
default:{ default:{
RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
return rpcResponse;
} }
} }
return rpcResponsePromise;
} }
} }
...@@ -21,6 +21,9 @@ public class RpcClientUtils { ...@@ -21,6 +21,9 @@ public class RpcClientUtils {
} }
public static byte[] encodeBody(Object body) { public static byte[] encodeBody(Object body) {
if (body == null) {
return null;
}
if (body instanceof byte[]) { if (body instanceof byte[]) {
return (byte[])body; return (byte[])body;
} else if (body instanceof RemotingSerializable) { } else if (body instanceof RemotingSerializable) {
......
...@@ -35,6 +35,7 @@ public class TopicQueueMappingContext { ...@@ -35,6 +35,7 @@ public class TopicQueueMappingContext {
this.mappingDetail = mappingDetail; this.mappingDetail = mappingDetail;
this.mappingItemList = mappingItemList; this.mappingItemList = mappingItemList;
this.mappingItem = mappingItem; this.mappingItem = mappingItem;
} }
public boolean checkIfAsPhysical() { public boolean checkIfAsPhysical() {
...@@ -43,6 +44,37 @@ public class TopicQueueMappingContext { ...@@ -43,6 +44,37 @@ public class TopicQueueMappingContext {
|| (mappingItemList.size() == 1 && mappingItemList.get(0).getLogicOffset() == 0); || (mappingItemList.size() == 1 && mappingItemList.get(0).getLogicOffset() == 0);
} }
public boolean isLeader() {
if (mappingDetail == null
|| mappingItemList == null
|| mappingItemList.isEmpty()) {
return false;
}
LogicQueueMappingItem mappingItem = mappingItemList.get(mappingItemList.size() - 1);
return mappingItem.getBname().equals(mappingDetail.getBname());
}
public LogicQueueMappingItem findNext() {
if (mappingDetail == null
|| mappingItem == null
|| mappingItemList == null
|| mappingItemList.isEmpty()) {
return null;
}
for (int i = 0; i < mappingItemList.size(); i++) {
LogicQueueMappingItem item = mappingItemList.get(i);
if (item.getGen() == mappingItem.getGen()) {
if (i < mappingItemList.size() - 1) {
return mappingItemList.get(i + 1);
} else {
return null;
}
}
}
return null;
}
public String getTopic() { public String getTopic() {
return topic; return topic;
} }
...@@ -90,4 +122,6 @@ public class TopicQueueMappingContext { ...@@ -90,4 +122,6 @@ public class TopicQueueMappingContext {
public void setMappingItem(LogicQueueMappingItem mappingItem) { public void setMappingItem(LogicQueueMappingItem mappingItem) {
this.mappingItem = mappingItem; this.mappingItem = mappingItem;
} }
} }
...@@ -85,7 +85,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -85,7 +85,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
//Could use bi-search to polish performance //Could use bi-search to polish performance
for (int i = mappingItems.size() - 1; i >= 0; i--) { for (int i = mappingItems.size() - 1; i >= 0; i--) {
LogicQueueMappingItem item = mappingItems.get(i); LogicQueueMappingItem item = mappingItems.get(i);
if (logicOffset >= item.getLogicOffset()) { if (item.getLogicOffset() >= 0
&& logicOffset >= item.getLogicOffset()) {
return item; return item;
} }
} }
......
...@@ -36,6 +36,8 @@ import java.util.Set; ...@@ -36,6 +36,8 @@ import java.util.Set;
public class TopicQueueMappingUtils { public class TopicQueueMappingUtils {
public static final int DEFAULT_BLOCK_SEQ_SIZE = 10000;
public static class MappingAllocator { public static class MappingAllocator {
Map<String, Integer> brokerNumMap = new HashMap<String, Integer>(); Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
Map<Integer, String> idToBroker = new HashMap<Integer, String>(); Map<Integer, String> idToBroker = new HashMap<Integer, String>();
...@@ -191,7 +193,7 @@ public class TopicQueueMappingUtils { ...@@ -191,7 +193,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum); return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
} }
public static void makeSureLogicQueueMappingItemImmutable(List<LogicQueueMappingItem> oldItems, List<LogicQueueMappingItem> newItems) { public static void makeSureLogicQueueMappingItemImmutable(List<LogicQueueMappingItem> oldItems, List<LogicQueueMappingItem> newItems, boolean epochEqual) {
if (oldItems == null || oldItems.isEmpty()) { if (oldItems == null || oldItems.isEmpty()) {
return; return;
} }
...@@ -218,6 +220,16 @@ public class TopicQueueMappingUtils { ...@@ -218,6 +220,16 @@ public class TopicQueueMappingUtils {
inew++; inew++;
} }
} }
if (epochEqual) {
LogicQueueMappingItem oldLeader = oldItems.get(oldItems.size() - 1);
LogicQueueMappingItem newLeader = newItems.get(newItems.size() - 1);
if (newLeader.getGen() != oldLeader.getGen()
|| !newLeader.getBname().equals(oldLeader.getBname())
|| newLeader.getQueueId() != oldLeader.getQueueId()
|| newLeader.getStartOffset() != oldLeader.getStartOffset()) {
throw new RuntimeException("The new leader is different but epoch equal");
}
}
} }
......
...@@ -69,6 +69,10 @@ public class PullSysFlag { ...@@ -69,6 +69,10 @@ public class PullSysFlag {
return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND; return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND;
} }
public static int clearSuspendFlag(final int sysFlag) {
return sysFlag & (~FLAG_SUSPEND);
}
public static boolean hasSubscriptionFlag(final int sysFlag) { public static boolean hasSubscriptionFlag(final int sysFlag) {
return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION; return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION;
} }
......
...@@ -4,9 +4,11 @@ import com.alibaba.fastjson.JSON; ...@@ -4,9 +4,11 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.theories.suppliers.TestedOn;
import java.util.Map; import java.util.Map;
...@@ -53,4 +55,9 @@ public class TopicQueueMappingTest { ...@@ -53,4 +55,9 @@ public class TopicQueueMappingTest {
Assert.assertEquals(mappingDetailJson, RemotingSerializable.toJson(mappingDetailFromJson, false)); Assert.assertEquals(mappingDetailJson, RemotingSerializable.toJson(mappingDetailFromJson, false));
} }
} }
@Test
public void test() {
}
} }
...@@ -13,7 +13,7 @@ import java.util.Map; ...@@ -13,7 +13,7 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
public class TopicMappingUtilsTest { public class TopicQueueMappingUtilsTest {
private Set<String> buildTargetBrokers(int num) { private Set<String> buildTargetBrokers(int num) {
......
...@@ -28,8 +28,13 @@ import java.lang.annotation.Annotation; ...@@ -28,8 +28,13 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
public class RemotingCommand { public class RemotingCommand {
...@@ -313,11 +318,17 @@ public class RemotingCommand { ...@@ -313,11 +318,17 @@ public class RemotingCommand {
return objectHeader; return objectHeader;
} }
private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) { //make it able to test
Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
Field[] field = CLASS_HASH_MAP.get(classHeader); Field[] field = CLASS_HASH_MAP.get(classHeader);
if (field == null) { if (field == null) {
field = classHeader.getDeclaredFields(); Set<Field> fieldList = new HashSet<Field>();
for (Class className = classHeader; className != Object.class; className = className.getSuperclass()) {
Field[] fields = className.getDeclaredFields();
fieldList.addAll(Arrays.asList(fields));
}
field = fieldList.toArray(new Field[0]);
synchronized (CLASS_HASH_MAP) { synchronized (CLASS_HASH_MAP) {
CLASS_HASH_MAP.put(classHeader, field); CLASS_HASH_MAP.put(classHeader, field);
} }
......
...@@ -19,9 +19,14 @@ package org.apache.rocketmq.remoting.protocol; ...@@ -19,9 +19,14 @@ package org.apache.rocketmq.remoting.protocol;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
...@@ -198,6 +203,28 @@ public class RemotingCommandTest { ...@@ -198,6 +203,28 @@ public class RemotingCommandTest {
Field value = FieldTestClass.class.getDeclaredField("value"); Field value = FieldTestClass.class.getDeclaredField("value");
assertThat(method.invoke(remotingCommand, value)).isEqualTo(false); assertThat(method.invoke(remotingCommand, value)).isEqualTo(false);
} }
@Test
public void testParentField() throws Exception {
SubExtFieldsHeader subExtFieldsHeader = new SubExtFieldsHeader();
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(1, subExtFieldsHeader);
Field[] fields = remotingCommand.getClazzFields(subExtFieldsHeader.getClass());
Assert.assertEquals(7, fields.length);
Set<String> names = new HashSet<>();
names.add("stringValue");
names.add("intValue");
names.add("longValue");
names.add("booleanValue");
names.add("doubleValue");
names.add("name");
names.add("value");
for (Field field : fields) {
Assert.assertTrue(names.contains(field.getName()));
}
remotingCommand.makeCustomHeaderToNet();
SubExtFieldsHeader other = (SubExtFieldsHeader) remotingCommand.decodeCommandCustomHeader(subExtFieldsHeader.getClass());
Assert.assertEquals(other, subExtFieldsHeader);
}
} }
class FieldTestClass { class FieldTestClass {
...@@ -246,4 +273,72 @@ class ExtFieldsHeader implements CommandCustomHeader { ...@@ -246,4 +273,72 @@ class ExtFieldsHeader implements CommandCustomHeader {
public double getDoubleValue() { public double getDoubleValue() {
return doubleValue; return doubleValue;
} }
}
\ No newline at end of file @Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ExtFieldsHeader)) return false;
ExtFieldsHeader that = (ExtFieldsHeader) o;
if (intValue != that.intValue) return false;
if (longValue != that.longValue) return false;
if (booleanValue != that.booleanValue) return false;
if (Double.compare(that.doubleValue, doubleValue) != 0) return false;
return stringValue != null ? stringValue.equals(that.stringValue) : that.stringValue == null;
}
@Override
public int hashCode() {
int result;
long temp;
result = stringValue != null ? stringValue.hashCode() : 0;
result = 31 * result + intValue;
result = 31 * result + (int) (longValue ^ (longValue >>> 32));
result = 31 * result + (booleanValue ? 1 : 0);
temp = Double.doubleToLongBits(doubleValue);
result = 31 * result + (int) (temp ^ (temp >>> 32));
return result;
}
}
class SubExtFieldsHeader extends ExtFieldsHeader {
private String name = "12321";
private int value = 111;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof SubExtFieldsHeader)) return false;
if (!super.equals(o)) return false;
SubExtFieldsHeader that = (SubExtFieldsHeader) o;
if (value != that.value) return false;
return name != null ? name.equals(that.name) : that.name == null;
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (name != null ? name.hashCode() : 0);
result = 31 * result + value;
return result;
}
}
...@@ -49,6 +49,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer { ...@@ -49,6 +49,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
consumer = new DefaultMQPushConsumer(consumerGroup); consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setInstanceName(RandomUtil.getStringByUUID()); consumer.setInstanceName(RandomUtil.getStringByUUID());
consumer.setNamesrvAddr(nsAddr); consumer.setNamesrvAddr(nsAddr);
consumer.setPollNameServerInterval(100);
try { try {
consumer.subscribe(topic, subExpression); consumer.subscribe(topic, subExpression);
} catch (MQClientException e) { } catch (MQClientException e) {
...@@ -92,4 +93,8 @@ public class RMQNormalConsumer extends AbstractMQConsumer { ...@@ -92,4 +93,8 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
create(); create();
start(); start();
} }
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
} }
...@@ -73,6 +73,7 @@ public class RMQNormalProducer extends AbstractMQProducer { ...@@ -73,6 +73,7 @@ public class RMQNormalProducer extends AbstractMQProducer {
producer.setProducerGroup(getProducerGroupName()); producer.setProducerGroup(getProducerGroupName());
producer.setInstanceName(getProducerInstanceName()); producer.setInstanceName(getProducerInstanceName());
producer.setUseTLS(useTLS); producer.setUseTLS(useTLS);
producer.setPollNameServerInterval(100);
if (nsAddr != null) { if (nsAddr != null) {
producer.setNamesrvAddr(nsAddr); producer.setNamesrvAddr(nsAddr);
......
...@@ -100,7 +100,10 @@ public class BaseConf { ...@@ -100,7 +100,10 @@ public class BaseConf {
List<BrokerData> brokerDatas = mqAdminExt.examineTopicRouteInfo(clusterName).getBrokerDatas(); List<BrokerData> brokerDatas = mqAdminExt.examineTopicRouteInfo(clusterName).getBrokerDatas();
return brokerDatas.size() == brokerNum; return brokerDatas.size() == brokerNum;
}); });
} catch (MQClientException e) { for (BrokerController brokerController: brokerControllerList) {
brokerController.getBrokerOuterAPI().refreshMetadata();
}
} catch (Exception e) {
log.error("init failed, please check BaseConf"); log.error("init failed, please check BaseConf");
} }
ForkJoinPool.commonPool().execute(mqAdminExt::shutdown); ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
...@@ -126,6 +129,7 @@ public class BaseConf { ...@@ -126,6 +129,7 @@ public class BaseConf {
public static DefaultMQAdminExt getAdmin(String nsAddr) { public static DefaultMQAdminExt getAdmin(String nsAddr) {
final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500); final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500);
mqAdminExt.setNamesrvAddr(nsAddr); mqAdminExt.setNamesrvAddr(nsAddr);
mqAdminExt.setPollNameServerInterval(100);
mqClients.add(mqAdminExt); mqClients.add(mqAdminExt);
return mqAdminExt; return mqAdminExt;
} }
......
...@@ -2,6 +2,7 @@ package org.apache.rocketmq.test.smoke; ...@@ -2,6 +2,7 @@ package org.apache.rocketmq.test.smoke;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
...@@ -11,6 +12,7 @@ import org.apache.rocketmq.common.rpc.ClientMetadata; ...@@ -11,6 +12,7 @@ import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
...@@ -23,6 +25,7 @@ import org.junit.Assert; ...@@ -23,6 +25,7 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.FixMethodOrder; import org.junit.FixMethodOrder;
import org.junit.Test; import org.junit.Test;
import sun.jvm.hotspot.runtime.aarch64.AARCH64CurrentFrameGuess;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
...@@ -61,7 +64,7 @@ public class StaticTopicIT extends BaseConf { ...@@ -61,7 +64,7 @@ public class StaticTopicIT extends BaseConf {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
Assert.assertTrue(brokerConfigMap.isEmpty()); Assert.assertTrue(brokerConfigMap.isEmpty());
TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap); TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);
Assert.assertEquals(2, brokerConfigMap.size()); Assert.assertEquals(targetBrokers.size(), brokerConfigMap.size());
//If some succeed, and others fail, it will cause inconsistent data //If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) { for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey(); String broker = entry.getKey();
...@@ -72,6 +75,22 @@ public class StaticTopicIT extends BaseConf { ...@@ -72,6 +75,22 @@ public class StaticTopicIT extends BaseConf {
return brokerConfigMap; return brokerConfigMap;
} }
public void remappingStaticTopic(String topic, Set<String> targetBrokers) throws Exception {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
Assert.assertFalse(brokerConfigMap.isEmpty());
TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false);
}
@Test
public void testNonTargetBrokers() {
}
@Test @Test
public void testCreateProduceConsumeStaticTopic() throws Exception { public void testCreateProduceConsumeStaticTopic() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic(); String topic = "static" + MQRandomUtils.getRandomTopic();
...@@ -120,7 +139,7 @@ public class StaticTopicIT extends BaseConf { ...@@ -120,7 +139,7 @@ public class StaticTopicIT extends BaseConf {
Assert.assertEquals(msgEachQueue * queueNum, producer.getAllOriginMsg().size()); Assert.assertEquals(msgEachQueue * queueNum, producer.getAllOriginMsg().size());
Assert.assertEquals(0, producer.getSendErrorMsg().size()); Assert.assertEquals(0, producer.getSendErrorMsg().size());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
...@@ -149,6 +168,110 @@ public class StaticTopicIT extends BaseConf { ...@@ -149,6 +168,110 @@ public class StaticTopicIT extends BaseConf {
} }
@Test
public void testRemappingProduceConsumeStaticTopic() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.getProducer().setPollNameServerInterval(100);
int queueNum = 10;
int msgEachQueue = 100;
//create static topic
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker1Name);
createStaticTopic(topic, queueNum, targetBrokers);
}
//produce the messages
{
List<MessageQueue> messageQueueList = producer.getMessageQueue();
for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i);
Assert.assertEquals(i, messageQueue.getQueueId());
Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
}
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
}
Assert.assertEquals(0, producer.getSendErrorMsg().size());
//leave the time to build the cq
Thread.sleep(100);
for(MessageQueue messageQueue: messageQueueList) {
//Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
}
}
//remapping the static topic
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker2Name);
remappingStaticTopic(topic, targetBrokers);
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
Assert.assertEquals(broker2Name, mappingOne.getBname());
}
}
//leave the time to refresh the metadata
Thread.sleep(500);
producer.setDebug(true);
{
List<MessageQueue> messageQueueList = producer.getMessageQueue();
for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i);
Assert.assertEquals(i, messageQueue.getQueueId());
Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
Assert.assertEquals(destBrokerName, broker2Name);
}
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
}
Assert.assertEquals(0, producer.getSendErrorMsg().size());
//leave the time to build the cq
Thread.sleep(100);
for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
}
}
{
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
System.out.println("Consume: " + consumer.getListener().getAllMsgBody().size());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>();
for (Object object : consumer.getListener().getAllOriginMsg()) {
MessageExt messageExt = (MessageExt) object;
if (!messagesByQueue.containsKey(messageExt.getQueueId())) {
messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>());
}
messagesByQueue.get(messageExt.getQueueId()).add(messageExt);
}
Assert.assertEquals(queueNum, messagesByQueue.size());
for (int i = 0; i < queueNum; i++) {
List<MessageExt> messageExts = messagesByQueue.get(i);
Assert.assertEquals(msgEachQueue, messageExts.size());
Collections.sort(messageExts, new Comparator<MessageExt>() {
@Override
public int compare(MessageExt o1, MessageExt o2) {
return (int) (o1.getQueueOffset() - o2.getQueueOffset());
}
});
for (int j = 0; j < msgEachQueue; j++) {
Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
}
}
}
}
@After @After
public void tearDown() { public void tearDown() {
super.shutdown(); super.shutdown();
......
...@@ -1112,6 +1112,15 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { ...@@ -1112,6 +1112,15 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
} }
//Step5: write the non-target brokers
for (String broker: brokerConfigMap.keySet()) {
if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) {
continue;
}
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
} }
@Override @Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册