提交 0e4aa3ab 编写于 作者: D duhenglucky

Polish consumer offset management process, prevent queryed old offset in an random snode

上级 21d18b35
......@@ -82,13 +82,13 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.stats.MomentStatsItem;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RemotingServerFactory;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
......@@ -532,10 +532,12 @@ public class BrokerController {
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, clientProcessor, this.clientManageExecutor);
/**
* ConsumerManageProcessor
......
......@@ -23,10 +23,10 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.common.protocol.header.CreateRetryTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
......@@ -36,13 +36,14 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.filter.FilterFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ClientManageProcessor implements RequestProcessor {
......@@ -55,8 +56,8 @@ public class ClientManageProcessor implements RequestProcessor {
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel;
RemotingCommand request) throws RemotingCommandException {
NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
switch (request.getCode()) {
......@@ -66,6 +67,8 @@ public class ClientManageProcessor implements RequestProcessor {
return this.unregisterClient(ctx, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(ctx, request);
case RequestCode.CREATE_RETRY_TOPIC:
return createRetryTopic(ctx, request);
default:
break;
}
......@@ -80,7 +83,6 @@ public class ClientManageProcessor implements RequestProcessor {
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
log.info("heart beat request:{}", heartbeatData);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
new NettyChannelImpl(ctx.channel()),
heartbeatData.getClientID(),
......@@ -95,15 +97,7 @@ public class ClientManageProcessor implements RequestProcessor {
boolean isNotifyConsumerIdsChangedEnable = true;
if (null != subscriptionGroupConfig) {
isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
int topicSysFlag = 0;
if (data.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
String newTopic = MixAll.getRetryTopic(data.getGroupName());
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
createRetryTopic(data.isUnitMode(), data.getGroupName(), subscriptionGroupConfig.getRetryQueueNums());
}
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
......@@ -209,4 +203,33 @@ public class ClientManageProcessor implements RequestProcessor {
response.setRemark(null);
return response;
}
private RemotingCommand createRetryTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateRetryTopicRequestHeader requestHeader =
(CreateRetryTopicRequestHeader) request
.decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class);
if (requestHeader.getGroupName() != null) {
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroupName());
if (subscriptionGroupConfig != null) {
createRetryTopic(false, requestHeader.getGroupName(), subscriptionGroupConfig.getRetryQueueNums());
}
}
return response;
}
private void createRetryTopic(boolean unitMode, String groupName, int retryQueueNumbers) {
int topicSysFlag = 0;
if (unitMode) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
String newTopic = MixAll.getRetryTopic(groupName);
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
retryQueueNumbers,
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
}
}
......@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
......
......@@ -84,6 +84,7 @@ import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.CreateRetryTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
......@@ -1194,7 +1195,7 @@ public class MQClientAPIImpl {
public SnodeClusterInfo getSnodeClusterInfo(
//Todo Redifine snode exception
final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException , MQBrokerException {
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SNODE_CLUSTER_INFO, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
......@@ -1230,14 +1231,12 @@ public class MQClientAPIImpl {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
log.info("getTopicRouteInfoFromNameServer response: " + response);
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}
break;
}
case ResponseCode.SUCCESS: {
......@@ -2110,4 +2109,21 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
}
public void createRetryTopic(final String address, final String enodeName,
final String consumerGroup,
final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException {
CreateRetryTopicRequestHeader requestHeader = new CreateRetryTopicRequestHeader();
requestHeader.setEnodeName(enodeName);
requestHeader.setGroupName(consumerGroup);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CREATE_RETRY_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(address, request, timeoutMillis);
assert response != null;
System.out.println("create retry topic for consumerGrouop: " + consumerGroup);
if (ResponseCode.SUCCESS != response.getCode()) {
throw new MQClientException(response.getCode(), response.getRemark());
}
}
}
......@@ -847,6 +847,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
this.mQClientFactory.createRetryTopic(topic, this.defaultMQPushConsumer.getConsumerGroup());
}
}
}
......
......@@ -1282,6 +1282,24 @@ public class MQClientInstance {
}
}
public void createRetryTopic(String topic,
String consumerGroup) {
System.out.println("****create retry topic for consumerGrouop: " + consumerGroup);
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null) {
if (topicRouteData.getBrokerDatas() != null) {
for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
try {
String address = findSnodeAddressInPublish();
this.mQClientAPIImpl.createRetryTopic(address, brokerData.getBrokerName(), consumerGroup, 1000 * 3);
} catch (Exception ex) {
log.warn("CreateRetryTopic error", ex);
}
}
}
}
}
public TopicRouteData getAnExistTopicRouteData(final String topic) {
return this.topicRouteTable.get(topic);
}
......
......@@ -178,8 +178,7 @@ public class RequestCode {
public static final int GET_SNODE_INFO = 354;
public static final int CREATE_RETRY_TOPIC = 355;
public static final int MQTT_MESSAGE = 1000;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class CreateRetryTopicRequestHeader implements CommandCustomHeader {
@CFNotNull
private String groupName;
@CFNotNull
private String enodeName;
public String getEnodeName() {
return enodeName;
}
public void setEnodeName(String enodeName) {
this.enodeName = enodeName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
@Override
public void checkFields() throws RemotingCommandException {
}
}
......@@ -35,7 +35,7 @@ public class Consumer {
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RocketMQ5");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello");
/*
* Specify name server addresses.
......
......@@ -31,7 +31,7 @@ public class Producer {
/*
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
DefaultMQProducer producer = new DefaultMQProducer("nihao");
/*
* Specify name server addresses.
......@@ -75,7 +75,7 @@ public class Producer {
/*
* Shut down once the producer instance is not longer in use.
*/
Thread.sleep(3000L);
Thread.sleep(30000L);
producer.shutdown();
}
}
......@@ -102,7 +102,6 @@ public class SnodeController {
private ExecutorService consumerManageExecutor;
private EnodeService enodeService;
private NnodeService nnodeService;
private ExecutorService consumerManagerExecutor;
private ScheduledService scheduledService;
private ClientManager producerManager;
private ClientManager consumerManager;
......@@ -169,14 +168,14 @@ public class SnodeController {
"SnodeHeartbeatThread",
true);
this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
// this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor(
// snodeConfig.getSnodeSendMessageMinPoolSize(),
// snodeConfig.getSnodeSendMessageMaxPoolSize(),
// 3000,
// TimeUnit.MILLISECONDS,
// new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
// "SnodePullMessageThread",
// false);
this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
......@@ -350,10 +349,10 @@ public class SnodeController {
this.snodeServer
.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor);
this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE,
defaultMqttMessageProcessor, handleMqttMessageExecutor);
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT,
new MqttConnectMessageHandler(this));
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.DISCONNECT,
......@@ -375,8 +374,6 @@ public class SnodeController {
defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.UNSUBSCRIBE,
new MqttUnsubscribeMessagHandler(this));
}
public void start() {
......@@ -402,9 +399,9 @@ public class SnodeController {
if (this.heartbeatExecutor != null) {
this.heartbeatExecutor.shutdown();
}
if (this.consumerManagerExecutor != null) {
this.consumerManagerExecutor.shutdown();
}
// if (this.consumerManagerExecutor != null) {
// this.consumerManagerExecutor.shutdown();
// }
if (this.scheduledExecutorService != null) {
this.scheduledExecutorService.shutdown();
}
......
......@@ -113,7 +113,6 @@ public abstract class ClientManagerImpl implements ClientManager {
ConcurrentHashMap prev = groupClientTable.putIfAbsent(groupId, channelTable);
channelTable = prev != null ? prev : channelTable;
}
log.info("*********");
RemotingChannel remotingChannel = client.getRemotingChannel();
if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel());
......@@ -140,7 +139,7 @@ public abstract class ClientManagerImpl implements ClientManager {
oldClient.setLastUpdateTimestamp(System.currentTimeMillis());
onRegister(groupId, remotingChannel);
}
log.info("Register client role: {}, group: {}, last: {}", client.getClientRole(), groupId,
log.debug("Register client role: {}, group: {}, last: {}", client.getClientRole(), groupId,
client.getLastUpdateTimestamp());
return updated;
}
......
......@@ -36,7 +36,7 @@ public class SlowConsumerServiceImpl implements SlowConsumerService {
@Override
public boolean isSlowConsumer(long currentOffset, String topic, int queueId,
String consumerGroup, String enodeName) {
long ackedOffset = this.snodeController.getConsumerOffsetManager().queryOffset(enodeName, consumerGroup, topic, queueId);
long ackedOffset = this.snodeController.getConsumerOffsetManager().queryCacheOffset(enodeName, consumerGroup, topic, queueId);
if (currentOffset - ackedOffset > snodeController.getSnodeConfig().getSlowConsumerThreshold()) {
log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", consumerGroup, ackedOffset, currentOffset);
return true;
......
......@@ -17,7 +17,6 @@
package org.apache.rocketmq.snode.offset;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -31,7 +30,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.exception.SnodeException;
......@@ -88,7 +86,7 @@ public class ConsumerOffsetManager {
return result;
}
public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic,
public void cacheOffset(final String enodeName, final String clientHost, final String group, final String topic,
final int queueId,
final long offset) {
// Topic@group
......@@ -111,7 +109,7 @@ public class ConsumerOffsetManager {
}
}
public long queryOffset(final String enodeName, final String group, final String topic, final int queueId) {
public long queryCacheOffset(final String enodeName, final String group, final String topic, final int queueId) {
String key = buildKey(enodeName, topic, group);
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null != map) {
......@@ -123,49 +121,35 @@ public class ConsumerOffsetManager {
return -1;
}
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
}
// public String encode(final boolean prettyFormat) {
// return RemotingSerializable.toJson(this, prettyFormat);
// }
public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() {
return offsetTable;
}
public void setOffsetTable(ConcurrentHashMap<String, ConcurrentMap<Integer, Long>> offsetTable) {
this.offsetTable = offsetTable;
}
// public void setOffsetTable(ConcurrentHashMap<String, ConcurrentMap<Integer, Long>> offsetTable) {
// this.offsetTable = offsetTable;
// }
public Map<Integer, Long> queryOffset(final String enodeName, final String group, final String topic) {
// topic@group
String key = buildKey(enodeName, topic, group);
return this.offsetTable.get(key);
}
// public Map<Integer, Long> queryOffset(final String enodeName, final String group, final String topic) {
// // topic@group
// String key = buildKey(enodeName, topic, group);
// return this.offsetTable.get(key);
// }
public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
ConcurrentMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
if (offsets != null) {
this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets));
}
public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic,
final int queueId,
final long offset) {
cacheOffset(enodeName, clientHost, group, topic, queueId, offset);
this.snodeController.getEnodeService().persistOffset(enodeName, group, topic, queueId, offset);
}
public void persist() {
for (Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
ConcurrentHashMap<Integer, Long> map = (ConcurrentHashMap<Integer, Long>) offSetEntry.getValue();
String key = offSetEntry.getKey();
String[] keys = key.split(TOPIC_GROUP_SEPARATOR);
if (keys.length == 3) {
String enodeName = keys[0];
String topic = keys[1];
String consumerGroup = keys[2];
for (Entry<Integer, Long> queueEntry : map.entrySet()) {
Integer queueId = queueEntry.getKey();
Long offset = queueEntry.getValue();
this.snodeController.getEnodeService().persistOffset(enodeName, consumerGroup, topic, queueId, offset);
}
} else {
log.error("Persist offset split keys error:{}", key);
}
}
public RemotingCommand queryOffset(final String enodeName, final String group, final String topic,
final int queueId) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
return this.snodeController.getEnodeService().loadOffset(enodeName, group, topic, queueId);
}
}
......@@ -20,13 +20,13 @@ import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.CreateRetryTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
......@@ -68,6 +68,8 @@ public class ConsumerManageProcessor implements RequestProcessor {
return getMaxOffset(remotingChannel, request);
case RequestCode.GET_MIN_OFFSET:
return getMinOffset(remotingChannel, request);
case RequestCode.CREATE_RETRY_TOPIC:
return createRetryTopic(remotingChannel, request);
default:
break;
}
......@@ -161,32 +163,25 @@ public class ConsumerManageProcessor implements RequestProcessor {
private RemotingCommand queryConsumerOffset(RemotingChannel remotingChannel, RemotingCommand request)
throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
final QueryConsumerOffsetRequestHeader requestHeader =
(QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
final QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.readCustomHeader();
long offset =
this.snodeController.getConsumerOffsetManager().queryOffset(requestHeader.getEnodeName(),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
if (offset < 0) {
log.info("Load offset from enode server, enodeName: {}, consumer group: {}, topic: {}, queueId: {}",
requestHeader.getEnodeName(),
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId());
return this.snodeController.getEnodeService().loadOffset(requestHeader.getEnodeName(), requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId());
} else {
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
log.info("Load offset from enode server, enodeName: {}, consumer group: {}, topic: {}, queueId: {}",
requestHeader.getEnodeName(),
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId());
return this.snodeController.getConsumerOffsetManager().queryOffset(requestHeader.getEnodeName(), requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId());
}
public RemotingCommand createRetryTopic(RemotingChannel remotingChannel,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
final CreateRetryTopicRequestHeader requestHeader = (CreateRetryTopicRequestHeader) request.decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class);
requestHeader.getEnodeName();
return this.snodeController.getEnodeService().creatRetryTopic(requestHeader.getEnodeName(), request);
}
}
......@@ -18,7 +18,6 @@ package org.apache.rocketmq.snode.service;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
......@@ -55,18 +54,18 @@ public interface EnodeService {
CompletableFuture<RemotingCommand> pullMessage(final String enodeName, final RemotingCommand request);
/**
* Create topic to enode server.
* Create retry topic in enode server.
*
* @param enodeName Enode server name
* @param topicConfig {@link TopicConfig} Topic config information
* @param request {@link RemotingCommand } with @see Cra Create
* @return
* @throws InterruptedException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
* @throws RemotingConnectException
*/
RemotingCommand creatTopic(String enodeName,
TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
RemotingCommand creatRetryTopic(String enodeName,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
/**
* Update Enode address from name server
......
......@@ -24,12 +24,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
......@@ -226,31 +224,22 @@ public class EnodeServiceImpl implements EnodeService {
@Override
public RemotingCommand getMaxOffsetInQueue(String enodeName,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(address,
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
}
@Override
public RemotingCommand getOffsetByTimestamp(String enodeName,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(address,
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
}
@Override
public RemotingCommand creatTopic(String enodeName,
TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setPerm(topicConfig.getPerm());
requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
requestHeader.setOrder(topicConfig.isOrder());
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
public RemotingCommand creatRetryTopic(String enodeName,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(address,
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
......
......@@ -130,16 +130,6 @@ public class ScheduledServiceImpl implements ScheduledService {
}
}, 0, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
snodeController.getConsumerOffsetManager().persist();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
@Override
......
......@@ -19,7 +19,6 @@ package org.apache.rocketmq.snode.service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
......@@ -124,15 +123,6 @@ public class EnodeServiceImplTest extends SnodeTestBase {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@Test
public void creatTopicTest() throws Exception {
when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("127.0.0.1:10911");
when(snodeController.getRemotingClient().invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(createSuccessResponse());
TopicConfig topicConfig = new TopicConfig(topic, 1, 1, 2);
RemotingCommand response = enodeService.creatTopic(enodeName, topicConfig);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
private GetMessageResult createGetMessageResult() {
GetMessageResult getMessageResult = new GetMessageResult();
getMessageResult.setStatus(GetMessageStatus.FOUND);
......
......@@ -61,7 +61,7 @@ public class SlowConsumerServiceImplTest {
@Test
public void isSlowConsumerTest() {
snodeController.setConsumerOffsetManager(consumerOffsetManager);
when(snodeController.getConsumerOffsetManager().queryOffset(anyString(), anyString(), anyString(), anyInt())).thenReturn(1024L);
when(snodeController.getConsumerOffsetManager().queryCacheOffset(anyString(), anyString(), anyString(), anyInt())).thenReturn(1024L);
this.snodeController.getSnodeConfig().setSlowConsumerThreshold(100);
boolean slowConsumer = slowConsumerService.isSlowConsumer(2000, topic, queue, group, enodeName);
assertThat(slowConsumer).isTrue();
......@@ -70,7 +70,7 @@ public class SlowConsumerServiceImplTest {
@Test
public void isSlowConsumerTestFalse() {
snodeController.setConsumerOffsetManager(consumerOffsetManager);
when(snodeController.getConsumerOffsetManager().queryOffset(anyString(), anyString(), anyString(), anyInt())).thenReturn(1024L);
when(snodeController.getConsumerOffsetManager().queryCacheOffset(anyString(), anyString(), anyString(), anyInt())).thenReturn(1024L);
this.snodeController.getSnodeConfig().setSlowConsumerThreshold(100);
boolean slowConsumer = slowConsumerService.isSlowConsumer(1025, topic, queue, group, enodeName);
assertThat(slowConsumer).isFalse();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册