From 86bea21b4dc701e6cba5977b09e6f4e874494d19 Mon Sep 17 00:00:00 2001 From: ayanamist Date: Tue, 9 Mar 2021 11:04:46 +0800 Subject: [PATCH] [RIP-19] Pop Consuming (common) --- .../apache/rocketmq/common/BrokerConfig.java | 154 +++++++++++ .../apache/rocketmq/common/KeyBuilder.java | 37 +++ .../rocketmq/common/PopAckConstants.java | 35 +++ .../common/constant/ConsumeInitMode.java | 22 ++ .../rocketmq/common/constant/LoggerName.java | 1 + .../rocketmq/common/message/MessageConst.java | 4 + .../common/message/MessageDecoder.java | 2 +- .../message/MessageQueueAssignment.java | 83 ++++++ .../common/message/MessageRequestMode.java | 43 +++ .../rocketmq/common/protocol/RequestCode.java | 7 + .../common/protocol/ResponseCode.java | 3 + .../protocol/body/ConsumerRunningInfo.java | 32 +++ .../protocol/body/PopProcessQueueInfo.java | 59 ++++ .../body/QueryAssignmentRequestBody.java | 74 +++++ .../body/QueryAssignmentResponseBody.java | 36 +++ .../SetMessageRequestModeRequestBody.java | 70 +++++ .../header/AckMessageRequestHeader.java | 85 ++++++ .../ChangeInvisibleTimeRequestHeader.java | 97 +++++++ .../ChangeInvisibleTimeResponseHeader.java | 61 +++++ .../common/protocol/header/ExtraInfoUtil.java | 258 ++++++++++++++++++ .../header/PopMessageRequestHeader.java | 155 +++++++++++ .../header/PopMessageResponseHeader.java | 102 +++++++ .../rocketmq/common/utils/DataConverter.java | 42 +++ 23 files changed, 1461 insertions(+), 1 deletion(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/constant/ConsumeInitMode.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/message/MessageQueueAssignment.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/message/MessageRequestMode.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/body/PopProcessQueueInfo.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentRequestBody.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentResponseBody.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/body/SetMessageRequestModeRequestBody.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeResponseHeader.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageResponseHeader.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index d80b3d21..488f2132 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -21,6 +21,7 @@ import java.net.UnknownHostException; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.message.MessageRequestMode; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -62,12 +63,14 @@ public class BrokerConfig { */ private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4; private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; + private int ackMessageThreadPoolNums = 3; private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors(); private int adminBrokerThreadPoolNums = 16; private int clientManageThreadPoolNums = 32; private int consumerManageThreadPoolNums = 32; + private int loadBalanceProcessorThreadPoolNums = 32; private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors()); /** @@ -85,6 +88,7 @@ public class BrokerConfig { private boolean fetchNamesrvAddrByAddressServer = false; private int sendThreadPoolQueueCapacity = 10000; private int pullThreadPoolQueueCapacity = 100000; + private int ackThreadPoolQueueCapacity = 100000; private int replyThreadPoolQueueCapacity = 10000; private int queryThreadPoolQueueCapacity = 20000; private int clientManagerThreadPoolQueueCapacity = 1000000; @@ -158,6 +162,37 @@ public class BrokerConfig { */ private int registerNameServerPeriod = 1000 * 30; + private int popPollingSize = 1024; + private int popPollingMapSize = 100000; + // 20w cost 200M heap memory. + private long maxPopPollingSize = 100000; + private int reviveQueueNum = 8; + private long reviveInterval = 1000; + private long reviveMaxSlow = 3; + private long reviveScanTime = 10000; + private boolean enablePopLog = true; + private boolean enablePopBufferMerge = false; + private int popCkStayBufferTime = 10 * 1000; + private int popCkStayBufferTimeOut = 3 * 1000; + private int popCkMaxBufferSize = 200000; + private int popCkOffsetMaxQueueSize = 20000; + + /** + * the interval of pulling topic information from the named server + */ + private long loadBalancePollNameServerInterval = 1000 * 30; + + /** + * the interval of cleaning + */ + private int cleanOfflineBrokerInterval = 1000 * 30; + + private boolean serverLoadBalancerEnabled = true; + + private MessageRequestMode defaultMessageRequestMode = MessageRequestMode.PULL; + + private int defaultPopShareQueueNum = -1; + /** * The minimum time of the transactional message to be checked firstly, one message only exceed this time interval * that can be checked. @@ -197,6 +232,58 @@ public class BrokerConfig { return "DEFAULT_BROKER"; } + public long getMaxPopPollingSize() { + return maxPopPollingSize; + } + + public int getReviveQueueNum() { + return reviveQueueNum; + } + + public long getReviveInterval() { + return reviveInterval; + } + + public int getPopCkStayBufferTime() { + return popCkStayBufferTime; + } + + public int getPopCkStayBufferTimeOut() { + return popCkStayBufferTimeOut; + } + + public int getPopPollingMapSize() { + return popPollingMapSize; + } + + public long getReviveScanTime() { + return reviveScanTime; + } + + public long getReviveMaxSlow() { + return reviveMaxSlow; + } + + public int getPopPollingSize() { + return popPollingSize; + } + + public boolean isEnablePopBufferMerge() { + return enablePopBufferMerge; + } + + public int getPopCkMaxBufferSize() { + return popCkMaxBufferSize; + } + + public int getPopCkOffsetMaxQueueSize() { + return popCkOffsetMaxQueueSize; + } + + public boolean isEnablePopLog() { + return enablePopLog; + } + public boolean isTraceOn() { return traceOn; } @@ -381,6 +468,14 @@ public class BrokerConfig { this.pullMessageThreadPoolNums = pullMessageThreadPoolNums; } + public int getAckMessageThreadPoolNums() { + return ackMessageThreadPoolNums; + } + + public void setAckMessageThreadPoolNums(int ackMessageThreadPoolNums) { + this.ackMessageThreadPoolNums = ackMessageThreadPoolNums; + } + public int getProcessReplyMessageThreadPoolNums() { return processReplyMessageThreadPoolNums; } @@ -485,6 +580,14 @@ public class BrokerConfig { this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity; } + public int getAckThreadPoolQueueCapacity() { + return ackThreadPoolQueueCapacity; + } + + public void setAckThreadPoolQueueCapacity(int ackThreadPoolQueueCapacity) { + this.ackThreadPoolQueueCapacity = ackThreadPoolQueueCapacity; + } + public int getReplyThreadPoolQueueCapacity() { return replyThreadPoolQueueCapacity; } @@ -804,4 +907,55 @@ public class BrokerConfig { public void setAutoDeleteUnusedStats(boolean autoDeleteUnusedStats) { this.autoDeleteUnusedStats = autoDeleteUnusedStats; } + + + public long getLoadBalancePollNameServerInterval() { + return loadBalancePollNameServerInterval; + } + + public void setLoadBalancePollNameServerInterval(long loadBalancePollNameServerInterval) { + this.loadBalancePollNameServerInterval = loadBalancePollNameServerInterval; + } + + public int getCleanOfflineBrokerInterval() { + return cleanOfflineBrokerInterval; + } + + public void setCleanOfflineBrokerInterval(int cleanOfflineBrokerInterval) { + this.cleanOfflineBrokerInterval = cleanOfflineBrokerInterval; + } + + public int getLoadBalanceProcessorThreadPoolNums() { + return loadBalanceProcessorThreadPoolNums; + } + + public void setLoadBalanceProcessorThreadPoolNums(int loadBalanceProcessorThreadPoolNums) { + this.loadBalanceProcessorThreadPoolNums = loadBalanceProcessorThreadPoolNums; + } + + public boolean isServerLoadBalancerEnabled() { + return serverLoadBalancerEnabled; + } + + public void setServerLoadBalancerEnabled(boolean serverLoadBalancerEnabled) { + this.serverLoadBalancerEnabled = serverLoadBalancerEnabled; + } + + public MessageRequestMode getDefaultMessageRequestMode() { + return defaultMessageRequestMode; + } + + public void setDefaultMessageRequestMode(String defaultMessageRequestMode) { + this.defaultMessageRequestMode = MessageRequestMode.valueOf(defaultMessageRequestMode); + } + + + public int getDefaultPopShareQueueNum() { + return defaultPopShareQueueNum; + } + + + public void setDefaultPopShareQueueNum(int defaultPopShareQueueNum) { + this.defaultPopShareQueueNum = defaultPopShareQueueNum; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java new file mode 100644 index 00000000..d30789f3 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +public class KeyBuilder { + public static final int POP_ORDER_REVIVE_QUEUE = 999; + + public static String buildPopRetryTopic(String topic, String cid) { + return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_" + topic; + } + + public static String parseNormalTopic(String topic, String cid) { + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_").length()); + } else { + return topic; + } + } + + public static String buildPollingKey(String topic, String cid, int queueId) { + return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java b/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java new file mode 100644 index 00000000..839f9475 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import org.apache.rocketmq.common.topic.TopicValidator; + +public class PopAckConstants { + public static long ackTimeInterval = 1000; + public static final long SECOND = 1000; + + public static long lockTime = 5000; + public static int retryQueueNum = 1; + + public static final String REVIVE_GROUP = MixAll.CID_RMQ_SYS_PREFIX + "REVIVE_GROUP"; + public static final String LOCAL_HOST = "127.0.0.1"; + public static final String REVIVE_TOPIC = TopicValidator.SYSTEM_TOPIC_PREFIX + "REVIVE_LOG_"; + public static final String CK_TAG = "ck"; + public static final String ACK_TAG = "ack"; + public static final String SPLIT = "@"; + +} diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/ConsumeInitMode.java b/common/src/main/java/org/apache/rocketmq/common/constant/ConsumeInitMode.java new file mode 100644 index 00000000..b7091fa2 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/constant/ConsumeInitMode.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.constant; + +public class ConsumeInitMode { + public static final int MIN = 0; + public static final int MAX = 1; +} diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java index fe0ae9f1..589200b2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java +++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java @@ -37,4 +37,5 @@ public class LoggerName { public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection"; public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark"; public static final String FILTER_LOGGER_NAME = "RocketmqFilter"; + public static final String ROCKETMQ_POP_LOGGER_NAME = "RocketmqPop"; } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index 5bdc8465..0922c5f6 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -52,6 +52,8 @@ public class MessageConst { public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME"; public static final String PROPERTY_CLUSTER = "CLUSTER"; public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE"; + public static final String PROPERTY_POP_CK = "POP_CK"; + public static final String PROPERTY_FIRST_POP_TIME = "1ST_POP_TIME"; public static final String KEY_SEPARATOR = " "; @@ -80,6 +82,8 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES); STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP); + STRING_HASH_SET.add(PROPERTY_POP_CK); + STRING_HASH_SET.add(PROPERTY_FIRST_POP_TIME); STRING_HASH_SET.add(PROPERTY_INSTANCE_ID); STRING_HASH_SET.add(PROPERTY_CORRELATION_ID); STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO_CLIENT); diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index 7e86d841..77d3034b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -37,7 +37,7 @@ public class MessageDecoder { public final static int MESSAGE_MAGIC_CODE_POSTION = 4; public final static int MESSAGE_FLAG_POSTION = 16; public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28; - // public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56; + public final static int MESSAGE_STORE_TIMESTAMP_POSITION = 56; public final static int MESSAGE_MAGIC_CODE = -626843481; public static final char NAME_VALUE_SEPARATOR = 1; public static final char PROPERTY_SEPARATOR = 2; diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageQueueAssignment.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageQueueAssignment.java new file mode 100644 index 00000000..fcd9f580 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageQueueAssignment.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.message; + +import java.io.Serializable; +import java.util.Map; + +public class MessageQueueAssignment implements Serializable { + + private static final long serialVersionUID = 8092600270527861645L; + + private MessageQueue messageQueue; + + private MessageRequestMode mode = MessageRequestMode.PULL; + + private Map attachments; + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode()); + result = prime * result + ((mode == null) ? 0 : mode.hashCode()); + result = prime * result + ((attachments == null) ? 0 : attachments.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + MessageQueueAssignment other = (MessageQueueAssignment) obj; + return messageQueue.equals(other.messageQueue); + } + + @Override + public String toString() { + return "MessageQueueAssignment [MessageQueue=" + messageQueue + ", Mode=" + mode + "]"; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + public void setMessageQueue(MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + public MessageRequestMode getMode() { + return mode; + } + + public void setMode(MessageRequestMode mode) { + this.mode = mode; + } + + public Map getAttachments() { + return attachments; + } + + public void setAttachments(Map attachments) { + this.attachments = attachments; + } + +} diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageRequestMode.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageRequestMode.java new file mode 100644 index 00000000..35a166a6 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageRequestMode.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.message; + +/** + * Message Request Mode + */ +public enum MessageRequestMode { + + /** + * pull + */ + PULL("PULL"), + + /** + * pop, consumer working in pop mode could share MessageQueue + */ + POP("POP"); + + private String name; + + MessageRequestMode(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 75ceff38..9446caa0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -80,6 +80,10 @@ public class RequestCode { public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54; + public static final int POP_MESSAGE = 200050; + public static final int ACK_MESSAGE = 200051; + public static final int CHANGE_MESSAGE_INVISIBLETIME = 200053; + public static final int PUT_KV_CONFIG = 100; public static final int GET_KV_CONFIG = 101; @@ -188,4 +192,7 @@ public class RequestCode { public static final int SEND_REPLY_MESSAGE_V2 = 325; public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326; + + public static final int QUERY_ASSIGNMENT = 400; + public static final int SET_MESSAGE_REQUEST_MODE = 401; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java index dc744448..df0ccbe9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java @@ -80,4 +80,7 @@ public class ResponseCode extends RemotingSysResponseCode { public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211; + public static final int POLLING_FULL = 209; + + public static final int POLLING_TIMEOUT = 210; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java index d7942eb4..10d6f4d1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java @@ -41,6 +41,8 @@ public class ConsumerRunningInfo extends RemotingSerializable { private TreeMap mqTable = new TreeMap(); + private TreeMap mqPopTable = new TreeMap(); + private TreeMap statusTable = new TreeMap(); private String jstack; @@ -265,6 +267,28 @@ public class ConsumerRunningInfo extends RemotingSerializable { } } + { + sb.append("\n\n#Consumer Pop Detail#\n"); + sb.append(String.format("%-32s %-32s %-4s %-20s%n", + "#Topic", + "#Broker Name", + "#QID", + "#ProcessQueueInfo" + )); + + Iterator> it = this.mqPopTable.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + String item = String.format("%-32s %-32s %-4d %s%n", + next.getKey().getTopic(), + next.getKey().getBrokerName(), + next.getKey().getQueueId(), + next.getValue().toString()); + + sb.append(item); + } + } + { sb.append("\n\n#Consumer RT&TPS#\n"); sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n", @@ -310,4 +334,12 @@ public class ConsumerRunningInfo extends RemotingSerializable { this.jstack = jstack; } + public TreeMap getMqPopTable() { + return mqPopTable; + } + + public void setMqPopTable( + TreeMap mqPopTable) { + this.mqPopTable = mqPopTable; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/PopProcessQueueInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/PopProcessQueueInfo.java new file mode 100644 index 00000000..b8811bb4 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/PopProcessQueueInfo.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.body; + +public class PopProcessQueueInfo { + private int waitAckCount; + private boolean droped; + private long lastPopTimestamp; + + + public int getWaitAckCount() { + return waitAckCount; + } + + + public void setWaitAckCount(int waitAckCount) { + this.waitAckCount = waitAckCount; + } + + + public boolean isDroped() { + return droped; + } + + + public void setDroped(boolean droped) { + this.droped = droped; + } + + + public long getLastPopTimestamp() { + return lastPopTimestamp; + } + + + public void setLastPopTimestamp(long lastPopTimestamp) { + this.lastPopTimestamp = lastPopTimestamp; + } + + @Override + public String toString() { + return "PopProcessQueueInfo [waitAckCount:" + waitAckCount + + ", droped:" + droped + ", lastPopTimestamp:" + lastPopTimestamp + "]"; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentRequestBody.java new file mode 100644 index 00000000..6d0285b1 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentRequestBody.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class QueryAssignmentRequestBody extends RemotingSerializable { + + private String topic; + + private String consumerGroup; + + private String clientId; + + private String strategyName; + + private MessageModel messageModel; + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getStrategyName() { + return strategyName; + } + + public void setStrategyName(String strategyName) { + this.strategyName = strategyName; + } + + public MessageModel getMessageModel() { + return messageModel; + } + + public void setMessageModel(MessageModel messageModel) { + this.messageModel = messageModel; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentResponseBody.java new file mode 100644 index 00000000..688737d1 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryAssignmentResponseBody.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.body; + +import java.util.Set; +import org.apache.rocketmq.common.message.MessageQueueAssignment; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class QueryAssignmentResponseBody extends RemotingSerializable { + + private Set messageQueueAssignments; + + public Set getMessageQueueAssignments() { + return messageQueueAssignments; + } + + public void setMessageQueueAssignments( + Set messageQueueAssignments) { + this.messageQueueAssignments = messageQueueAssignments; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SetMessageRequestModeRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SetMessageRequestModeRequestBody.java new file mode 100644 index 00000000..309f7ae3 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SetMessageRequestModeRequestBody.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.common.message.MessageRequestMode; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class SetMessageRequestModeRequestBody extends RemotingSerializable { + + private String topic; + + private String consumerGroup; + + private MessageRequestMode mode = MessageRequestMode.PULL; + + /* + consumer working in pop mode could share the MessageQueues assigned to the N (N = popShareQueueNum) consumers following it in the cid list + */ + private int popShareQueueNum = 0; + + public SetMessageRequestModeRequestBody() { + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public MessageRequestMode getMode() { + return mode; + } + + public void setMode(MessageRequestMode mode) { + this.mode = mode; + } + + public int getPopShareQueueNum() { + return popShareQueueNum; + } + + public void setPopShareQueueNum(int popShareQueueNum) { + this.popShareQueueNum = popShareQueueNum; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java new file mode 100644 index 00000000..02e388ba --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class AckMessageRequestHeader implements CommandCustomHeader { + @CFNotNull + private String consumerGroup; + @CFNotNull + private String topic; + @CFNotNull + private Integer queueId; + @CFNotNull + private String extraInfo; + + @CFNotNull + private Long offset; + + + @Override + public void checkFields() throws RemotingCommandException { + } + + public void setOffset(Long offset) { + this.offset = offset; + } + + public Long getOffset() { + return offset; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setExtraInfo(String extraInfo) { + this.extraInfo = extraInfo; + } + + public String getExtraInfo() { + return extraInfo; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public Integer getQueueId() { + return queueId; + } + + public void setQueueId(Integer queueId) { + this.queueId = queueId; + } + + @Override + public String toString() { + return topic + "," + this.consumerGroup + "," + this.queueId + "," + this.offset + "," + this.extraInfo; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java new file mode 100644 index 00000000..a586e490 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class ChangeInvisibleTimeRequestHeader implements CommandCustomHeader { + @CFNotNull + private String consumerGroup; + @CFNotNull + private String topic; + @CFNotNull + private Integer queueId; + /** + * startOffset popTime invisibleTime queueId + */ + @CFNotNull + private String extraInfo; + + @CFNotNull + private Long offset; + + @CFNotNull + private Long invisibleTime; + + @Override + public void checkFields() throws RemotingCommandException { + } + + public void setOffset(Long offset) { + this.offset = offset; + } + + public Long getOffset() { + return offset; + } + + public Long getInvisibleTime() { + return invisibleTime; + } + + public void setInvisibleTime(Long invisibleTime) { + this.invisibleTime = invisibleTime; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setExtraInfo(String extraInfo) { + this.extraInfo = extraInfo; + } + + /** + * startOffset popTime invisibleTime queueId + */ + public String getExtraInfo() { + return extraInfo; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public Integer getQueueId() { + return queueId; + } + + public void setQueueId(Integer queueId) { + this.queueId = queueId; + } + +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeResponseHeader.java new file mode 100644 index 00000000..2ebabb76 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeResponseHeader.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class ChangeInvisibleTimeResponseHeader implements CommandCustomHeader { + + + @CFNotNull + private long popTime; + @CFNotNull + private long invisibleTime; + + @CFNotNull + private int reviveQid; + + @Override + public void checkFields() throws RemotingCommandException { + } + + public long getPopTime() { + return popTime; + } + + public void setPopTime(long popTime) { + this.popTime = popTime; + } + + public long getInvisibleTime() { + return invisibleTime; + } + + public void setInvisibleTime(long invisibleTime) { + this.invisibleTime = invisibleTime; + } + + public int getReviveQid() { + return reviveQid; + } + + public void setReviveQid(int reviveQid) { + this.reviveQid = reviveQid; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java new file mode 100644 index 00000000..19f37f6c --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.rocketmq.common.KeyBuilder; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageConst; + +public class ExtraInfoUtil { + private static final String NORMAL_TOPIC = "0"; + private static final String RETRY_TOPIC = "1"; + + public static String[] split(String extraInfo) { + if (extraInfo == null) { + throw new IllegalArgumentException("split extraInfo is null"); + } + return extraInfo.split(MessageConst.KEY_SEPARATOR); + } + + public static Long getCkQueueOffset(String[] extraInfoStrs) { + if (extraInfoStrs == null || extraInfoStrs.length < 1) { + throw new IllegalArgumentException("getCkQueueOffset fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length)); + } + return Long.valueOf(extraInfoStrs[0]); + } + + public static Long getPopTime(String[] extraInfoStrs) { + if (extraInfoStrs == null || extraInfoStrs.length < 2) { + throw new IllegalArgumentException("getPopTime fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length)); + } + return Long.valueOf(extraInfoStrs[1]); + } + + public static Long getInvisibleTime(String[] extraInfoStrs) { + if (extraInfoStrs == null || extraInfoStrs.length < 3) { + throw new IllegalArgumentException("getInvisibleTime fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length)); + } + return Long.valueOf(extraInfoStrs[2]); + } + + public static int getReviveQid(String[] extraInfoStrs) { + if (extraInfoStrs == null || extraInfoStrs.length < 4) { + throw new IllegalArgumentException("getReviveQid fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length)); + } + return Integer.valueOf(extraInfoStrs[3]); + } + + public static String getRealTopic(String[] extraInfoStrs, String topic, String cid) { + if (extraInfoStrs == null || extraInfoStrs.length < 5) { + throw new IllegalArgumentException("getRealTopic fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length)); + } + if (RETRY_TOPIC.equals(extraInfoStrs[4])) { + return KeyBuilder.buildPopRetryTopic(topic, cid); + } else { + return topic; + } + } + + public static String getBrokerName(String[] extraInfoStrs) { + if (extraInfoStrs == null || extraInfoStrs.length < 6) { + throw new IllegalArgumentException("getBrokerName fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length)); + } + return extraInfoStrs[5]; + } + + public static int getQueueId(String[] extraInfoStrs) { + if (extraInfoStrs == null || extraInfoStrs.length < 7) { + throw new IllegalArgumentException("getQueueId fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length)); + } + return Integer.valueOf(extraInfoStrs[6]); + } + + public static long getQueueOffset(String[] extraInfoStrs) { + if (extraInfoStrs == null || extraInfoStrs.length < 8) { + throw new IllegalArgumentException("getQueueOffset fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length)); + } + return Long.valueOf(extraInfoStrs[7]); + } + + public static String buildExtraInfo(long ckQueueOffset, long popTime, long invisibleTime, int reviveQid, String topic, String brokerName, int queueId) { + String t = NORMAL_TOPIC; + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + t = RETRY_TOPIC; + } + return ckQueueOffset + MessageConst.KEY_SEPARATOR + popTime + MessageConst.KEY_SEPARATOR + invisibleTime + MessageConst.KEY_SEPARATOR + reviveQid + MessageConst.KEY_SEPARATOR + t + + MessageConst.KEY_SEPARATOR + brokerName + MessageConst.KEY_SEPARATOR + queueId; + } + + public static String buildExtraInfo(long ckQueueOffset, long popTime, long invisibleTime, int reviveQid, String topic, String brokerName, int queueId, + long msgQueueOffset) { + String t = NORMAL_TOPIC; + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + t = RETRY_TOPIC; + } + return ckQueueOffset + + MessageConst.KEY_SEPARATOR + popTime + MessageConst.KEY_SEPARATOR + invisibleTime + + MessageConst.KEY_SEPARATOR + reviveQid + MessageConst.KEY_SEPARATOR + t + + MessageConst.KEY_SEPARATOR + brokerName + MessageConst.KEY_SEPARATOR + queueId + + MessageConst.KEY_SEPARATOR + msgQueueOffset; + } + + public static void buildStartOffsetInfo(StringBuilder stringBuilder, boolean retry, int queueId, long startOffset) { + if (stringBuilder == null) { + stringBuilder = new StringBuilder(64); + } + + if (stringBuilder.length() > 0) { + stringBuilder.append(";"); + } + + stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC) + .append(MessageConst.KEY_SEPARATOR).append(queueId) + .append(MessageConst.KEY_SEPARATOR).append(startOffset); + } + + public static void buildOrderCountInfo(StringBuilder stringBuilder, boolean retry, int queueId, int orderCount) { + if (stringBuilder == null) { + stringBuilder = new StringBuilder(64); + } + + if (stringBuilder.length() > 0) { + stringBuilder.append(";"); + } + + stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC) + .append(MessageConst.KEY_SEPARATOR).append(queueId) + .append(MessageConst.KEY_SEPARATOR).append(orderCount); + } + + public static void buildMsgOffsetInfo(StringBuilder stringBuilder, boolean retry, int queueId, List msgOffsets) { + if (stringBuilder == null) { + stringBuilder = new StringBuilder(64); + } + + if (stringBuilder.length() > 0) { + stringBuilder.append(";"); + } + + stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC) + .append(MessageConst.KEY_SEPARATOR).append(queueId) + .append(MessageConst.KEY_SEPARATOR); + + for (int i = 0; i < msgOffsets.size(); i++) { + stringBuilder.append(msgOffsets.get(i)); + if (i < msgOffsets.size() - 1) { + stringBuilder.append(","); + } + } + } + + public static Map> parseMsgOffsetInfo(String msgOffsetInfo) { + if (msgOffsetInfo == null || msgOffsetInfo.length() == 0) { + return null; + } + + Map> msgOffsetMap = new HashMap>(4); + String[] array; + if (msgOffsetInfo.indexOf(";") < 0) { + array = new String[]{msgOffsetInfo}; + } else { + array = msgOffsetInfo.split(";"); + } + + for (String one : array) { + String[] split = one.split(MessageConst.KEY_SEPARATOR); + if (split.length != 3) { + throw new IllegalArgumentException("parse msgOffsetMap error, " + msgOffsetMap); + } + String key = split[0] + "@" + split[1]; + if (msgOffsetMap.containsKey(key)) { + throw new IllegalArgumentException("parse msgOffsetMap error, duplicate, " + msgOffsetMap); + } + msgOffsetMap.put(key, new ArrayList(8)); + String[] msgOffsets = split[2].split(","); + for (String msgOffset : msgOffsets) { + msgOffsetMap.get(key).add(Long.valueOf(msgOffset)); + } + } + + return msgOffsetMap; + } + + public static Map parseStartOffsetInfo(String startOffsetInfo) { + if (startOffsetInfo == null || startOffsetInfo.length() == 0) { + return null; + } + Map startOffsetMap = new HashMap(4); + String[] array; + if (startOffsetInfo.indexOf(";") < 0) { + array = new String[]{startOffsetInfo}; + } else { + array = startOffsetInfo.split(";"); + } + + for (String one : array) { + String[] split = one.split(MessageConst.KEY_SEPARATOR); + if (split.length != 3) { + throw new IllegalArgumentException("parse startOffsetInfo error, " + startOffsetInfo); + } + String key = split[0] + "@" + split[1]; + if (startOffsetMap.containsKey(key)) { + throw new IllegalArgumentException("parse startOffsetInfo error, duplicate, " + startOffsetInfo); + } + startOffsetMap.put(key, Long.valueOf(split[2])); + } + + return startOffsetMap; + } + + public static Map parseOrderCountInfo(String orderCountInfo) { + if (orderCountInfo == null || orderCountInfo.length() == 0) { + return null; + } + Map startOffsetMap = new HashMap(4); + String[] array; + if (orderCountInfo.indexOf(";") < 0) { + array = new String[]{orderCountInfo}; + } else { + array = orderCountInfo.split(";"); + } + + for (String one : array) { + String[] split = one.split(MessageConst.KEY_SEPARATOR); + if (split.length != 3) { + throw new IllegalArgumentException("parse orderCountInfo error, " + orderCountInfo); + } + String key = split[0] + "@" + split[1]; + if (startOffsetMap.containsKey(key)) { + throw new IllegalArgumentException("parse orderCountInfo error, duplicate, " + orderCountInfo); + } + startOffsetMap.put(key, Integer.valueOf(split[2])); + } + + return startOffsetMap; + } + + public static String getStartOffsetInfoMapKey(String topic, int queueId) { + return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + queueId; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java new file mode 100644 index 00000000..4d151a23 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class PopMessageRequestHeader implements CommandCustomHeader { + @CFNotNull + private String consumerGroup; + @CFNotNull + private String topic; + @CFNotNull + private int queueId; + @CFNotNull + private int maxMsgNums; + @CFNotNull + private long invisibleTime; + @CFNotNull + private long pollTime; + @CFNotNull + private long bornTime; + @CFNotNull + private int initMode; + + private String expType; + private String exp; + + /** + * marked as order consume, if true + * 1. not commit offset + * 2. not pop retry, because no retry + * 3. not append check point, because no retry + */ + private Boolean order = Boolean.FALSE; + + @Override + public void checkFields() throws RemotingCommandException { + } + + public void setInitMode(int initMode) { + this.initMode = initMode; + } + + public int getInitMode() { + return initMode; + } + + public long getInvisibleTime() { + return invisibleTime; + } + + public void setInvisibleTime(long invisibleTime) { + this.invisibleTime = invisibleTime; + } + + public long getPollTime() { + return pollTime; + } + + public void setPollTime(long pollTime) { + this.pollTime = pollTime; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public long getBornTime() { + return bornTime; + } + + public void setBornTime(long bornTime) { + this.bornTime = bornTime; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public int getQueueId() { + if (queueId < 0) { + return -1; + } + return queueId; + } + + public void setQueueId(int queueId) { + this.queueId = queueId; + } + + + public int getMaxMsgNums() { + return maxMsgNums; + } + + public void setMaxMsgNums(int maxMsgNums) { + this.maxMsgNums = maxMsgNums; + } + + public boolean isTimeoutTooMuch() { + return System.currentTimeMillis() - bornTime - pollTime > 500; + } + + public String getExpType() { + return expType; + } + + public void setExpType(String expType) { + this.expType = expType; + } + + public String getExp() { + return exp; + } + + public void setExp(String exp) { + this.exp = exp; + } + + public Boolean getOrder() { + return order; + } + + public void setOrder(Boolean order) { + this.order = order; + } + + public boolean isOrder() { + return this.order != null && this.order.booleanValue(); + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageResponseHeader.java new file mode 100644 index 00000000..09867f3e --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageResponseHeader.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class PopMessageResponseHeader implements CommandCustomHeader { + + + @CFNotNull + private long popTime; + @CFNotNull + private long invisibleTime; + + @CFNotNull + private int reviveQid; + /** + * the rest num in queue + */ + @CFNotNull + private long restNum; + + private String startOffsetInfo; + private String msgOffsetInfo; + private String orderCountInfo; + + @Override + public void checkFields() throws RemotingCommandException { + } + + public long getPopTime() { + return popTime; + } + + public void setPopTime(long popTime) { + this.popTime = popTime; + } + + public long getInvisibleTime() { + return invisibleTime; + } + + public long getRestNum() { + return restNum; + } + + public void setRestNum(long restNum) { + this.restNum = restNum; + } + + public void setInvisibleTime(long invisibleTime) { + this.invisibleTime = invisibleTime; + } + + public int getReviveQid() { + return reviveQid; + } + + public void setReviveQid(int reviveQid) { + this.reviveQid = reviveQid; + } + + public String getStartOffsetInfo() { + return startOffsetInfo; + } + + public void setStartOffsetInfo(String startOffsetInfo) { + this.startOffsetInfo = startOffsetInfo; + } + + public String getMsgOffsetInfo() { + return msgOffsetInfo; + } + + public void setMsgOffsetInfo(String msgOffsetInfo) { + this.msgOffsetInfo = msgOffsetInfo; + } + + public String getOrderCountInfo() { + return orderCountInfo; + } + + public void setOrderCountInfo(String orderCountInfo) { + this.orderCountInfo = orderCountInfo; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java b/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java new file mode 100644 index 00000000..8b50de12 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.utils; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +public class DataConverter { + public static Charset charset = Charset.forName("UTF-8"); + + public static byte[] Long2Byte(Long v) { + ByteBuffer tmp = ByteBuffer.allocate(8); + tmp.putLong(v); + return tmp.array(); + } + + public static int setBit(int value, int index, boolean flag) { + if (flag) { + return (int) (value | (1L << index)); + } else { + return (int) (value & ~(1L << index)); + } + } + + public static boolean getBit(int value, int index) { + return (value & (1L << index)) != 0; + } +} -- GitLab