diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..aa69b17a0b2ff63b7d796053c387637ff7d1fc3f --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java @@ -0,0 +1,65 @@ +package org.apache.rocketmq.common.rpc; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; + +import java.util.HashMap; +import java.util.Map; + +public class RequestBuilder { + + private static Map requestCodeMap = new HashMap(); + static { + requestCodeMap.put(RequestCode.PULL_MESSAGE, PullMessageRequestHeader.class); + } + + public static CommonRpcHeader buildCommonRpcHeader(int requestCode, String destBrokerName) { + return buildCommonRpcHeader(requestCode, null, destBrokerName); + } + + public static CommonRpcHeader buildCommonRpcHeader(int requestCode, Boolean oneway, String destBrokerName) { + Class requestHeaderClass = requestCodeMap.get(requestCode); + if (requestHeaderClass == null) { + throw new UnsupportedOperationException("unknown " + requestCode); + } + try { + CommonRpcHeader requestHeader = (CommonRpcHeader) requestHeaderClass.newInstance(); + requestHeader.setOneway(oneway); + requestHeader.setBname(destBrokerName); + return requestHeader; + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq) { + return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), null); + } + + public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq, Boolean physical) { + return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical); + } + + public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, MessageQueue mq, Boolean physical) { + return buildTopicQueueRequestHeader(requestCode, oneway, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical); + } + + public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, String destBrokerName, String topic, int queueId, Boolean physical) { + Class requestHeaderClass = requestCodeMap.get(requestCode); + if (requestHeaderClass == null) { + throw new UnsupportedOperationException("unknown " + requestCode); + } + try { + TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance(); + requestHeader.setOneway(oneway); + requestHeader.setBname(destBrokerName); + requestHeader.setTopic(topic); + requestHeader.setQueueId(queueId); + requestHeader.setPhysical(physical); + return requestHeader; + } catch (Throwable t) { + throw new RuntimeException(t); + } + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java index 897dfcb13e358720df6485b5318d771ccd344758..4b0a394360dc728f10b3016b9d7f8f2d46794095 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java @@ -20,12 +20,10 @@ public abstract class TopicQueueRequestHeader extends CommonRpcHeader { //Physical or Logical protected Boolean physical; - @Override public Boolean getPhysical() { return physical; } - @Override public void setPhysical(Boolean physical) { this.physical = physical; }