提交 86bea21b 编写于 作者: A ayanamist

[RIP-19] Pop Consuming (common)

上级 e277216b
......@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -62,12 +63,14 @@ public class BrokerConfig {
*/
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int ackMessageThreadPoolNums = 3;
private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
private int adminBrokerThreadPoolNums = 16;
private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32;
private int loadBalanceProcessorThreadPoolNums = 32;
private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
/**
......@@ -85,6 +88,7 @@ public class BrokerConfig {
private boolean fetchNamesrvAddrByAddressServer = false;
private int sendThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
private int ackThreadPoolQueueCapacity = 100000;
private int replyThreadPoolQueueCapacity = 10000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
......@@ -158,6 +162,37 @@ public class BrokerConfig {
*/
private int registerNameServerPeriod = 1000 * 30;
private int popPollingSize = 1024;
private int popPollingMapSize = 100000;
// 20w cost 200M heap memory.
private long maxPopPollingSize = 100000;
private int reviveQueueNum = 8;
private long reviveInterval = 1000;
private long reviveMaxSlow = 3;
private long reviveScanTime = 10000;
private boolean enablePopLog = true;
private boolean enablePopBufferMerge = false;
private int popCkStayBufferTime = 10 * 1000;
private int popCkStayBufferTimeOut = 3 * 1000;
private int popCkMaxBufferSize = 200000;
private int popCkOffsetMaxQueueSize = 20000;
/**
* the interval of pulling topic information from the named server
*/
private long loadBalancePollNameServerInterval = 1000 * 30;
/**
* the interval of cleaning
*/
private int cleanOfflineBrokerInterval = 1000 * 30;
private boolean serverLoadBalancerEnabled = true;
private MessageRequestMode defaultMessageRequestMode = MessageRequestMode.PULL;
private int defaultPopShareQueueNum = -1;
/**
* The minimum time of the transactional message to be checked firstly, one message only exceed this time interval
* that can be checked.
......@@ -197,6 +232,58 @@ public class BrokerConfig {
return "DEFAULT_BROKER";
}
public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
public int getReviveQueueNum() {
return reviveQueueNum;
}
public long getReviveInterval() {
return reviveInterval;
}
public int getPopCkStayBufferTime() {
return popCkStayBufferTime;
}
public int getPopCkStayBufferTimeOut() {
return popCkStayBufferTimeOut;
}
public int getPopPollingMapSize() {
return popPollingMapSize;
}
public long getReviveScanTime() {
return reviveScanTime;
}
public long getReviveMaxSlow() {
return reviveMaxSlow;
}
public int getPopPollingSize() {
return popPollingSize;
}
public boolean isEnablePopBufferMerge() {
return enablePopBufferMerge;
}
public int getPopCkMaxBufferSize() {
return popCkMaxBufferSize;
}
public int getPopCkOffsetMaxQueueSize() {
return popCkOffsetMaxQueueSize;
}
public boolean isEnablePopLog() {
return enablePopLog;
}
public boolean isTraceOn() {
return traceOn;
}
......@@ -381,6 +468,14 @@ public class BrokerConfig {
this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
}
public int getAckMessageThreadPoolNums() {
return ackMessageThreadPoolNums;
}
public void setAckMessageThreadPoolNums(int ackMessageThreadPoolNums) {
this.ackMessageThreadPoolNums = ackMessageThreadPoolNums;
}
public int getProcessReplyMessageThreadPoolNums() {
return processReplyMessageThreadPoolNums;
}
......@@ -485,6 +580,14 @@ public class BrokerConfig {
this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
}
public int getAckThreadPoolQueueCapacity() {
return ackThreadPoolQueueCapacity;
}
public void setAckThreadPoolQueueCapacity(int ackThreadPoolQueueCapacity) {
this.ackThreadPoolQueueCapacity = ackThreadPoolQueueCapacity;
}
public int getReplyThreadPoolQueueCapacity() {
return replyThreadPoolQueueCapacity;
}
......@@ -804,4 +907,55 @@ public class BrokerConfig {
public void setAutoDeleteUnusedStats(boolean autoDeleteUnusedStats) {
this.autoDeleteUnusedStats = autoDeleteUnusedStats;
}
public long getLoadBalancePollNameServerInterval() {
return loadBalancePollNameServerInterval;
}
public void setLoadBalancePollNameServerInterval(long loadBalancePollNameServerInterval) {
this.loadBalancePollNameServerInterval = loadBalancePollNameServerInterval;
}
public int getCleanOfflineBrokerInterval() {
return cleanOfflineBrokerInterval;
}
public void setCleanOfflineBrokerInterval(int cleanOfflineBrokerInterval) {
this.cleanOfflineBrokerInterval = cleanOfflineBrokerInterval;
}
public int getLoadBalanceProcessorThreadPoolNums() {
return loadBalanceProcessorThreadPoolNums;
}
public void setLoadBalanceProcessorThreadPoolNums(int loadBalanceProcessorThreadPoolNums) {
this.loadBalanceProcessorThreadPoolNums = loadBalanceProcessorThreadPoolNums;
}
public boolean isServerLoadBalancerEnabled() {
return serverLoadBalancerEnabled;
}
public void setServerLoadBalancerEnabled(boolean serverLoadBalancerEnabled) {
this.serverLoadBalancerEnabled = serverLoadBalancerEnabled;
}
public MessageRequestMode getDefaultMessageRequestMode() {
return defaultMessageRequestMode;
}
public void setDefaultMessageRequestMode(String defaultMessageRequestMode) {
this.defaultMessageRequestMode = MessageRequestMode.valueOf(defaultMessageRequestMode);
}
public int getDefaultPopShareQueueNum() {
return defaultPopShareQueueNum;
}
public void setDefaultPopShareQueueNum(int defaultPopShareQueueNum) {
this.defaultPopShareQueueNum = defaultPopShareQueueNum;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
public class KeyBuilder {
public static final int POP_ORDER_REVIVE_QUEUE = 999;
public static String buildPopRetryTopic(String topic, String cid) {
return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_" + topic;
}
public static String parseNormalTopic(String topic, String cid) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
return topic.substring((MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_").length());
} else {
return topic;
}
}
public static String buildPollingKey(String topic, String cid, int queueId) {
return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
import org.apache.rocketmq.common.topic.TopicValidator;
public class PopAckConstants {
public static long ackTimeInterval = 1000;
public static final long SECOND = 1000;
public static long lockTime = 5000;
public static int retryQueueNum = 1;
public static final String REVIVE_GROUP = MixAll.CID_RMQ_SYS_PREFIX + "REVIVE_GROUP";
public static final String LOCAL_HOST = "127.0.0.1";
public static final String REVIVE_TOPIC = TopicValidator.SYSTEM_TOPIC_PREFIX + "REVIVE_LOG_";
public static final String CK_TAG = "ck";
public static final String ACK_TAG = "ack";
public static final String SPLIT = "@";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.constant;
public class ConsumeInitMode {
public static final int MIN = 0;
public static final int MAX = 1;
}
......@@ -37,4 +37,5 @@ public class LoggerName {
public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
public static final String ROCKETMQ_POP_LOGGER_NAME = "RocketmqPop";
}
......@@ -52,6 +52,8 @@ public class MessageConst {
public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
public static final String PROPERTY_CLUSTER = "CLUSTER";
public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
public static final String PROPERTY_POP_CK = "POP_CK";
public static final String PROPERTY_FIRST_POP_TIME = "1ST_POP_TIME";
public static final String KEY_SEPARATOR = " ";
......@@ -80,6 +82,8 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES);
STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP);
STRING_HASH_SET.add(PROPERTY_POP_CK);
STRING_HASH_SET.add(PROPERTY_FIRST_POP_TIME);
STRING_HASH_SET.add(PROPERTY_INSTANCE_ID);
STRING_HASH_SET.add(PROPERTY_CORRELATION_ID);
STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO_CLIENT);
......
......@@ -37,7 +37,7 @@ public class MessageDecoder {
public final static int MESSAGE_MAGIC_CODE_POSTION = 4;
public final static int MESSAGE_FLAG_POSTION = 16;
public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28;
// public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56;
public final static int MESSAGE_STORE_TIMESTAMP_POSITION = 56;
public final static int MESSAGE_MAGIC_CODE = -626843481;
public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.message;
import java.io.Serializable;
import java.util.Map;
public class MessageQueueAssignment implements Serializable {
private static final long serialVersionUID = 8092600270527861645L;
private MessageQueue messageQueue;
private MessageRequestMode mode = MessageRequestMode.PULL;
private Map<String, String> attachments;
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode());
result = prime * result + ((mode == null) ? 0 : mode.hashCode());
result = prime * result + ((attachments == null) ? 0 : attachments.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
MessageQueueAssignment other = (MessageQueueAssignment) obj;
return messageQueue.equals(other.messageQueue);
}
@Override
public String toString() {
return "MessageQueueAssignment [MessageQueue=" + messageQueue + ", Mode=" + mode + "]";
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public void setMessageQueue(MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
public MessageRequestMode getMode() {
return mode;
}
public void setMode(MessageRequestMode mode) {
this.mode = mode;
}
public Map<String, String> getAttachments() {
return attachments;
}
public void setAttachments(Map<String, String> attachments) {
this.attachments = attachments;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.message;
/**
* Message Request Mode
*/
public enum MessageRequestMode {
/**
* pull
*/
PULL("PULL"),
/**
* pop, consumer working in pop mode could share MessageQueue
*/
POP("POP");
private String name;
MessageRequestMode(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
......@@ -80,6 +80,10 @@ public class RequestCode {
public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54;
public static final int POP_MESSAGE = 200050;
public static final int ACK_MESSAGE = 200051;
public static final int CHANGE_MESSAGE_INVISIBLETIME = 200053;
public static final int PUT_KV_CONFIG = 100;
public static final int GET_KV_CONFIG = 101;
......@@ -188,4 +192,7 @@ public class RequestCode {
public static final int SEND_REPLY_MESSAGE_V2 = 325;
public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
public static final int QUERY_ASSIGNMENT = 400;
public static final int SET_MESSAGE_REQUEST_MODE = 401;
}
......@@ -80,4 +80,7 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
public static final int POLLING_FULL = 209;
public static final int POLLING_TIMEOUT = 210;
}
......@@ -41,6 +41,8 @@ public class ConsumerRunningInfo extends RemotingSerializable {
private TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new TreeMap<MessageQueue, ProcessQueueInfo>();
private TreeMap<MessageQueue, PopProcessQueueInfo> mqPopTable = new TreeMap<MessageQueue, PopProcessQueueInfo>();
private TreeMap<String/* Topic */, ConsumeStatus> statusTable = new TreeMap<String, ConsumeStatus>();
private String jstack;
......@@ -265,6 +267,28 @@ public class ConsumerRunningInfo extends RemotingSerializable {
}
}
{
sb.append("\n\n#Consumer Pop Detail#\n");
sb.append(String.format("%-32s %-32s %-4s %-20s%n",
"#Topic",
"#Broker Name",
"#QID",
"#ProcessQueueInfo"
));
Iterator<Entry<MessageQueue, PopProcessQueueInfo>> it = this.mqPopTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, PopProcessQueueInfo> next = it.next();
String item = String.format("%-32s %-32s %-4d %s%n",
next.getKey().getTopic(),
next.getKey().getBrokerName(),
next.getKey().getQueueId(),
next.getValue().toString());
sb.append(item);
}
}
{
sb.append("\n\n#Consumer RT&TPS#\n");
sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n",
......@@ -310,4 +334,12 @@ public class ConsumerRunningInfo extends RemotingSerializable {
this.jstack = jstack;
}
public TreeMap<MessageQueue, PopProcessQueueInfo> getMqPopTable() {
return mqPopTable;
}
public void setMqPopTable(
TreeMap<MessageQueue, PopProcessQueueInfo> mqPopTable) {
this.mqPopTable = mqPopTable;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
public class PopProcessQueueInfo {
private int waitAckCount;
private boolean droped;
private long lastPopTimestamp;
public int getWaitAckCount() {
return waitAckCount;
}
public void setWaitAckCount(int waitAckCount) {
this.waitAckCount = waitAckCount;
}
public boolean isDroped() {
return droped;
}
public void setDroped(boolean droped) {
this.droped = droped;
}
public long getLastPopTimestamp() {
return lastPopTimestamp;
}
public void setLastPopTimestamp(long lastPopTimestamp) {
this.lastPopTimestamp = lastPopTimestamp;
}
@Override
public String toString() {
return "PopProcessQueueInfo [waitAckCount:" + waitAckCount +
", droped:" + droped + ", lastPopTimestamp:" + lastPopTimestamp + "]";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class QueryAssignmentRequestBody extends RemotingSerializable {
private String topic;
private String consumerGroup;
private String clientId;
private String strategyName;
private MessageModel messageModel;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getStrategyName() {
return strategyName;
}
public void setStrategyName(String strategyName) {
this.strategyName = strategyName;
}
public MessageModel getMessageModel() {
return messageModel;
}
public void setMessageModel(MessageModel messageModel) {
this.messageModel = messageModel;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class QueryAssignmentResponseBody extends RemotingSerializable {
private Set<MessageQueueAssignment> messageQueueAssignments;
public Set<MessageQueueAssignment> getMessageQueueAssignments() {
return messageQueueAssignments;
}
public void setMessageQueueAssignments(
Set<MessageQueueAssignment> messageQueueAssignments) {
this.messageQueueAssignments = messageQueueAssignments;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class SetMessageRequestModeRequestBody extends RemotingSerializable {
private String topic;
private String consumerGroup;
private MessageRequestMode mode = MessageRequestMode.PULL;
/*
consumer working in pop mode could share the MessageQueues assigned to the N (N = popShareQueueNum) consumers following it in the cid list
*/
private int popShareQueueNum = 0;
public SetMessageRequestModeRequestBody() {
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public MessageRequestMode getMode() {
return mode;
}
public void setMode(MessageRequestMode mode) {
this.mode = mode;
}
public int getPopShareQueueNum() {
return popShareQueueNum;
}
public void setPopShareQueueNum(int popShareQueueNum) {
this.popShareQueueNum = popShareQueueNum;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class AckMessageRequestHeader implements CommandCustomHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
private String topic;
@CFNotNull
private Integer queueId;
@CFNotNull
private String extraInfo;
@CFNotNull
private Long offset;
@Override
public void checkFields() throws RemotingCommandException {
}
public void setOffset(Long offset) {
this.offset = offset;
}
public Long getOffset() {
return offset;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setExtraInfo(String extraInfo) {
this.extraInfo = extraInfo;
}
public String getExtraInfo() {
return extraInfo;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public Integer getQueueId() {
return queueId;
}
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
@Override
public String toString() {
return topic + "," + this.consumerGroup + "," + this.queueId + "," + this.offset + "," + this.extraInfo;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class ChangeInvisibleTimeRequestHeader implements CommandCustomHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
private String topic;
@CFNotNull
private Integer queueId;
/**
* startOffset popTime invisibleTime queueId
*/
@CFNotNull
private String extraInfo;
@CFNotNull
private Long offset;
@CFNotNull
private Long invisibleTime;
@Override
public void checkFields() throws RemotingCommandException {
}
public void setOffset(Long offset) {
this.offset = offset;
}
public Long getOffset() {
return offset;
}
public Long getInvisibleTime() {
return invisibleTime;
}
public void setInvisibleTime(Long invisibleTime) {
this.invisibleTime = invisibleTime;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setExtraInfo(String extraInfo) {
this.extraInfo = extraInfo;
}
/**
* startOffset popTime invisibleTime queueId
*/
public String getExtraInfo() {
return extraInfo;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public Integer getQueueId() {
return queueId;
}
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class ChangeInvisibleTimeResponseHeader implements CommandCustomHeader {
@CFNotNull
private long popTime;
@CFNotNull
private long invisibleTime;
@CFNotNull
private int reviveQid;
@Override
public void checkFields() throws RemotingCommandException {
}
public long getPopTime() {
return popTime;
}
public void setPopTime(long popTime) {
this.popTime = popTime;
}
public long getInvisibleTime() {
return invisibleTime;
}
public void setInvisibleTime(long invisibleTime) {
this.invisibleTime = invisibleTime;
}
public int getReviveQid() {
return reviveQid;
}
public void setReviveQid(int reviveQid) {
this.reviveQid = reviveQid;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst;
public class ExtraInfoUtil {
private static final String NORMAL_TOPIC = "0";
private static final String RETRY_TOPIC = "1";
public static String[] split(String extraInfo) {
if (extraInfo == null) {
throw new IllegalArgumentException("split extraInfo is null");
}
return extraInfo.split(MessageConst.KEY_SEPARATOR);
}
public static Long getCkQueueOffset(String[] extraInfoStrs) {
if (extraInfoStrs == null || extraInfoStrs.length < 1) {
throw new IllegalArgumentException("getCkQueueOffset fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
}
return Long.valueOf(extraInfoStrs[0]);
}
public static Long getPopTime(String[] extraInfoStrs) {
if (extraInfoStrs == null || extraInfoStrs.length < 2) {
throw new IllegalArgumentException("getPopTime fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
}
return Long.valueOf(extraInfoStrs[1]);
}
public static Long getInvisibleTime(String[] extraInfoStrs) {
if (extraInfoStrs == null || extraInfoStrs.length < 3) {
throw new IllegalArgumentException("getInvisibleTime fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
}
return Long.valueOf(extraInfoStrs[2]);
}
public static int getReviveQid(String[] extraInfoStrs) {
if (extraInfoStrs == null || extraInfoStrs.length < 4) {
throw new IllegalArgumentException("getReviveQid fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
}
return Integer.valueOf(extraInfoStrs[3]);
}
public static String getRealTopic(String[] extraInfoStrs, String topic, String cid) {
if (extraInfoStrs == null || extraInfoStrs.length < 5) {
throw new IllegalArgumentException("getRealTopic fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
}
if (RETRY_TOPIC.equals(extraInfoStrs[4])) {
return KeyBuilder.buildPopRetryTopic(topic, cid);
} else {
return topic;
}
}
public static String getBrokerName(String[] extraInfoStrs) {
if (extraInfoStrs == null || extraInfoStrs.length < 6) {
throw new IllegalArgumentException("getBrokerName fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
}
return extraInfoStrs[5];
}
public static int getQueueId(String[] extraInfoStrs) {
if (extraInfoStrs == null || extraInfoStrs.length < 7) {
throw new IllegalArgumentException("getQueueId fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
}
return Integer.valueOf(extraInfoStrs[6]);
}
public static long getQueueOffset(String[] extraInfoStrs) {
if (extraInfoStrs == null || extraInfoStrs.length < 8) {
throw new IllegalArgumentException("getQueueOffset fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
}
return Long.valueOf(extraInfoStrs[7]);
}
public static String buildExtraInfo(long ckQueueOffset, long popTime, long invisibleTime, int reviveQid, String topic, String brokerName, int queueId) {
String t = NORMAL_TOPIC;
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
t = RETRY_TOPIC;
}
return ckQueueOffset + MessageConst.KEY_SEPARATOR + popTime + MessageConst.KEY_SEPARATOR + invisibleTime + MessageConst.KEY_SEPARATOR + reviveQid + MessageConst.KEY_SEPARATOR + t
+ MessageConst.KEY_SEPARATOR + brokerName + MessageConst.KEY_SEPARATOR + queueId;
}
public static String buildExtraInfo(long ckQueueOffset, long popTime, long invisibleTime, int reviveQid, String topic, String brokerName, int queueId,
long msgQueueOffset) {
String t = NORMAL_TOPIC;
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
t = RETRY_TOPIC;
}
return ckQueueOffset
+ MessageConst.KEY_SEPARATOR + popTime + MessageConst.KEY_SEPARATOR + invisibleTime
+ MessageConst.KEY_SEPARATOR + reviveQid + MessageConst.KEY_SEPARATOR + t
+ MessageConst.KEY_SEPARATOR + brokerName + MessageConst.KEY_SEPARATOR + queueId
+ MessageConst.KEY_SEPARATOR + msgQueueOffset;
}
public static void buildStartOffsetInfo(StringBuilder stringBuilder, boolean retry, int queueId, long startOffset) {
if (stringBuilder == null) {
stringBuilder = new StringBuilder(64);
}
if (stringBuilder.length() > 0) {
stringBuilder.append(";");
}
stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
.append(MessageConst.KEY_SEPARATOR).append(queueId)
.append(MessageConst.KEY_SEPARATOR).append(startOffset);
}
public static void buildOrderCountInfo(StringBuilder stringBuilder, boolean retry, int queueId, int orderCount) {
if (stringBuilder == null) {
stringBuilder = new StringBuilder(64);
}
if (stringBuilder.length() > 0) {
stringBuilder.append(";");
}
stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
.append(MessageConst.KEY_SEPARATOR).append(queueId)
.append(MessageConst.KEY_SEPARATOR).append(orderCount);
}
public static void buildMsgOffsetInfo(StringBuilder stringBuilder, boolean retry, int queueId, List<Long> msgOffsets) {
if (stringBuilder == null) {
stringBuilder = new StringBuilder(64);
}
if (stringBuilder.length() > 0) {
stringBuilder.append(";");
}
stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
.append(MessageConst.KEY_SEPARATOR).append(queueId)
.append(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < msgOffsets.size(); i++) {
stringBuilder.append(msgOffsets.get(i));
if (i < msgOffsets.size() - 1) {
stringBuilder.append(",");
}
}
}
public static Map<String, List<Long>> parseMsgOffsetInfo(String msgOffsetInfo) {
if (msgOffsetInfo == null || msgOffsetInfo.length() == 0) {
return null;
}
Map<String, List<Long>> msgOffsetMap = new HashMap<String, List<Long>>(4);
String[] array;
if (msgOffsetInfo.indexOf(";") < 0) {
array = new String[]{msgOffsetInfo};
} else {
array = msgOffsetInfo.split(";");
}
for (String one : array) {
String[] split = one.split(MessageConst.KEY_SEPARATOR);
if (split.length != 3) {
throw new IllegalArgumentException("parse msgOffsetMap error, " + msgOffsetMap);
}
String key = split[0] + "@" + split[1];
if (msgOffsetMap.containsKey(key)) {
throw new IllegalArgumentException("parse msgOffsetMap error, duplicate, " + msgOffsetMap);
}
msgOffsetMap.put(key, new ArrayList<Long>(8));
String[] msgOffsets = split[2].split(",");
for (String msgOffset : msgOffsets) {
msgOffsetMap.get(key).add(Long.valueOf(msgOffset));
}
}
return msgOffsetMap;
}
public static Map<String, Long> parseStartOffsetInfo(String startOffsetInfo) {
if (startOffsetInfo == null || startOffsetInfo.length() == 0) {
return null;
}
Map<String, Long> startOffsetMap = new HashMap<String, Long>(4);
String[] array;
if (startOffsetInfo.indexOf(";") < 0) {
array = new String[]{startOffsetInfo};
} else {
array = startOffsetInfo.split(";");
}
for (String one : array) {
String[] split = one.split(MessageConst.KEY_SEPARATOR);
if (split.length != 3) {
throw new IllegalArgumentException("parse startOffsetInfo error, " + startOffsetInfo);
}
String key = split[0] + "@" + split[1];
if (startOffsetMap.containsKey(key)) {
throw new IllegalArgumentException("parse startOffsetInfo error, duplicate, " + startOffsetInfo);
}
startOffsetMap.put(key, Long.valueOf(split[2]));
}
return startOffsetMap;
}
public static Map<String, Integer> parseOrderCountInfo(String orderCountInfo) {
if (orderCountInfo == null || orderCountInfo.length() == 0) {
return null;
}
Map<String, Integer> startOffsetMap = new HashMap<String, Integer>(4);
String[] array;
if (orderCountInfo.indexOf(";") < 0) {
array = new String[]{orderCountInfo};
} else {
array = orderCountInfo.split(";");
}
for (String one : array) {
String[] split = one.split(MessageConst.KEY_SEPARATOR);
if (split.length != 3) {
throw new IllegalArgumentException("parse orderCountInfo error, " + orderCountInfo);
}
String key = split[0] + "@" + split[1];
if (startOffsetMap.containsKey(key)) {
throw new IllegalArgumentException("parse orderCountInfo error, duplicate, " + orderCountInfo);
}
startOffsetMap.put(key, Integer.valueOf(split[2]));
}
return startOffsetMap;
}
public static String getStartOffsetInfoMapKey(String topic, int queueId) {
return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + queueId;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class PopMessageRequestHeader implements CommandCustomHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
private String topic;
@CFNotNull
private int queueId;
@CFNotNull
private int maxMsgNums;
@CFNotNull
private long invisibleTime;
@CFNotNull
private long pollTime;
@CFNotNull
private long bornTime;
@CFNotNull
private int initMode;
private String expType;
private String exp;
/**
* marked as order consume, if true
* 1. not commit offset
* 2. not pop retry, because no retry
* 3. not append check point, because no retry
*/
private Boolean order = Boolean.FALSE;
@Override
public void checkFields() throws RemotingCommandException {
}
public void setInitMode(int initMode) {
this.initMode = initMode;
}
public int getInitMode() {
return initMode;
}
public long getInvisibleTime() {
return invisibleTime;
}
public void setInvisibleTime(long invisibleTime) {
this.invisibleTime = invisibleTime;
}
public long getPollTime() {
return pollTime;
}
public void setPollTime(long pollTime) {
this.pollTime = pollTime;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public long getBornTime() {
return bornTime;
}
public void setBornTime(long bornTime) {
this.bornTime = bornTime;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getQueueId() {
if (queueId < 0) {
return -1;
}
return queueId;
}
public void setQueueId(int queueId) {
this.queueId = queueId;
}
public int getMaxMsgNums() {
return maxMsgNums;
}
public void setMaxMsgNums(int maxMsgNums) {
this.maxMsgNums = maxMsgNums;
}
public boolean isTimeoutTooMuch() {
return System.currentTimeMillis() - bornTime - pollTime > 500;
}
public String getExpType() {
return expType;
}
public void setExpType(String expType) {
this.expType = expType;
}
public String getExp() {
return exp;
}
public void setExp(String exp) {
this.exp = exp;
}
public Boolean getOrder() {
return order;
}
public void setOrder(Boolean order) {
this.order = order;
}
public boolean isOrder() {
return this.order != null && this.order.booleanValue();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class PopMessageResponseHeader implements CommandCustomHeader {
@CFNotNull
private long popTime;
@CFNotNull
private long invisibleTime;
@CFNotNull
private int reviveQid;
/**
* the rest num in queue
*/
@CFNotNull
private long restNum;
private String startOffsetInfo;
private String msgOffsetInfo;
private String orderCountInfo;
@Override
public void checkFields() throws RemotingCommandException {
}
public long getPopTime() {
return popTime;
}
public void setPopTime(long popTime) {
this.popTime = popTime;
}
public long getInvisibleTime() {
return invisibleTime;
}
public long getRestNum() {
return restNum;
}
public void setRestNum(long restNum) {
this.restNum = restNum;
}
public void setInvisibleTime(long invisibleTime) {
this.invisibleTime = invisibleTime;
}
public int getReviveQid() {
return reviveQid;
}
public void setReviveQid(int reviveQid) {
this.reviveQid = reviveQid;
}
public String getStartOffsetInfo() {
return startOffsetInfo;
}
public void setStartOffsetInfo(String startOffsetInfo) {
this.startOffsetInfo = startOffsetInfo;
}
public String getMsgOffsetInfo() {
return msgOffsetInfo;
}
public void setMsgOffsetInfo(String msgOffsetInfo) {
this.msgOffsetInfo = msgOffsetInfo;
}
public String getOrderCountInfo() {
return orderCountInfo;
}
public void setOrderCountInfo(String orderCountInfo) {
this.orderCountInfo = orderCountInfo;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.utils;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
public class DataConverter {
public static Charset charset = Charset.forName("UTF-8");
public static byte[] Long2Byte(Long v) {
ByteBuffer tmp = ByteBuffer.allocate(8);
tmp.putLong(v);
return tmp.array();
}
public static int setBit(int value, int index, boolean flag) {
if (flag) {
return (int) (value | (1L << index));
} else {
return (int) (value & ~(1L << index));
}
}
public static boolean getBit(int value, int index) {
return (value & (1L << index)) != 0;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册