diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 7f3ce81d2c37c173404f21b0f22c53a8b6aa67d7..c5f8b90a13cd0fe01e184651ac612a133ea43b89 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -45,8 +45,15 @@ import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader; @@ -59,6 +66,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.RpcRequest; +import org.apache.rocketmq.remoting.RpcResponse; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -75,6 +84,7 @@ public class BrokerOuterAPI { private final BrokerController brokerController; private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr()); private String nameSrvAddr = null; + private final String currBrokerName; private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue(32), new ThreadFactoryImpl("brokerOutApi_thread_", true)); @@ -86,6 +96,7 @@ public class BrokerOuterAPI { this.remotingClient = new NettyRemotingClient(nettyClientConfig); this.remotingClient.registerRPCHook(rpcHook); this.brokerController = brokerController; + this.currBrokerName = brokerController.getBrokerConfig().getBrokerName(); } public void start() { @@ -463,14 +474,13 @@ public class BrokerOuterAPI { this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback); } - public RemotingCommand pullMessage(String brokerName, PullMessageRequestHeader requestHeader, long timeoutMillis) throws Exception { + public RemotingCommand pullMessage(String bname, PullMessageRequestHeader requestHeader, long timeoutMillis) throws Exception { - String addr = this.brokerController.getBrokerAddrByName(brokerName); + String addr = this.brokerController.getBrokerAddrByName(bname); if (addr == null) { - final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("%s-%d cannot find addr when forward to broker %s in broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), brokerName, this.brokerController.getBrokerConfig().getBrokerName())); - return response; + return RemotingCommand.buildErrorResponse(ResponseCode.SYSTEM_ERROR, + String.format("%s-%d cannot find addr when forward to broker %s in broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), bname, currBrokerName), + PullMessageResponseHeader.class); } RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); @@ -478,4 +488,108 @@ public class BrokerOuterAPI { return response; } + + public RpcResponse pullMessage(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { + String addr = this.brokerController.getBrokerAddrByName(bname); + if (addr == null) { + RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null); + rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr)); + return rpcResponse; + } + RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest ); + RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); + assert responseCommand != null; + + switch (responseCommand.getCode()) { + case ResponseCode.SUCCESS: + case ResponseCode.PULL_NOT_FOUND: + case ResponseCode.PULL_RETRY_IMMEDIATELY: + case ResponseCode.PULL_OFFSET_MOVED: + PullMessageResponseHeader responseHeader = + (PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class); + return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); + default: + RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null); + rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr)); + return rpcResponse; + } + } + + public RpcResponse searchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { + String addr = this.brokerController.getBrokerAddrByName(bname); + if (addr == null) { + RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null); + rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr)); + return rpcResponse; + } + RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest); + RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); + assert responseCommand != null; + switch (responseCommand.getCode()) { + case ResponseCode.SUCCESS: { + SearchOffsetResponseHeader responseHeader = + (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); + return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); + } + default:{ + RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null); + rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr)); + return rpcResponse; + } + } + } + + public RpcResponse getMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { + String addr = this.brokerController.getBrokerAddrByName(bname); + if (addr == null) { + RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null); + rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr)); + return rpcResponse; + } + + RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest); + + RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); + assert responseCommand != null; + switch (responseCommand.getCode()) { + case ResponseCode.SUCCESS: { + GetMinOffsetResponseHeader responseHeader = + (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); + return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); + } + default:{ + RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null); + rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr)); + return rpcResponse; + } + } + } + + public RpcResponse getEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { + String addr = this.brokerController.getBrokerAddrByName(bname); + if (addr == null) { + RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null); + rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr)); + return rpcResponse; + } + + RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest); + + RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); + assert responseCommand != null; + switch (responseCommand.getCode()) { + case ResponseCode.SUCCESS: { + GetEarliestMsgStoretimeResponseHeader responseHeader = + (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class); + return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); + + } + default:{ + RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null); + rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr)); + return rpcResponse; + } + } + } + } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 6bd742c32538fa344f29280ce45298812da8fa87..0b6fa482b359a9e580248548d40a4f9288c2da64 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -139,6 +139,8 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; +import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; + public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; @@ -614,6 +616,29 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return response; } + private RemotingCommand rewriteRequestForStaticTopic(SearchOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { + try { + if (mappingContext.getMappingDetail() == null) { + return null; + } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); + LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); + if (mappingItem == null + || !mappingDetail.getBname().equals(mappingItem.getBname())) { + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname())); + } + ImmutableList mappingItems = mappingContext.getMappingItemList(); + //TODO should make sure the offset timestamp is equal or bigger than the searched timestamp + for (int i = mappingItems.size() - 1; i >=0; i--) { + + } + requestHeader.setQueueId(mappingItem.getQueueId()); + return null; + } catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } + } + private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class); @@ -621,6 +646,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements final SearchOffsetRequestHeader requestHeader = (SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class); + { + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader); + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); + if (mappingDetail != null) { + + } + } + long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getTimestamp()); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java index 5ea2e24bfc87a56721950b8b77d485717ec21c1e..e3a4b1dafa6efea8308c146a96fb257b12300bc6 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class SearchOffsetRequestHeader implements CommandCustomHeader { +public class SearchOffsetRequestHeader extends TopicQueueRequestHeader { @CFNotNull private String topic; @CFNotNull @@ -37,18 +37,22 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader { } + @Override public String getTopic() { return topic; } + @Override public void setTopic(String topic) { this.topic = topic; } + @Override public Integer getQueueId() { return queueId; } + @Override public void setQueueId(Integer queueId) { this.queueId = queueId; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RpcRequest.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RpcRequest.java new file mode 100644 index 0000000000000000000000000000000000000000..e8e03d5901b574ac0215a25b9c484df9d3a2f65a --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RpcRequest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting; + +public class RpcRequest { + private int code; + private CommandCustomHeader header; + private byte[] body; + + public RpcRequest(int code, CommandCustomHeader header, byte[] body) { + this.code = code; + this.header = header; + this.body = body; + } + + public int getCode() { + return code; + } + + public CommandCustomHeader getHeader() { + return header; + } + + public byte[] getBody() { + return body; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RpcResponse.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RpcResponse.java new file mode 100644 index 0000000000000000000000000000000000000000..9ae9950434fea11ad07f490272c40b39cf35de63 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RpcResponse.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting; + +public class RpcResponse { + private int code; + private CommandCustomHeader header; + private byte[] body; + public Exception exception; + + public RpcResponse(int code, CommandCustomHeader header, byte[] body) { + this.code = code; + this.header = header; + this.body = body; + } + + public int getCode() { + return code; + } + + public CommandCustomHeader getHeader() { + return header; + } + + public byte[] getBody() { + return body; + } + + public Exception getException() { + return exception; + } + + public void setException(Exception exception) { + this.exception = exception; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index 34a1790ffcd72f6107c5cde6da7c762f33427f31..8fc3f9e36611760280c7298f013f022c27c210bf 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -20,11 +20,13 @@ import com.alibaba.fastjson.annotation.JSONField; import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.lang.reflect.Type; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.RpcRequest; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -85,6 +87,15 @@ public class RemotingCommand { protected RemotingCommand() { } + public static RemotingCommand createRequestCommand(RpcRequest rpcRequest) { + RemotingCommand cmd = new RemotingCommand(); + cmd.setCode(rpcRequest.getCode()); + cmd.customHeader = rpcRequest.getHeader(); + setCmdVersion(cmd); + cmd.setBody(rpcRequest.getBody()); + return cmd; + } + public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) { RemotingCommand cmd = new RemotingCommand(); cmd.setCode(code); @@ -110,6 +121,13 @@ public class RemotingCommand { return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader); } + public static RemotingCommand buildErrorResponse(int code, String remark, Class classHeader) { + final RemotingCommand response = RemotingCommand.createResponseCommand(classHeader); + response.setCode(code); + response.setRemark(remark); + return response; + } + public static RemotingCommand buildErrorResponse(int code, String remark) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); response.setCode(code); @@ -117,6 +135,7 @@ public class RemotingCommand { return response; } + public static RemotingCommand createResponseCommand(int code, String remark, Class classHeader) { RemotingCommand cmd = new RemotingCommand();