提交 f7e2b1f8 编写于 作者: S ShannonDing

Merge branch 'snode' of github.com:apache/rocketmq into snode

......@@ -121,6 +121,7 @@ public class BrokerController {
private final ClientHousekeepingService clientHousekeepingService;
private final PullMessageProcessor pullMessageProcessor;
private final SnodePullMessageProcessor snodePullMessageProcessor;
private final SendMessageProcessor sendProcessor;
private final PullRequestHoldService pullRequestHoldService;
private final MessageArrivingListener messageArrivingListener;
private final Broker2Client broker2Client;
......@@ -164,6 +165,9 @@ public class BrokerController {
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private Future<?> slaveSyncFuture;
private ClientManageProcessor clientManageProcessor;
private AdminBrokerProcessor adminProcessor;
private ConsumerManageProcessor consumerManageProcessor;
public BrokerController(
final BrokerConfig brokerConfig,
......@@ -178,6 +182,7 @@ public class BrokerController {
this.consumerOffsetManager = new ConsumerOffsetManager(this);
this.topicConfigManager = new TopicConfigManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.sendProcessor = new SendMessageProcessor(this);
this.pullRequestHoldService = new PullRequestHoldService(this);
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
......@@ -543,9 +548,8 @@ public class BrokerController {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.sendProcessor.registerSendMessageHook(sendMessageHookList);
this.sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
......@@ -575,28 +579,28 @@ public class BrokerController {
/**
* ClientManageProcessor
*/
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, clientProcessor, this.clientManageExecutor);
this.clientManageProcessor = new ClientManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, clientManageProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, clientManageProcessor, this.clientManageExecutor);
/**
* ConsumerManageProcessor
*/
ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.consumerManageProcessor = new ConsumerManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, this.consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, this.consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, this.consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, this.consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, this.consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, this.consumerManageProcessor, this.consumerManageExecutor);
/**
* EndTransactionProcessor
......@@ -607,7 +611,7 @@ public class BrokerController {
/**
* Default
*/
AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
this.adminProcessor = new AdminBrokerProcessor(this);
this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}
......@@ -1220,4 +1224,23 @@ public class BrokerController {
}
}
public SendMessageProcessor getSendProcessor() {
return sendProcessor;
}
public ClientManageProcessor getClientManageProcessor() {
return clientManageProcessor;
}
public AdminBrokerProcessor getAdminProcessor() {
return adminProcessor;
}
public void setAdminProcessor(AdminBrokerProcessor adminProcessor) {
this.adminProcessor = adminProcessor;
}
public ConsumerManageProcessor getConsumerManageProcessor() {
return consumerManageProcessor;
}
}
......@@ -18,6 +18,11 @@ package org.apache.rocketmq.broker;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
......@@ -28,10 +33,10 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -40,12 +45,6 @@ import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
public class BrokerStartup {
......@@ -53,6 +52,7 @@ public class BrokerStartup {
public static CommandLine commandLine = null;
public static String configFile = null;
public static InternalLogger log;
private static BrokerController brokerController = null;
public static void main(String[] args) {
start(createBrokerController(args));
......@@ -238,7 +238,7 @@ public class BrokerStartup {
}
}
}, "ShutdownHook"));
brokerController = controller;
return controller;
} catch (Throwable e) {
e.printStackTrace();
......@@ -260,7 +260,6 @@ public class BrokerStartup {
private static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("c", "configFile", true, "Broker config properties file");
opt.setRequired(false);
options.addOption(opt);
......@@ -275,4 +274,8 @@ public class BrokerStartup {
return options;
}
public static BrokerController getBrokerController() {
return brokerController;
}
}
......@@ -17,6 +17,11 @@
package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
......@@ -27,8 +32,6 @@ import org.apache.rocketmq.common.constant.DBMsgConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
......@@ -40,18 +43,14 @@ import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.utils.ChannelUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Random;
public abstract class AbstractSendMessageProcessor implements RequestProcessor {
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
......@@ -159,7 +158,7 @@ public abstract class AbstractSendMessageProcessor implements RequestProcessor {
return response;
}
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
protected RemotingCommand msgCheck(final String remoteAddress,
final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
......@@ -188,11 +187,11 @@ public abstract class AbstractSendMessageProcessor implements RequestProcessor {
}
}
log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), remoteAddress);
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
remoteAddress,
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
if (null == topicConfig) {
......@@ -218,7 +217,7 @@ public abstract class AbstractSendMessageProcessor implements RequestProcessor {
String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
queueIdInt,
topicConfig.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
remoteAddress);
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
......
......@@ -207,9 +207,8 @@ public class ClientManageProcessor implements RequestProcessor {
private RemotingCommand createRetryTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateRetryTopicRequestHeader requestHeader =
(CreateRetryTopicRequestHeader) request
.decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class);
final CreateRetryTopicRequestHeader requestHeader = (CreateRetryTopicRequestHeader) request
.decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class);
if (requestHeader.getGroupName() != null) {
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroupName());
......@@ -217,7 +216,8 @@ public class ClientManageProcessor implements RequestProcessor {
createRetryTopic(false, requestHeader.getGroupName(), subscriptionGroupConfig.getRetryQueueNums());
}
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
......
......@@ -46,6 +46,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -71,7 +72,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
return this.consumerSendMsgBack(request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
......@@ -82,10 +83,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
// String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
SocketAddress bornHost = ctx.channel().remoteAddress();
if (requestHeader.isBatch()) {
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
response = this.sendBatchMessage(bornHost, request, mqtraceContext, requestHeader);
} else {
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
response = this.sendMessage(bornHost, request, mqtraceContext, requestHeader);
}
this.executeSendMessageHookAfter(response, mqtraceContext);
......@@ -99,7 +103,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
private RemotingCommand consumerSendMsgBack(final RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
......@@ -296,7 +300,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return true;
}
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
private RemotingCommand sendMessage(final SocketAddress remoteAddress,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
......@@ -319,7 +323,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
response.setCode(-1);
super.msgCheck(ctx, requestHeader, response);
super.msgCheck(RemotingHelper.parseChannelRemoteAddr(remoteAddress), requestHeader, response);
if (response.getCode() != -1) {
return response;
}
......@@ -346,9 +350,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setBornHost(requestHeader.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
msgInner.setBornHost(remoteAddress);
// ByteBuffer hostHolder = ByteBuffer.allocate(8);
// String bornHost = msgInner.getStoreHostBytes(hostHolder).toString();
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
......@@ -365,14 +372,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, queueIdInt, remoteAddress);
}
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
RemotingCommand request, MessageExt msg,
SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
int queueIdInt) {
SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext,
int queueIdInt, SocketAddress bornHost) {
if (putMessageResult == null) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
......@@ -445,8 +452,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
responseHeader.setCommitLogOffset(putMessageResult.getAppendMessageResult().getWroteOffset());
responseHeader.setStoreTimestamp(putMessageResult.getAppendMessageResult().getStoreTimestamp());
responseHeader.setStoreSize(putMessageResult.getAppendMessageResult().getWroteBytes());
responseHeader.setStoreHost(ctx.channel().localAddress().toString());
doResponse(ctx, request, response);
responseHeader.setStoreHost(RemotingHelper.parseChannelRemoteAddr(bornHost));
// doResponse(ctx, request, response);
if (hasSendMessageHook()) {
sendMessageContext.setMsgId(responseHeader.getMsgId());
......@@ -462,6 +469,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
if (!request.isOnewayRPC()) {
response.setCustomHeader(responseHeader);
return response;
}
return null;
} else {
if (hasSendMessageHook()) {
......@@ -477,7 +488,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return response;
}
private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
private RemotingCommand sendBatchMessage(final SocketAddress remoteAddress,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
......@@ -500,7 +511,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
response.setCode(-1);
super.msgCheck(ctx, requestHeader, response);
super.msgCheck(RemotingHelper.parseChannelRemoteAddr(remoteAddress), requestHeader, response);
if (response.getCode() != -1) {
return response;
}
......@@ -537,13 +548,17 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
messageExtBatch.setBody(request.getBody());
messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
messageExtBatch.setBornHost(ctx.channel().remoteAddress());
messageExtBatch.setBornHost(requestHeader.getBornHost());
messageExtBatch.setStoreHost(this.getStoreHost());
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
// ByteBuffer hostHolder = ByteBuffer.allocate(8);
// String storeHost = messageExtBatch.getStoreHostBytes(hostHolder).toString();
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt);
return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, queueIdInt, storeHost);
}
public boolean hasConsumeMessageHook() {
......
......@@ -22,6 +22,8 @@ import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.RemotingUtil;
public class SnodeConfig {
......@@ -32,6 +34,10 @@ public class SnodeConfig {
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private ServerConfig nettyServerConfig;
private ClientConfig nettyClientConfig;
@ImportantField
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
......@@ -106,6 +112,9 @@ public class SnodeConfig {
@ImportantField
private boolean aclEnable = false;
@ImportantField
private boolean embeddedModeEnable = true;
public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) {
this.snodeHeartBeatInterval = snodeHeartBeatInterval;
}
......@@ -410,4 +419,27 @@ public class SnodeConfig {
this.loadOffsetInterval = loadOffsetInterval;
}
public boolean isEmbeddedModeEnable() {
return embeddedModeEnable;
}
public void setEmbeddedModeEnable(boolean embeddedModeEnable) {
this.embeddedModeEnable = embeddedModeEnable;
}
public ServerConfig getNettyServerConfig() {
return nettyServerConfig;
}
public void setNettyServerConfig(ServerConfig nettyServerConfig) {
this.nettyServerConfig = nettyServerConfig;
}
public ClientConfig getNettyClientConfig() {
return nettyClientConfig;
}
public void setNettyClientConfig(ClientConfig nettyClientConfig) {
this.nettyClientConfig = nettyClientConfig;
}
}
......@@ -15,7 +15,8 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
......@@ -92,6 +93,10 @@
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-broker</artifactId>
</dependency>
</dependencies>
<build>
......
......@@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -69,10 +70,11 @@ import org.apache.rocketmq.snode.service.PushService;
import org.apache.rocketmq.snode.service.ScheduledService;
import org.apache.rocketmq.snode.service.WillMessageService;
import org.apache.rocketmq.snode.service.impl.ClientServiceImpl;
import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl;
import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.PushServiceImpl;
import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl;
......@@ -127,7 +129,11 @@ public class SnodeController {
this.nettyClientConfig = nettyClientConfig;
this.nettyServerConfig = nettyServerConfig;
this.snodeConfig = snodeConfig;
this.enodeService = new EnodeServiceImpl(this);
if (!this.snodeConfig.isEmbeddedModeEnable()) {
this.enodeService = new RemoteEnodeServiceImpl(this);
} else {
this.enodeService = new LocalEnodeServiceImpl(BrokerStartup.getBrokerController());
}
this.nnodeService = new NnodeServiceImpl(this);
this.scheduledService = new ScheduledServiceImpl(this);
this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient()
......@@ -163,7 +169,6 @@ public class SnodeController {
"SnodeHeartbeatThread",
true);
this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
......@@ -228,10 +233,8 @@ public class SnodeController {
}
public boolean initialize() {
this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer()
.init(this.nettyServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
RemotingUtil.MQTT_PROTOCOL)
this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(RemotingUtil.MQTT_PROTOCOL)
.init(this.nettyServerConfig, this.clientHousekeepingService);
this.registerProcessor();
initSnodeInterceptorGroup();
......
......@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.snode;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
......@@ -31,6 +29,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -44,6 +43,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.slf4j.LoggerFactory;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
public class SnodeStartup {
private static InternalLogger log;
public static Properties properties = null;
......@@ -51,13 +52,17 @@ public class SnodeStartup {
public static String configFile = null;
public static void main(String[] args) throws IOException, JoranException {
startup(createSnodeController(args));
SnodeConfig snodeConfig = loadConfig(args);
if (snodeConfig.isEmbeddedModeEnable()) {
BrokerStartup.start(BrokerStartup.createBrokerController(args));
}
SnodeController snodeController = createSnodeController(snodeConfig);
startup(snodeController);
}
public static SnodeController startup(SnodeController controller) {
try {
controller.start();
String tip = "The snode[" + controller.getSnodeConfig().getSnodeName() + ", "
+ controller.getSnodeConfig().getSnodeIP1() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
......@@ -74,7 +79,7 @@ public class SnodeStartup {
return null;
}
public static SnodeController createSnodeController(String[] args) throws IOException, JoranException {
public static SnodeConfig loadConfig(String[] args) throws IOException {
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("snode", args, buildCommandlineOptions(options),
new PosixParser());
......@@ -82,12 +87,11 @@ public class SnodeStartup {
System.exit(-1);
}
final SnodeConfig snodeConfig = new SnodeConfig();
SnodeConfig snodeConfig = new SnodeConfig();
final ServerConfig nettyServerConfig = new ServerConfig();
final ClientConfig nettyClientConfig = new ClientConfig();
nettyServerConfig.setListenPort(snodeConfig.getListenPort());
nettyServerConfig.setListenPort(11911);
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
......@@ -101,27 +105,28 @@ public class SnodeStartup {
MixAll.properties2Object(properties, snodeConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
in.close();
}
}
snodeConfig.setNettyServerConfig(nettyServerConfig);
snodeConfig.setNettyClientConfig(nettyClientConfig);
if (null == snodeConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(snodeConfig.getRocketmqHome() + "/conf/logback_snode.xml");
log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
MixAll.printObjectProperties(log, snodeConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, snodeConfig.getNettyServerConfig());
MixAll.printObjectProperties(log, snodeConfig.getNettyClientConfig());
return snodeConfig;
}
public static SnodeController createSnodeController(SnodeConfig snodeConfig) throws JoranException {
final SnodeController snodeController = new SnodeController(
nettyServerConfig,
nettyClientConfig,
snodeConfig.getNettyServerConfig(),
snodeConfig.getNettyClientConfig(),
snodeConfig);
boolean initResult = snodeController.initialize();
......@@ -148,7 +153,14 @@ public class SnodeStartup {
}
}
}
},"ShutdownHook"));
}, "ShutdownHook"));
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(snodeConfig.getRocketmqHome() + "/conf/logback_snode.xml");
log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
return snodeController;
}
......@@ -164,7 +176,7 @@ public class SnodeStartup {
opt = new Option("m", "printImportantConfig", false, "Print important config item");
opt.setRequired(false);
options.addOption(opt);
return options;
}
}
......
......@@ -42,19 +42,6 @@ public class SubscriptionGroupManager {
private void init() {
}
public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
if (old != null) {
log.info("Update subscription group config, old: {} new: {}", old, config);
} else {
log.info("Create new subscription group, {}", config);
}
this.dataVersion.nextVersion();
this.persistSubscription(config);
}
public void disableConsume(final String groupName) {
SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
if (old != null) {
......@@ -63,7 +50,8 @@ public class SubscriptionGroupManager {
}
}
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
public SubscriptionGroupConfig findSubscriptionGroupConfig(
final String group) {
SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
if (null == subscriptionGroupConfig) {
if (snodeController.getSnodeConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
......@@ -74,15 +62,13 @@ public class SubscriptionGroupManager {
log.info("Auto create a subscription group, {}", subscriptionGroupConfig.toString());
}
this.dataVersion.nextVersion();
this.persistSubscription(subscriptionGroupConfig);
this.snodeController.getEnodeService().persistSubscriptionGroupConfig(subscriptionGroupConfig);
}
}
return subscriptionGroupConfig;
}
public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
return subscriptionGroupTable;
}
......@@ -91,18 +77,4 @@ public class SubscriptionGroupManager {
return dataVersion;
}
public void deleteSubscriptionGroupConfig(final String groupName) {
SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
if (old != null) {
log.info("delete subscription group OK, subscription group:{}", old);
this.dataVersion.nextVersion();
this.persistSubscription(old);
} else {
log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);
}
}
void persistSubscription(SubscriptionGroupConfig config) {
this.snodeController.getEnodeService().persistSubscriptionGroupConfig(config);
}
}
......@@ -29,11 +29,6 @@ public interface SubscriptionManager {
boolean subscribe(String groupId, Set<SubscriptionData> subscriptionDataSet, ConsumeType consumeType,
MessageModel messageModel, ConsumeFromWhere consumeFromWhere);
void unSubscribe(String groupId, RemotingChannel remotingChannel,
Set<SubscriptionData> subscriptionDataSet);
void cleanSubscription(String groupId, String topic);
Subscription getSubscription(String groupId);
void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
......
......@@ -44,7 +44,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
@Override
public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
String groupId) {
log.debug("Before ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), subscriptionDataSet);
log.info("Before ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), subscriptionDataSet);
Set<MessageQueue> prevSubSet = this.clientSubscriptionTable.get(remotingChannel);
Set<MessageQueue> keySet = new HashSet<>();
for (SubscriptionData subscriptionData : subscriptionDataSet) {
......@@ -77,7 +77,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
}
}
}
log.debug("After ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), this.clientSubscriptionTable.get(remotingChannel));
log.info("After ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), this.clientSubscriptionTable.get(remotingChannel));
}
@Override
......@@ -191,17 +191,6 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
return updated;
}
@Override
public void unSubscribe(String groupId, RemotingChannel remotingChannel,
Set<SubscriptionData> subscriptionDataSet) {
}
@Override
public void cleanSubscription(String groupId, String topic) {
}
@Override
public Subscription getSubscription(String groupId) {
return groupSubscriptionTable.get(groupId);
......
......@@ -20,13 +20,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.exception.SnodeException;
......@@ -79,17 +75,18 @@ public class ConsumerOffsetManager {
}
}
private long parserOffset(final String enodeName, final String group, final String topic, final int queueId) {
try {
RemotingCommand remotingCommand = queryOffset(enodeName, group, topic, queueId);
QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
return responseHeader.getOffset();
} catch (Exception ex) {
log.error("Load offset from broker error", ex);
}
return -1;
}
// private long parserOffset(final RemotingChannel remotingChannel, final String enodeName, final String group,
// final String topic, final int queueId) {
// try {
// RemotingCommand remotingCommand = queryOffset(remotingChannel, enodeName, group, topic, queueId);
// QueryConsumerOffsetResponseHeader responseHeader =
// (QueryConsumerOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
// return responseHeader.getOffset();
// } catch (Exception ex) {
// log.error("Load offset from broker error", ex);
// }
// return -1;
// }
public long queryCacheOffset(final String enodeName, final String group, final String topic, final int queueId) {
String key = buildKey(enodeName, topic, group);
......@@ -99,30 +96,33 @@ public class ConsumerOffsetManager {
map = this.offsetTable.putIfAbsent(key, map);
}
CacheOffset cacheOffset = map.get(queueId);
if (cacheOffset != null) {
if (System.currentTimeMillis() - cacheOffset.getUpdateTimestamp() > snodeController.getSnodeConfig().getLoadOffsetInterval()) {
cacheOffset.setOffset(parserOffset(enodeName, group, topic, queueId));
cacheOffset.setUpdateTimestamp(System.currentTimeMillis());
try {
if (cacheOffset != null) {
if (!this.snodeController.getSnodeConfig().isEmbeddedModeEnable() && System.currentTimeMillis() - cacheOffset.getUpdateTimestamp() > snodeController.getSnodeConfig().getLoadOffsetInterval()) {
long offset = this.snodeController.getEnodeService().queryOffset(enodeName, group, topic, queueId);
cacheOffset.setOffset(offset);
cacheOffset.setUpdateTimestamp(System.currentTimeMillis());
} else {
long offset = this.snodeController.getEnodeService().queryOffset(enodeName, group, topic, queueId);
cacheOffset.setOffset(offset);
}
} else {
long offset = this.snodeController.getEnodeService().queryOffset(enodeName, group, topic, queueId);
cacheOffset = new CacheOffset(key, offset, System.currentTimeMillis());
map.put(queueId, cacheOffset);
}
} else {
cacheOffset = new CacheOffset(key, parserOffset(enodeName, group, topic, queueId), System.currentTimeMillis());
map.put(queueId, cacheOffset);
} catch (Exception ex) {
log.warn("Load offset error, enodeName: {}, group:{},topic:{} queueId:{}", enodeName, group, topic, queueId);
}
return cacheOffset.getOffset();
}
public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic,
public void commitOffset(final RemotingChannel remotingChannel, final String enodeName, final String clientHost,
final String group, final String topic,
final int queueId,
final long offset) {
cacheOffset(enodeName, clientHost, group, topic, queueId, offset);
this.snodeController.getEnodeService().persistOffset(enodeName, group, topic, queueId, offset);
}
public RemotingCommand queryOffset(final String enodeName, final String group, final String topic,
final int queueId) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
return this.snodeController.getEnodeService().loadOffset(enodeName, group, topic, queueId);
this.snodeController.getEnodeService().persistOffset(remotingChannel, enodeName, group, topic, queueId, offset);
}
public class CacheOffset {
......
......@@ -25,9 +25,13 @@ import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestH
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
import org.apache.rocketmq.logging.InternalLogger;
......@@ -87,7 +91,14 @@ public class ConsumerManageProcessor implements RequestProcessor {
(SearchOffsetRequestHeader) request
.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
try {
return this.snodeController.getEnodeService().getOffsetByTimestamp(requestHeader.getEnodeName(), request);
final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader();
long offset = this.snodeController.getEnodeService().getOffsetByTimestamp(requestHeader.getEnodeName(),
requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getTimestamp(), request);
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} catch (Exception ex) {
log.error("Search offset by timestamp error:{}", ex);
}
......@@ -100,7 +111,14 @@ public class ConsumerManageProcessor implements RequestProcessor {
(GetMinOffsetRequestHeader) request
.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
try {
return this.snodeController.getEnodeService().getMinOffsetInQueue(requestHeader.getEnodeName(), requestHeader.getTopic(), requestHeader.getQueueId());
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
long offset = this.snodeController.getEnodeService().getMinOffsetInQueue(requestHeader.getEnodeName(),
requestHeader.getTopic(), requestHeader.getQueueId(), request);
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} catch (Exception ex) {
log.error("Get min offset error:{}", ex);
}
......@@ -113,9 +131,16 @@ public class ConsumerManageProcessor implements RequestProcessor {
(GetMaxOffsetRequestHeader) request
.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
try {
return this.snodeController.getEnodeService().getMaxOffsetInQueue(requestHeader.getEnodeName(), request);
long offset = this.snodeController.getEnodeService().getMaxOffsetInQueue(requestHeader.getEnodeName(),
requestHeader.getTopic(), requestHeader.getQueueId(), request);
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
} catch (Exception ex) {
log.error("Get min offset error:{}", ex);
log.error("Get min offset error, remoting: {} error: {} ", remotingChannel.remoteAddress(), ex);
}
return null;
}
......@@ -153,7 +178,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
final UpdateConsumerOffsetRequestHeader requestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
this.snodeController.getConsumerOffsetManager().commitOffset(requestHeader.getEnodeName(), RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()), requestHeader.getConsumerGroup(),
this.snodeController.getConsumerOffsetManager().commitOffset(remotingChannel, requestHeader.getEnodeName(), RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()), requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
......@@ -171,9 +196,15 @@ public class ConsumerManageProcessor implements RequestProcessor {
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId());
return this.snodeController.getConsumerOffsetManager().queryOffset(requestHeader.getEnodeName(), requestHeader.getConsumerGroup(), requestHeader.getTopic(),
long offset = this.snodeController.getEnodeService().queryOffset(requestHeader.getEnodeName(), requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId());
final RemotingCommand response = RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
final QueryConsumerOffsetResponseHeader responseHeader = (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
public RemotingCommand createRetryTopic(RemotingChannel remotingChannel,
......@@ -181,7 +212,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
final CreateRetryTopicRequestHeader requestHeader = (CreateRetryTopicRequestHeader) request.decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class);
requestHeader.getEnodeName();
return this.snodeController.getEnodeService().creatRetryTopic(requestHeader.getEnodeName(), request);
return this.snodeController.getEnodeService().creatRetryTopic(remotingChannel, requestHeader.getEnodeName(), request);
}
}
......@@ -127,14 +127,16 @@ public class PullMessageProcessor implements RequestProcessor {
}
}
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(requestHeader.getEnodeName(), request);
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(remotingChannel, requestHeader.getEnodeName(), request);
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
if (this.snodeController.getConsumeMessageInterceptorGroup() != null) {
ResponseContext responseContext = new ResponseContext(request, remotingChannel, data);
this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext);
}
remotingChannel.reply(data);
if (data != null) {
remotingChannel.reply(data);
}
} else {
if (this.snodeController.getConsumeMessageInterceptorGroup() != null) {
ExceptionContext exceptionContext = new ExceptionContext(request, remotingChannel, ex, null);
......
......@@ -83,11 +83,11 @@ public class SendMessageProcessor implements RequestProcessor {
stringBuffer.append(MixAll.getRetryTopic(consumerSendMsgBackRequestHeader.getGroup()));
}
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(enodeName, request);
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(remotingChannel, enodeName, request);
sendMessageRequestHeaderV2.setO(remotingChannel.remoteAddress());
final byte[] message = request.getBody();
final boolean isNeedPush = !isSendBack;
final boolean needPush = !isSendBack;
final SendMessageRequestHeader sendMessageRequestHeader =
SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(sendMessageRequestHeaderV2);
responseFuture.whenComplete((data, ex) -> {
......@@ -98,7 +98,7 @@ public class SendMessageProcessor implements RequestProcessor {
}
remotingChannel.reply(data);
this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length);
if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
if (data.getCode() == ResponseCode.SUCCESS && needPush) {
this.snodeController.getPushService().pushMessage(sendMessageRequestHeader, message, data);
}
} else {
......
......@@ -29,6 +29,7 @@ public class MqttPingreqMessageHandler implements MessageHandler {
public MqttPingreqMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the PINGREQ message from client
* <ol>
......@@ -41,7 +42,8 @@ public class MqttPingreqMessageHandler implements MessageHandler {
* @param message
* @return
*/
@Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
return null;
}
}
package org.apache.rocketmq.snode.service;/*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......@@ -14,7 +14,7 @@ package org.apache.rocketmq.snode.service;/*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service;
public interface AdminService {
}
......@@ -21,6 +21,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
......@@ -42,7 +43,8 @@ public interface EnodeService {
* @param request {@link SendMessageRequestHeaderV2} Send message request header
* @return Send message response future
*/
CompletableFuture<RemotingCommand> sendMessage(final String enodeName, final RemotingCommand request);
CompletableFuture<RemotingCommand> sendMessage(final RemotingChannel remotingChannel, final String enodeName,
final RemotingCommand request);
/**
* Pull message from enode server.
......@@ -51,7 +53,8 @@ public interface EnodeService {
* @param request {@link PullMessageRequestHeader} Pull message request header
* @return Pull message Response future
*/
CompletableFuture<RemotingCommand> pullMessage(final String enodeName, final RemotingCommand request);
CompletableFuture<RemotingCommand> pullMessage(final RemotingChannel remotingChannel, final String enodeName,
final RemotingCommand request);
/**
* Create retry topic in enode server.
......@@ -64,7 +67,7 @@ public interface EnodeService {
* @throws RemotingSendRequestException
* @throws RemotingConnectException
*/
RemotingCommand creatRetryTopic(String enodeName,
RemotingCommand creatRetryTopic(final RemotingChannel remotingChannel, String enodeName,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
/**
......@@ -98,20 +101,24 @@ public interface EnodeService {
* @param queueId QueueId of related topic.
* @param offset Current offset of target queue of subscribed topic.
*/
void persistOffset(String enodeName, String groupName, String topic, int queueId, long offset);
void persistOffset(final RemotingChannel remotingChannel, String enodeName, String groupName, String topic,
int queueId, long offset);
RemotingCommand loadOffset(String enodeName, String consumerGroup, String topic,
int queueId) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException;
long queryOffset(String enodeName, String consumerGroup,
String topic, int queueId) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, RemotingCommandException;
RemotingCommand getMaxOffsetInQueue(String enodeName,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException,
long getMaxOffsetInQueue(String enodeName, String topic,
int queueId, RemotingCommand request) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, RemotingCommandException;
RemotingCommand getMinOffsetInQueue(String enodeName, String topic,
int queueId) throws InterruptedException, RemotingTimeoutException,
long getMinOffsetInQueue(String enodeName, String topic,
int queueId, RemotingCommand request) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, RemotingCommandException;
RemotingCommand getOffsetByTimestamp(String enodeName,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
long getOffsetByTimestamp(String enodeName,
String topic, int queueId,
long timestamp,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, RemotingCommandException;
}
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.snode.service.impl;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service.impl;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.service.EnodeService;
public class LocalEnodeServiceImpl implements EnodeService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private BrokerController brokerController;
public LocalEnodeServiceImpl(BrokerController brokerController) {
this.brokerController = brokerController;
}
@Override public void sendHeartbeat(RemotingCommand remotingCommand) {
return;
}
@Override
public CompletableFuture<RemotingCommand> sendMessage(final RemotingChannel remotingChannel, String enodeName,
RemotingCommand request) {
CompletableFuture<RemotingCommand> completableFuture = new CompletableFuture<>();
try {
log.debug("Send message request:{}", request);
RemotingCommand remotingCommand = this.brokerController.getSendProcessor().processRequest(remotingChannel, request);
CodecHelper.encodeHeader(remotingCommand);
completableFuture.complete(remotingCommand);
} catch (Exception ex) {
log.error("[Local]Request local enode send message error", ex);
completableFuture.completeExceptionally(ex);
}
return completableFuture;
}
@Override
public CompletableFuture<RemotingCommand> pullMessage(RemotingChannel remotingChannel, String enodeName,
RemotingCommand request) {
CompletableFuture<RemotingCommand> completableFuture = new CompletableFuture<>();
try {
RemotingCommand response = this.brokerController.getSnodePullMessageProcessor().processRequest(remotingChannel, request);
completableFuture.complete(response);
} catch (Exception ex) {
log.error("[Local]Request local enode pull message error", ex);
completableFuture.completeExceptionally(ex);
}
return completableFuture;
}
@Override public RemotingCommand creatRetryTopic(RemotingChannel remotingChannel, String enodeName,
RemotingCommand request) {
try {
return this.brokerController.getClientManageProcessor().processRequest(remotingChannel, request);
} catch (Exception ex) {
log.error("[Local]Request create retry topic error", ex);
}
return null;
}
@Override public void updateEnodeAddress(
String clusterName) {
}
@Override public boolean persistSubscriptionGroupConfig(
final SubscriptionGroupConfig subscriptionGroupConfig) {
boolean persist = false;
if (subscriptionGroupConfig != null) {
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(subscriptionGroupConfig);
persist = true;
}
return persist;
}
@Override
public void persistOffset(RemotingChannel remotingChannel, String enodeName, String groupName, String topic,
int queueId, long offset) {
try {
// UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
// requestHeader.setConsumerGroup(groupName);
// requestHeader.setTopic(topic);
// requestHeader.setQueueId(queueId);
// requestHeader.setCommitOffset(offset);
// RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
// this.brokerController.getConsumerManageProcessor().processRequest(remotingChannel, request);
this.brokerController.getConsumerOffsetManager().commitOffset(remotingChannel.remoteAddress().toString(), groupName,
topic, queueId, offset);
} catch (Exception ex) {
log.error("[Local]Persist offset to Enode error group: [{}], topic: [{}] queue: [{}]!", ex, groupName, topic, queueId);
}
}
@Override
public long queryOffset(String enodeName, String consumerGroup, String topic, int queueId) {
return this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup, topic, queueId);
}
@Override
public long getMaxOffsetInQueue(String enodeName, String topic, int queueId, RemotingCommand request) {
return this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
@Override
public long getMinOffsetInQueue(String enodeName, String topic, int queueId, RemotingCommand request) {
return this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
}
@Override
public long getOffsetByTimestamp(String enodeName,
String topic, int queueId, long timestamp, RemotingCommand request) {
return this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, queueId, timestamp);
}
}
......@@ -95,7 +95,7 @@ public class PushServiceImpl implements PushService {
messageExt.setFlag(sendMessageRequestHeader.getFlag());
messageExt.setBody(message);
messageExt.setBodyCRC(UtilAll.crc32(message));
log.debug("MessageExt:{}", messageExt);
log.info("MessageExt:{}", messageExt);
return messageExt;
}
......@@ -103,7 +103,6 @@ public class PushServiceImpl implements PushService {
public void run() {
if (!canceled.get()) {
try {
log.debug("sendMessageResponse: {}", sendMessageResponse);
SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) sendMessageResponse.decodeCommandCustomHeader(SendMessageResponseHeader.class);
log.debug("sendMessageResponseHeader: {}", sendMessageResponseHeader);
MessageQueue messageQueue = new MessageQueue(sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getEnodeName(), sendMessageRequestHeader.getQueueId());
......@@ -118,6 +117,7 @@ public class PushServiceImpl implements PushService {
MessageExt messageExt = buildMessageExt(sendMessageResponseHeader, message, sendMessageRequestHeader);
pushMessage.setBody(MessageDecoder.encode(messageExt, false));
for (RemotingChannel remotingChannel : consumerTable) {
log.info("Push message pushMessage:{} to remotingChannel: {}", pushMessage, remotingChannel);
Client client = null;
if (remotingChannel instanceof NettyChannelImpl) {
Channel channel = ((NettyChannelImpl) remotingChannel).getChannel();
......
......@@ -28,13 +28,17 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
......@@ -46,7 +50,7 @@ import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.snode.service.EnodeService;
public class EnodeServiceImpl implements EnodeService {
public class RemoteEnodeServiceImpl implements EnodeService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private SnodeController snodeController;
......@@ -54,7 +58,7 @@ public class EnodeServiceImpl implements EnodeService {
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> enodeTable =
new ConcurrentHashMap<>();
public EnodeServiceImpl(SnodeController snodeController) {
public RemoteEnodeServiceImpl(SnodeController snodeController) {
this.snodeController = snodeController;
}
......@@ -73,7 +77,8 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public CompletableFuture<RemotingCommand> pullMessage(final String enodeName, final RemotingCommand request) {
public CompletableFuture<RemotingCommand> pullMessage(final RemotingChannel remotingChannel, final String enodeName,
final RemotingCommand request) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
......@@ -103,7 +108,8 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public CompletableFuture<RemotingCommand> sendMessage(String enodeName, RemotingCommand request) {
public CompletableFuture<RemotingCommand> sendMessage(final RemotingChannel remotingChannel, String enodeName,
RemotingCommand request) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
......@@ -152,7 +158,8 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
public boolean persistSubscriptionGroupConfig(
SubscriptionGroupConfig subscriptionGroupConfig) {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
boolean persist = false;
for (Map.Entry<String, HashMap<Long, String>> entry : enodeTable.entrySet()) {
......@@ -177,7 +184,8 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public void persistOffset(String enodeName, String groupName, String topic, int queueId, long offset) {
public void persistOffset(final RemotingChannel remotingChannel, String enodeName, String groupName, String topic,
int queueId, long offset) {
try {
String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
......@@ -194,22 +202,22 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public RemotingCommand getMinOffsetInQueue(String enodeName, String topic,
int queueId) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader);
public long getMinOffsetInQueue(String enodeName, String topic,
int queueId, RemotingCommand request) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
RemotingCommand remotingCommand = this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
return responseHeader.getOffset();
}
@Override
public RemotingCommand loadOffset(String enodeName, String consumerGroup, String topic,
public long queryOffset(String enodeName, String consumerGroup, String topic,
int queueId) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setConsumerGroup(consumerGroup);
......@@ -217,28 +225,39 @@ public class EnodeServiceImpl implements EnodeService {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(this.snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
RemotingCommand remotingCommand = this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(this.snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
return responseHeader.getOffset();
}
@Override
public RemotingCommand getMaxOffsetInQueue(String enodeName,
public long getMaxOffsetInQueue(String enodeName, String topic,
int queueId,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(address,
RemotingCommand remotingCommand = this.snodeController.getRemotingClient().invokeSync(address,
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
GetMaxOffsetResponseHeader responseHeader =
(GetMaxOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
return responseHeader.getOffset();
}
@Override
public RemotingCommand getOffsetByTimestamp(String enodeName,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
public long getOffsetByTimestamp(String enodeName, String topic, int queueId,
long timestamp,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(address,
RemotingCommand remotingCommand = this.snodeController.getRemotingClient().invokeSync(address,
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
return responseHeader.getOffset();
}
@Override
public RemotingCommand creatRetryTopic(String enodeName,
public RemotingCommand creatRetryTopic(final RemotingChannel remotingChannel, String enodeName,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(address,
......
......@@ -74,7 +74,7 @@ public class SendMessageProcessorTest {
public void testSendMessageV2ProcessRequest() throws RemotingCommandException {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
RemotingCommand request = createSendMesssageV2Command();
when(this.snodeController.getEnodeService().sendMessage(anyString(), any(RemotingCommand.class))).thenReturn(future);
when(this.snodeController.getEnodeService().sendMessage(null, anyString(), any(RemotingCommand.class))).thenReturn(future);
sendMessageProcessor.processRequest(remotingChannel, request);
}
......@@ -83,7 +83,7 @@ public class SendMessageProcessorTest {
snodeController.setEnodeService(enodeService);
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
RemotingCommand request = createSendBatchMesssageCommand();
when(this.snodeController.getEnodeService().sendMessage(anyString(), any(RemotingCommand.class))).thenReturn(future);
when(this.snodeController.getEnodeService().sendMessage(null, anyString(), any(RemotingCommand.class))).thenReturn(future);
sendMessageProcessor.processRequest(remotingChannel, request);
}
......
......@@ -30,7 +30,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase;
import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.junit.Before;
......@@ -51,7 +51,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class EnodeServiceImplTest extends SnodeTestBase {
public class RemoteEnodeServiceImplTest extends SnodeTestBase {
private EnodeService enodeService;
......@@ -74,7 +74,7 @@ public class EnodeServiceImplTest extends SnodeTestBase {
public void init() {
snodeController.setNnodeService(nnodeService);
snodeController.setRemotingClient(remotingClient);
enodeService = new EnodeServiceImpl(snodeController);
enodeService = new RemoteEnodeServiceImpl(snodeController);
}
@Test
......@@ -93,7 +93,7 @@ public class EnodeServiceImplTest extends SnodeTestBase {
return null;
}
}).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
RemotingCommand response = enodeService.sendMessage(enodeName, createSendMesssageCommand(group, topic)).get(3000L, TimeUnit.MILLISECONDS);
RemotingCommand response = enodeService.sendMessage(null, enodeName, createSendMesssageCommand(group, topic)).get(3000L, TimeUnit.MILLISECONDS);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
......@@ -118,7 +118,7 @@ public class EnodeServiceImplTest extends SnodeTestBase {
return null;
}
}).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
RemotingCommand response = enodeService.pullMessage(enodeName, createPullMessage()).get(3000L, TimeUnit.MILLISECONDS);
RemotingCommand response = enodeService.pullMessage(null, enodeName, createPullMessage()).get(3000L, TimeUnit.MILLISECONDS);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册