diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index c898366e981b1efb5610dfba50fe258be47e3087..d9a618b65bd776afaa8354ca748da572de48481d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -121,6 +121,7 @@ public class BrokerController { private final ClientHousekeepingService clientHousekeepingService; private final PullMessageProcessor pullMessageProcessor; private final SnodePullMessageProcessor snodePullMessageProcessor; + private final SendMessageProcessor sendProcessor; private final PullRequestHoldService pullRequestHoldService; private final MessageArrivingListener messageArrivingListener; private final Broker2Client broker2Client; @@ -164,6 +165,9 @@ public class BrokerController { private TransactionalMessageService transactionalMessageService; private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener; private Future slaveSyncFuture; + private ClientManageProcessor clientManageProcessor; + private AdminBrokerProcessor adminProcessor; + private ConsumerManageProcessor consumerManageProcessor; public BrokerController( final BrokerConfig brokerConfig, @@ -178,6 +182,7 @@ public class BrokerController { this.consumerOffsetManager = new ConsumerOffsetManager(this); this.topicConfigManager = new TopicConfigManager(this); this.pullMessageProcessor = new PullMessageProcessor(this); + this.sendProcessor = new SendMessageProcessor(this); this.pullRequestHoldService = new PullRequestHoldService(this); this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService); this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); @@ -543,9 +548,8 @@ public class BrokerController { /** * SendMessageProcessor */ - SendMessageProcessor sendProcessor = new SendMessageProcessor(this); - sendProcessor.registerSendMessageHook(sendMessageHookList); - sendProcessor.registerConsumeMessageHook(consumeMessageHookList); + this.sendProcessor.registerSendMessageHook(sendMessageHookList); + this.sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); @@ -575,28 +579,28 @@ public class BrokerController { /** * ClientManageProcessor */ - ClientManageProcessor clientProcessor = new ClientManageProcessor(this); - this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor); - this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); - this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); - this.remotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, clientProcessor, this.clientManageExecutor); + this.clientManageProcessor = new ClientManageProcessor(this); + this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor); + this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor); + this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor); + this.remotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, clientManageProcessor, this.clientManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, clientProcessor, this.clientManageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, clientManageProcessor, this.clientManageExecutor); /** * ConsumerManageProcessor */ - ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this); - this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); - this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.consumerManageProcessor = new ConsumerManageProcessor(this); + this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, this.consumerManageProcessor, this.consumerManageExecutor); + this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, this.consumerManageProcessor, this.consumerManageExecutor); + this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, this.consumerManageProcessor, this.consumerManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, this.consumerManageProcessor, this.consumerManageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, this.consumerManageProcessor, this.consumerManageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, this.consumerManageProcessor, this.consumerManageExecutor); /** * EndTransactionProcessor @@ -607,7 +611,7 @@ public class BrokerController { /** * Default */ - AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this); + this.adminProcessor = new AdminBrokerProcessor(this); this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); } @@ -1220,4 +1224,23 @@ public class BrokerController { } } + public SendMessageProcessor getSendProcessor() { + return sendProcessor; + } + + public ClientManageProcessor getClientManageProcessor() { + return clientManageProcessor; + } + + public AdminBrokerProcessor getAdminProcessor() { + return adminProcessor; + } + + public void setAdminProcessor(AdminBrokerProcessor adminProcessor) { + this.adminProcessor = adminProcessor; + } + + public ConsumerManageProcessor getConsumerManageProcessor() { + return consumerManageProcessor; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index 17d2f0e7e3308f0910145b3b9a2ba61f0e98194a..97a655c3399c733019e3b6770efc1c5c73c80593 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -18,6 +18,11 @@ package org.apache.rocketmq.broker; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -28,10 +33,10 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.remoting.common.RemotingUtil; -import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.netty.NettySystemConfig; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -40,12 +45,6 @@ import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE; public class BrokerStartup { @@ -53,6 +52,7 @@ public class BrokerStartup { public static CommandLine commandLine = null; public static String configFile = null; public static InternalLogger log; + private static BrokerController brokerController = null; public static void main(String[] args) { start(createBrokerController(args)); @@ -238,7 +238,7 @@ public class BrokerStartup { } } }, "ShutdownHook")); - + brokerController = controller; return controller; } catch (Throwable e) { e.printStackTrace(); @@ -260,7 +260,6 @@ public class BrokerStartup { private static Options buildCommandlineOptions(final Options options) { - Option opt = new Option("c", "configFile", true, "Broker config properties file"); opt.setRequired(false); options.addOption(opt); @@ -275,4 +274,8 @@ public class BrokerStartup { return options; } + + public static BrokerController getBrokerController() { + return brokerController; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index bd7625abec1bf4b51bd558032e715f26b75ff8dd..2e04726887ad43cd6dcdd7a38cae13f286a651f6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -17,6 +17,11 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.ChannelHandlerContext; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.Map; +import java.util.Random; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; @@ -27,8 +32,6 @@ import org.apache.rocketmq.common.constant.DBMsgConstants; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; @@ -40,18 +43,14 @@ import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.common.utils.ChannelUtil; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.List; -import java.util.Map; -import java.util.Random; - public abstract class AbstractSendMessageProcessor implements RequestProcessor { protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -159,7 +158,7 @@ public abstract class AbstractSendMessageProcessor implements RequestProcessor { return response; } - protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, + protected RemotingCommand msgCheck(final String remoteAddress, final SendMessageRequestHeader requestHeader, final RemotingCommand response) { if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) { @@ -188,11 +187,11 @@ public abstract class AbstractSendMessageProcessor implements RequestProcessor { } } - log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress()); + log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), remoteAddress); topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod( requestHeader.getTopic(), requestHeader.getDefaultTopic(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + remoteAddress, requestHeader.getDefaultTopicQueueNums(), topicSysFlag); if (null == topicConfig) { @@ -218,7 +217,7 @@ public abstract class AbstractSendMessageProcessor implements RequestProcessor { String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s", queueIdInt, topicConfig.toString(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + remoteAddress); log.warn(errorInfo); response.setCode(ResponseCode.SYSTEM_ERROR); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java index e7a1e7377da55fcb6bf2766636cd8af3e09ea8d3..e5a95e9c48e313907e2ece39587a4f27ad3e4d5a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java @@ -207,9 +207,8 @@ public class ClientManageProcessor implements RequestProcessor { private RemotingCommand createRetryTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { RemotingCommand response = RemotingCommand.createResponseCommand(null); - final CreateRetryTopicRequestHeader requestHeader = - (CreateRetryTopicRequestHeader) request - .decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class); + final CreateRetryTopicRequestHeader requestHeader = (CreateRetryTopicRequestHeader) request + .decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class); if (requestHeader.getGroupName() != null) { SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroupName()); @@ -217,7 +216,8 @@ public class ClientManageProcessor implements RequestProcessor { createRetryTopic(false, requestHeader.getGroupName(), subscriptionGroupConfig.getRetryQueueNums()); } } - + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); return response; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 3557a1745b305c5fd9c1a13f34b626aa2dbb13c0..1408cce39b8dc9f794483bfad4eacf2dcee28da7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -46,6 +46,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RequestProcessor; +import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -71,7 +72,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: - return this.consumerSendMsgBack(ctx, request); + return this.consumerSendMsgBack(request); default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { @@ -82,10 +83,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; +// String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + SocketAddress bornHost = ctx.channel().remoteAddress(); + if (requestHeader.isBatch()) { - response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); + response = this.sendBatchMessage(bornHost, request, mqtraceContext, requestHeader); } else { - response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); + response = this.sendMessage(bornHost, request, mqtraceContext, requestHeader); } this.executeSendMessageHookAfter(response, mqtraceContext); @@ -99,7 +103,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement this.brokerController.getMessageStore().isTransientStorePoolDeficient(); } - private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) + private RemotingCommand consumerSendMsgBack(final RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ConsumerSendMsgBackRequestHeader requestHeader = @@ -296,7 +300,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return true; } - private RemotingCommand sendMessage(final ChannelHandlerContext ctx, + private RemotingCommand sendMessage(final SocketAddress remoteAddress, final RemotingCommand request, final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { @@ -319,7 +323,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } response.setCode(-1); - super.msgCheck(ctx, requestHeader, response); + super.msgCheck(RemotingHelper.parseChannelRemoteAddr(remoteAddress), requestHeader, response); if (response.getCode() != -1) { return response; } @@ -346,9 +350,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); - msgInner.setBornHost(ctx.channel().remoteAddress()); + msgInner.setBornHost(requestHeader.getBornHost()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); + msgInner.setBornHost(remoteAddress); +// ByteBuffer hostHolder = ByteBuffer.allocate(8); +// String bornHost = msgInner.getStoreHostBytes(hostHolder).toString(); PutMessageResult putMessageResult = null; Map oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); @@ -365,14 +372,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } - return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); + return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, queueIdInt, remoteAddress); } private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, RemotingCommand request, MessageExt msg, - SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, - int queueIdInt) { + SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, + int queueIdInt, SocketAddress bornHost) { if (putMessageResult == null) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("store putMessage return null"); @@ -445,8 +452,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement responseHeader.setCommitLogOffset(putMessageResult.getAppendMessageResult().getWroteOffset()); responseHeader.setStoreTimestamp(putMessageResult.getAppendMessageResult().getStoreTimestamp()); responseHeader.setStoreSize(putMessageResult.getAppendMessageResult().getWroteBytes()); - responseHeader.setStoreHost(ctx.channel().localAddress().toString()); - doResponse(ctx, request, response); + responseHeader.setStoreHost(RemotingHelper.parseChannelRemoteAddr(bornHost)); +// doResponse(ctx, request, response); if (hasSendMessageHook()) { sendMessageContext.setMsgId(responseHeader.getMsgId()); @@ -462,6 +469,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement sendMessageContext.setCommercialSendSize(wroteSize); sendMessageContext.setCommercialOwner(owner); } + if (!request.isOnewayRPC()) { + response.setCustomHeader(responseHeader); + return response; + } return null; } else { if (hasSendMessageHook()) { @@ -477,7 +488,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return response; } - private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, + private RemotingCommand sendBatchMessage(final SocketAddress remoteAddress, final RemotingCommand request, final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { @@ -500,7 +511,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } response.setCode(-1); - super.msgCheck(ctx, requestHeader, response); + super.msgCheck(RemotingHelper.parseChannelRemoteAddr(remoteAddress), requestHeader, response); if (response.getCode() != -1) { return response; } @@ -537,13 +548,17 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties())); messageExtBatch.setBody(request.getBody()); messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp()); - messageExtBatch.setBornHost(ctx.channel().remoteAddress()); + messageExtBatch.setBornHost(requestHeader.getBornHost()); messageExtBatch.setStoreHost(this.getStoreHost()); + messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); +// ByteBuffer hostHolder = ByteBuffer.allocate(8); +// String storeHost = messageExtBatch.getStoreHostBytes(hostHolder).toString(); + PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch); - return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt); + return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, queueIdInt, storeHost); } public boolean hasConsumeMessageHook() { diff --git a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java index abe1a573e854ccfb9eb98e6f8082d024f4508ed7..dd380759b08fd5358e1583d5542af45891c34612 100644 --- a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java @@ -22,6 +22,8 @@ import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.ClientConfig; +import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.common.RemotingUtil; public class SnodeConfig { @@ -32,6 +34,10 @@ public class SnodeConfig { private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + private ServerConfig nettyServerConfig; + + private ClientConfig nettyClientConfig; + @ImportantField private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); @@ -106,6 +112,9 @@ public class SnodeConfig { @ImportantField private boolean aclEnable = false; + @ImportantField + private boolean embeddedModeEnable = true; + public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) { this.snodeHeartBeatInterval = snodeHeartBeatInterval; } @@ -410,4 +419,27 @@ public class SnodeConfig { this.loadOffsetInterval = loadOffsetInterval; } + public boolean isEmbeddedModeEnable() { + return embeddedModeEnable; + } + + public void setEmbeddedModeEnable(boolean embeddedModeEnable) { + this.embeddedModeEnable = embeddedModeEnable; + } + + public ServerConfig getNettyServerConfig() { + return nettyServerConfig; + } + + public void setNettyServerConfig(ServerConfig nettyServerConfig) { + this.nettyServerConfig = nettyServerConfig; + } + + public ClientConfig getNettyClientConfig() { + return nettyClientConfig; + } + + public void setNettyClientConfig(ClientConfig nettyClientConfig) { + this.nettyClientConfig = nettyClientConfig; + } } diff --git a/snode/pom.xml b/snode/pom.xml index bb28f6406c0837577fe71118baf5d4c75746d94e..da1e3f3ef19a7155b87eae1146c21020d18a6356 100644 --- a/snode/pom.xml +++ b/snode/pom.xml @@ -15,7 +15,8 @@ limitations under the License. --> - + org.apache.rocketmq rocketmq-all @@ -92,6 +93,10 @@ io.prometheus simpleclient_hotspot + + ${project.groupId} + rocketmq-broker + diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 6523d8bd3c1dcdfa56c56a4749a2ffb9279bf178..5ca8a19debd4c964938fe25d3e3124d75cfeb047 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -23,6 +23,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.acl.AccessValidator; +import org.apache.rocketmq.broker.BrokerStartup; import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; @@ -69,10 +70,11 @@ import org.apache.rocketmq.snode.service.PushService; import org.apache.rocketmq.snode.service.ScheduledService; import org.apache.rocketmq.snode.service.WillMessageService; import org.apache.rocketmq.snode.service.impl.ClientServiceImpl; -import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl; +import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl; import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.PushServiceImpl; +import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl; import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl; @@ -127,7 +129,11 @@ public class SnodeController { this.nettyClientConfig = nettyClientConfig; this.nettyServerConfig = nettyServerConfig; this.snodeConfig = snodeConfig; - this.enodeService = new EnodeServiceImpl(this); + if (!this.snodeConfig.isEmbeddedModeEnable()) { + this.enodeService = new RemoteEnodeServiceImpl(this); + } else { + this.enodeService = new LocalEnodeServiceImpl(BrokerStartup.getBrokerController()); + } this.nnodeService = new NnodeServiceImpl(this); this.scheduledService = new ScheduledServiceImpl(this); this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient() @@ -163,7 +169,6 @@ public class SnodeController { "SnodeHeartbeatThread", true); - this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor( snodeConfig.getSnodeSendMessageMinPoolSize(), snodeConfig.getSnodeSendMessageMaxPoolSize(), @@ -228,10 +233,8 @@ public class SnodeController { } public boolean initialize() { - this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer() - .init(this.nettyServerConfig, this.clientHousekeepingService); - this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer( - RemotingUtil.MQTT_PROTOCOL) + this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService); + this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(RemotingUtil.MQTT_PROTOCOL) .init(this.nettyServerConfig, this.clientHousekeepingService); this.registerProcessor(); initSnodeInterceptorGroup(); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java index 232def9ca35ce031fa7c696bca75d9db62c49eb6..23ea68a18355afee476fb786376b97a1d3b8a7da 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java @@ -16,8 +16,6 @@ */ package org.apache.rocketmq.snode; -import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE; - import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; @@ -31,6 +29,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.broker.BrokerStartup; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.constant.LoggerName; @@ -44,6 +43,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.srvutil.ServerUtil; import org.slf4j.LoggerFactory; +import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE; + public class SnodeStartup { private static InternalLogger log; public static Properties properties = null; @@ -51,13 +52,17 @@ public class SnodeStartup { public static String configFile = null; public static void main(String[] args) throws IOException, JoranException { - startup(createSnodeController(args)); + SnodeConfig snodeConfig = loadConfig(args); + if (snodeConfig.isEmbeddedModeEnable()) { + BrokerStartup.start(BrokerStartup.createBrokerController(args)); + } + SnodeController snodeController = createSnodeController(snodeConfig); + startup(snodeController); } public static SnodeController startup(SnodeController controller) { try { controller.start(); - String tip = "The snode[" + controller.getSnodeConfig().getSnodeName() + ", " + controller.getSnodeConfig().getSnodeIP1() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); @@ -74,7 +79,7 @@ public class SnodeStartup { return null; } - public static SnodeController createSnodeController(String[] args) throws IOException, JoranException { + public static SnodeConfig loadConfig(String[] args) throws IOException { Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("snode", args, buildCommandlineOptions(options), new PosixParser()); @@ -82,12 +87,11 @@ public class SnodeStartup { System.exit(-1); } - final SnodeConfig snodeConfig = new SnodeConfig(); + SnodeConfig snodeConfig = new SnodeConfig(); final ServerConfig nettyServerConfig = new ServerConfig(); final ClientConfig nettyClientConfig = new ClientConfig(); nettyServerConfig.setListenPort(snodeConfig.getListenPort()); - nettyServerConfig.setListenPort(11911); nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING)))); @@ -101,27 +105,28 @@ public class SnodeStartup { MixAll.properties2Object(properties, snodeConfig); MixAll.properties2Object(properties, nettyServerConfig); MixAll.properties2Object(properties, nettyClientConfig); + in.close(); } } + snodeConfig.setNettyServerConfig(nettyServerConfig); + snodeConfig.setNettyClientConfig(nettyClientConfig); if (null == snodeConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } - LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); - JoranConfigurator configurator = new JoranConfigurator(); - configurator.setContext(lc); - lc.reset(); - configurator.doConfigure(snodeConfig.getRocketmqHome() + "/conf/logback_snode.xml"); - log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); - MixAll.printObjectProperties(log, snodeConfig); - MixAll.printObjectProperties(log, nettyClientConfig); - MixAll.printObjectProperties(log, nettyServerConfig); + MixAll.printObjectProperties(log, snodeConfig.getNettyServerConfig()); + MixAll.printObjectProperties(log, snodeConfig.getNettyClientConfig()); + return snodeConfig; + } + + public static SnodeController createSnodeController(SnodeConfig snodeConfig) throws JoranException { + final SnodeController snodeController = new SnodeController( - nettyServerConfig, - nettyClientConfig, + snodeConfig.getNettyServerConfig(), + snodeConfig.getNettyClientConfig(), snodeConfig); boolean initResult = snodeController.initialize(); @@ -148,7 +153,14 @@ public class SnodeStartup { } } } - },"ShutdownHook")); + }, "ShutdownHook")); + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + JoranConfigurator configurator = new JoranConfigurator(); + configurator.setContext(lc); + lc.reset(); + configurator.doConfigure(snodeConfig.getRocketmqHome() + "/conf/logback_snode.xml"); + log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + return snodeController; } @@ -164,7 +176,7 @@ public class SnodeStartup { opt = new Option("m", "printImportantConfig", false, "Print important config item"); opt.setRequired(false); options.addOption(opt); - + return options; } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java index ab013b8b9c4b788769b007cb442d215ae64e7447..47f10c9b056a8517b49b0e34e4cb48fb0547eb03 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java @@ -42,19 +42,6 @@ public class SubscriptionGroupManager { private void init() { } - public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) { - SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config); - if (old != null) { - log.info("Update subscription group config, old: {} new: {}", old, config); - } else { - log.info("Create new subscription group, {}", config); - } - - this.dataVersion.nextVersion(); - - this.persistSubscription(config); - } - public void disableConsume(final String groupName) { SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName); if (old != null) { @@ -63,7 +50,8 @@ public class SubscriptionGroupManager { } } - public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { + public SubscriptionGroupConfig findSubscriptionGroupConfig( + final String group) { SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group); if (null == subscriptionGroupConfig) { if (snodeController.getSnodeConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) { @@ -74,15 +62,13 @@ public class SubscriptionGroupManager { log.info("Auto create a subscription group, {}", subscriptionGroupConfig.toString()); } this.dataVersion.nextVersion(); - this.persistSubscription(subscriptionGroupConfig); + this.snodeController.getEnodeService().persistSubscriptionGroupConfig(subscriptionGroupConfig); } } return subscriptionGroupConfig; } - - public ConcurrentMap getSubscriptionGroupTable() { return subscriptionGroupTable; } @@ -91,18 +77,4 @@ public class SubscriptionGroupManager { return dataVersion; } - public void deleteSubscriptionGroupConfig(final String groupName) { - SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName); - if (old != null) { - log.info("delete subscription group OK, subscription group:{}", old); - this.dataVersion.nextVersion(); - this.persistSubscription(old); - } else { - log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName); - } - } - - void persistSubscription(SubscriptionGroupConfig config) { - this.snodeController.getEnodeService().persistSubscriptionGroupConfig(config); - } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java index a164fc46f4a862424333411059a9aa49c4ccda09..a648ba812960f42ec1217ac03fecd43be3c513e7 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java @@ -29,11 +29,6 @@ public interface SubscriptionManager { boolean subscribe(String groupId, Set subscriptionDataSet, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere); - void unSubscribe(String groupId, RemotingChannel remotingChannel, - Set subscriptionDataSet); - - void cleanSubscription(String groupId, String topic); - Subscription getSubscription(String groupId); void registerPushSession(Set subscriptionDataSet, RemotingChannel remotingChannel, diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java index 1f5ecd878805644b00ad1756ea4a390ddd181f19..4a4a35e881bd1860f25e988a9674fc9186e47d51 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java @@ -44,7 +44,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { @Override public void registerPushSession(Set subscriptionDataSet, RemotingChannel remotingChannel, String groupId) { - log.debug("Before ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), subscriptionDataSet); + log.info("Before ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), subscriptionDataSet); Set prevSubSet = this.clientSubscriptionTable.get(remotingChannel); Set keySet = new HashSet<>(); for (SubscriptionData subscriptionData : subscriptionDataSet) { @@ -77,7 +77,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { } } } - log.debug("After ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), this.clientSubscriptionTable.get(remotingChannel)); + log.info("After ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), this.clientSubscriptionTable.get(remotingChannel)); } @Override @@ -191,17 +191,6 @@ public class SubscriptionManagerImpl implements SubscriptionManager { return updated; } - @Override - public void unSubscribe(String groupId, RemotingChannel remotingChannel, - Set subscriptionDataSet) { - - } - - @Override - public void cleanSubscription(String groupId, String topic) { - - } - @Override public Subscription getSubscription(String groupId) { return groupSubscriptionTable.get(groupId); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java index d9b23b4fd32268a8d9927fd611c88b8bc984f60f..ba2a80011b20003e9ad8e64f1ea9f714ec176f1a 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java @@ -20,13 +20,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.exception.SnodeException; @@ -79,17 +75,18 @@ public class ConsumerOffsetManager { } } - private long parserOffset(final String enodeName, final String group, final String topic, final int queueId) { - try { - RemotingCommand remotingCommand = queryOffset(enodeName, group, topic, queueId); - QueryConsumerOffsetResponseHeader responseHeader = - (QueryConsumerOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); - return responseHeader.getOffset(); - } catch (Exception ex) { - log.error("Load offset from broker error", ex); - } - return -1; - } +// private long parserOffset(final RemotingChannel remotingChannel, final String enodeName, final String group, +// final String topic, final int queueId) { +// try { +// RemotingCommand remotingCommand = queryOffset(remotingChannel, enodeName, group, topic, queueId); +// QueryConsumerOffsetResponseHeader responseHeader = +// (QueryConsumerOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); +// return responseHeader.getOffset(); +// } catch (Exception ex) { +// log.error("Load offset from broker error", ex); +// } +// return -1; +// } public long queryCacheOffset(final String enodeName, final String group, final String topic, final int queueId) { String key = buildKey(enodeName, topic, group); @@ -99,30 +96,33 @@ public class ConsumerOffsetManager { map = this.offsetTable.putIfAbsent(key, map); } CacheOffset cacheOffset = map.get(queueId); - if (cacheOffset != null) { - if (System.currentTimeMillis() - cacheOffset.getUpdateTimestamp() > snodeController.getSnodeConfig().getLoadOffsetInterval()) { - cacheOffset.setOffset(parserOffset(enodeName, group, topic, queueId)); - cacheOffset.setUpdateTimestamp(System.currentTimeMillis()); + try { + if (cacheOffset != null) { + if (!this.snodeController.getSnodeConfig().isEmbeddedModeEnable() && System.currentTimeMillis() - cacheOffset.getUpdateTimestamp() > snodeController.getSnodeConfig().getLoadOffsetInterval()) { + long offset = this.snodeController.getEnodeService().queryOffset(enodeName, group, topic, queueId); + cacheOffset.setOffset(offset); + cacheOffset.setUpdateTimestamp(System.currentTimeMillis()); + } else { + long offset = this.snodeController.getEnodeService().queryOffset(enodeName, group, topic, queueId); + cacheOffset.setOffset(offset); + } + } else { + long offset = this.snodeController.getEnodeService().queryOffset(enodeName, group, topic, queueId); + cacheOffset = new CacheOffset(key, offset, System.currentTimeMillis()); + map.put(queueId, cacheOffset); } - } else { - cacheOffset = new CacheOffset(key, parserOffset(enodeName, group, topic, queueId), System.currentTimeMillis()); - map.put(queueId, cacheOffset); + } catch (Exception ex) { + log.warn("Load offset error, enodeName: {}, group:{},topic:{} queueId:{}", enodeName, group, topic, queueId); } return cacheOffset.getOffset(); - } - public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic, + public void commitOffset(final RemotingChannel remotingChannel, final String enodeName, final String clientHost, + final String group, final String topic, final int queueId, final long offset) { cacheOffset(enodeName, clientHost, group, topic, queueId, offset); - this.snodeController.getEnodeService().persistOffset(enodeName, group, topic, queueId, offset); - } - - public RemotingCommand queryOffset(final String enodeName, final String group, final String topic, - final int queueId) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException { - return this.snodeController.getEnodeService().loadOffset(enodeName, group, topic, queueId); + this.snodeController.getEnodeService().persistOffset(remotingChannel, enodeName, group, topic, queueId, offset); } public class CacheOffset { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java index a00058913dd7478d18e45c15db4e56bb557c282b..1953de1cb04a0d4f2f59b4d24c2c25141fb4df79 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java @@ -25,9 +25,13 @@ import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestH import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody; import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader; import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader; import org.apache.rocketmq.logging.InternalLogger; @@ -87,7 +91,14 @@ public class ConsumerManageProcessor implements RequestProcessor { (SearchOffsetRequestHeader) request .decodeCommandCustomHeader(SearchOffsetRequestHeader.class); try { - return this.snodeController.getEnodeService().getOffsetByTimestamp(requestHeader.getEnodeName(), request); + final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class); + final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader(); + + long offset = this.snodeController.getEnodeService().getOffsetByTimestamp(requestHeader.getEnodeName(), + requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getTimestamp(), request); + responseHeader.setOffset(offset); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); } catch (Exception ex) { log.error("Search offset by timestamp error:{}", ex); } @@ -100,7 +111,14 @@ public class ConsumerManageProcessor implements RequestProcessor { (GetMinOffsetRequestHeader) request .decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); try { - return this.snodeController.getEnodeService().getMinOffsetInQueue(requestHeader.getEnodeName(), requestHeader.getTopic(), requestHeader.getQueueId()); + final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); + final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); + + long offset = this.snodeController.getEnodeService().getMinOffsetInQueue(requestHeader.getEnodeName(), + requestHeader.getTopic(), requestHeader.getQueueId(), request); + responseHeader.setOffset(offset); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); } catch (Exception ex) { log.error("Get min offset error:{}", ex); } @@ -113,9 +131,16 @@ public class ConsumerManageProcessor implements RequestProcessor { (GetMaxOffsetRequestHeader) request .decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class); try { - return this.snodeController.getEnodeService().getMaxOffsetInQueue(requestHeader.getEnodeName(), request); + long offset = this.snodeController.getEnodeService().getMaxOffsetInQueue(requestHeader.getEnodeName(), + requestHeader.getTopic(), requestHeader.getQueueId(), request); + final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); + final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); + responseHeader.setOffset(offset); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; } catch (Exception ex) { - log.error("Get min offset error:{}", ex); + log.error("Get min offset error, remoting: {} error: {} ", remotingChannel.remoteAddress(), ex); } return null; } @@ -153,7 +178,7 @@ public class ConsumerManageProcessor implements RequestProcessor { final UpdateConsumerOffsetRequestHeader requestHeader = (UpdateConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); - this.snodeController.getConsumerOffsetManager().commitOffset(requestHeader.getEnodeName(), RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()), requestHeader.getConsumerGroup(), + this.snodeController.getConsumerOffsetManager().commitOffset(remotingChannel, requestHeader.getEnodeName(), RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); @@ -171,9 +196,15 @@ public class ConsumerManageProcessor implements RequestProcessor { requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); - return this.snodeController.getConsumerOffsetManager().queryOffset(requestHeader.getEnodeName(), requestHeader.getConsumerGroup(), requestHeader.getTopic(), + long offset = this.snodeController.getEnodeService().queryOffset(requestHeader.getEnodeName(), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); + final RemotingCommand response = RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class); + final QueryConsumerOffsetResponseHeader responseHeader = (QueryConsumerOffsetResponseHeader) response.readCustomHeader(); + responseHeader.setOffset(offset); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; } public RemotingCommand createRetryTopic(RemotingChannel remotingChannel, @@ -181,7 +212,7 @@ public class ConsumerManageProcessor implements RequestProcessor { RemotingSendRequestException, RemotingConnectException, RemotingCommandException { final CreateRetryTopicRequestHeader requestHeader = (CreateRetryTopicRequestHeader) request.decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class); requestHeader.getEnodeName(); - return this.snodeController.getEnodeService().creatRetryTopic(requestHeader.getEnodeName(), request); + return this.snodeController.getEnodeService().creatRetryTopic(remotingChannel, requestHeader.getEnodeName(), request); } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java index fc13f5ea0c7a4c82da9880364bcac8f92e3e8d9d..95982bc7c7c6528ebe50bd89259d2beca0405f65 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java @@ -127,14 +127,16 @@ public class PullMessageProcessor implements RequestProcessor { } } - CompletableFuture responseFuture = snodeController.getEnodeService().pullMessage(requestHeader.getEnodeName(), request); + CompletableFuture responseFuture = snodeController.getEnodeService().pullMessage(remotingChannel, requestHeader.getEnodeName(), request); responseFuture.whenComplete((data, ex) -> { if (ex == null) { if (this.snodeController.getConsumeMessageInterceptorGroup() != null) { ResponseContext responseContext = new ResponseContext(request, remotingChannel, data); this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext); } - remotingChannel.reply(data); + if (data != null) { + remotingChannel.reply(data); + } } else { if (this.snodeController.getConsumeMessageInterceptorGroup() != null) { ExceptionContext exceptionContext = new ExceptionContext(request, remotingChannel, ex, null); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java index c3fd2fe8972af9410fa55f51391261a8267b0094..019794ed87e3deacb26c07afd11556685b80ecf5 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java @@ -83,11 +83,11 @@ public class SendMessageProcessor implements RequestProcessor { stringBuffer.append(MixAll.getRetryTopic(consumerSendMsgBackRequestHeader.getGroup())); } - CompletableFuture responseFuture = snodeController.getEnodeService().sendMessage(enodeName, request); + CompletableFuture responseFuture = snodeController.getEnodeService().sendMessage(remotingChannel, enodeName, request); sendMessageRequestHeaderV2.setO(remotingChannel.remoteAddress()); final byte[] message = request.getBody(); - final boolean isNeedPush = !isSendBack; + final boolean needPush = !isSendBack; final SendMessageRequestHeader sendMessageRequestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(sendMessageRequestHeaderV2); responseFuture.whenComplete((data, ex) -> { @@ -98,7 +98,7 @@ public class SendMessageProcessor implements RequestProcessor { } remotingChannel.reply(data); this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length); - if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) { + if (data.getCode() == ResponseCode.SUCCESS && needPush) { this.snodeController.getPushService().pushMessage(sendMessageRequestHeader, message, data); } } else { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java index 11f8f1892b2dc1a9ac7864e890edfd67c8f77db8..1f75fcc64e7f4e7773c1418f636f9a95abb2291e 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java @@ -29,6 +29,7 @@ public class MqttPingreqMessageHandler implements MessageHandler { public MqttPingreqMessageHandler(SnodeController snodeController) { this.snodeController = snodeController; } + /** * handle the PINGREQ message from client *
    @@ -41,7 +42,8 @@ public class MqttPingreqMessageHandler implements MessageHandler { * @param message * @return */ - @Override public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { + @Override + public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) { return null; } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java index 560233a5512aafb413a9a87488b9ecd375796190..774a2a65f02e2de533c61734a7ab6b5ab0441478 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.snode.service;/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,7 +14,7 @@ package org.apache.rocketmq.snode.service;/* * See the License for the specific language governing permissions and * limitations under the License. */ - +package org.apache.rocketmq.snode.service; public interface AdminService { } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java index a4fa28c07558511dc264b1a86a6f519bf61ce608..e2db16eb7589700d4b7476811d0803b61feda426 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java @@ -21,6 +21,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; @@ -42,7 +43,8 @@ public interface EnodeService { * @param request {@link SendMessageRequestHeaderV2} Send message request header * @return Send message response future */ - CompletableFuture sendMessage(final String enodeName, final RemotingCommand request); + CompletableFuture sendMessage(final RemotingChannel remotingChannel, final String enodeName, + final RemotingCommand request); /** * Pull message from enode server. @@ -51,7 +53,8 @@ public interface EnodeService { * @param request {@link PullMessageRequestHeader} Pull message request header * @return Pull message Response future */ - CompletableFuture pullMessage(final String enodeName, final RemotingCommand request); + CompletableFuture pullMessage(final RemotingChannel remotingChannel, final String enodeName, + final RemotingCommand request); /** * Create retry topic in enode server. @@ -64,7 +67,7 @@ public interface EnodeService { * @throws RemotingSendRequestException * @throws RemotingConnectException */ - RemotingCommand creatRetryTopic(String enodeName, + RemotingCommand creatRetryTopic(final RemotingChannel remotingChannel, String enodeName, RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; /** @@ -98,20 +101,24 @@ public interface EnodeService { * @param queueId QueueId of related topic. * @param offset Current offset of target queue of subscribed topic. */ - void persistOffset(String enodeName, String groupName, String topic, int queueId, long offset); + void persistOffset(final RemotingChannel remotingChannel, String enodeName, String groupName, String topic, + int queueId, long offset); - RemotingCommand loadOffset(String enodeName, String consumerGroup, String topic, - int queueId) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException; + long queryOffset(String enodeName, String consumerGroup, + String topic, int queueId) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, RemotingCommandException; - RemotingCommand getMaxOffsetInQueue(String enodeName, - RemotingCommand request) throws InterruptedException, RemotingTimeoutException, + long getMaxOffsetInQueue(String enodeName, String topic, + int queueId, RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, RemotingCommandException; - RemotingCommand getMinOffsetInQueue(String enodeName, String topic, - int queueId) throws InterruptedException, RemotingTimeoutException, + long getMinOffsetInQueue(String enodeName, String topic, + int queueId, RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, RemotingCommandException; - RemotingCommand getOffsetByTimestamp(String enodeName, - RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; + long getOffsetByTimestamp(String enodeName, + String topic, int queueId, + long timestamp, + RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, RemotingCommandException; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java index f170ae1116eff91d036a0ceda12721e593943b29..aaed69047eacd2267f6c79e9d0f7a04bbe9670a7 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.snode.service.impl; + import java.util.List; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..57891043ec6454ac1cc9391295bc37341013ebf6 --- /dev/null +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.service.impl; + +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.netty.CodecHelper; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.snode.service.EnodeService; + +public class LocalEnodeServiceImpl implements EnodeService { + + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + + private BrokerController brokerController; + + public LocalEnodeServiceImpl(BrokerController brokerController) { + this.brokerController = brokerController; + } + + @Override public void sendHeartbeat(RemotingCommand remotingCommand) { + return; + } + + @Override + public CompletableFuture sendMessage(final RemotingChannel remotingChannel, String enodeName, + RemotingCommand request) { + CompletableFuture completableFuture = new CompletableFuture<>(); + try { + log.debug("Send message request:{}", request); + RemotingCommand remotingCommand = this.brokerController.getSendProcessor().processRequest(remotingChannel, request); + CodecHelper.encodeHeader(remotingCommand); + completableFuture.complete(remotingCommand); + } catch (Exception ex) { + log.error("[Local]Request local enode send message error", ex); + completableFuture.completeExceptionally(ex); + } + return completableFuture; + } + + @Override + public CompletableFuture pullMessage(RemotingChannel remotingChannel, String enodeName, + RemotingCommand request) { + CompletableFuture completableFuture = new CompletableFuture<>(); + try { + RemotingCommand response = this.brokerController.getSnodePullMessageProcessor().processRequest(remotingChannel, request); + completableFuture.complete(response); + } catch (Exception ex) { + log.error("[Local]Request local enode pull message error", ex); + completableFuture.completeExceptionally(ex); + } + return completableFuture; + } + + @Override public RemotingCommand creatRetryTopic(RemotingChannel remotingChannel, String enodeName, + RemotingCommand request) { + try { + return this.brokerController.getClientManageProcessor().processRequest(remotingChannel, request); + } catch (Exception ex) { + log.error("[Local]Request create retry topic error", ex); + } + return null; + } + + @Override public void updateEnodeAddress( + String clusterName) { + + } + + @Override public boolean persistSubscriptionGroupConfig( + final SubscriptionGroupConfig subscriptionGroupConfig) { + boolean persist = false; + if (subscriptionGroupConfig != null) { + this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(subscriptionGroupConfig); + persist = true; + } + return persist; + } + + @Override + public void persistOffset(RemotingChannel remotingChannel, String enodeName, String groupName, String topic, + int queueId, long offset) { + try { +// UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); +// requestHeader.setConsumerGroup(groupName); +// requestHeader.setTopic(topic); +// requestHeader.setQueueId(queueId); +// requestHeader.setCommitOffset(offset); +// RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); +// this.brokerController.getConsumerManageProcessor().processRequest(remotingChannel, request); + + this.brokerController.getConsumerOffsetManager().commitOffset(remotingChannel.remoteAddress().toString(), groupName, + topic, queueId, offset); + } catch (Exception ex) { + log.error("[Local]Persist offset to Enode error group: [{}], topic: [{}] queue: [{}]!", ex, groupName, topic, queueId); + } + } + + @Override + public long queryOffset(String enodeName, String consumerGroup, String topic, int queueId) { + return this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup, topic, queueId); + } + + @Override + public long getMaxOffsetInQueue(String enodeName, String topic, int queueId, RemotingCommand request) { + return this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); + } + + @Override + public long getMinOffsetInQueue(String enodeName, String topic, int queueId, RemotingCommand request) { + return this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId); + } + + @Override + public long getOffsetByTimestamp(String enodeName, + String topic, int queueId, long timestamp, RemotingCommand request) { + return this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, queueId, timestamp); + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java index 72f658e0187d3e54f85b4d2d5ec34f691940e645..9f4b016f1346a48630733cafcc800cb981e45fad 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java @@ -95,7 +95,7 @@ public class PushServiceImpl implements PushService { messageExt.setFlag(sendMessageRequestHeader.getFlag()); messageExt.setBody(message); messageExt.setBodyCRC(UtilAll.crc32(message)); - log.debug("MessageExt:{}", messageExt); + log.info("MessageExt:{}", messageExt); return messageExt; } @@ -103,7 +103,6 @@ public class PushServiceImpl implements PushService { public void run() { if (!canceled.get()) { try { - log.debug("sendMessageResponse: {}", sendMessageResponse); SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) sendMessageResponse.decodeCommandCustomHeader(SendMessageResponseHeader.class); log.debug("sendMessageResponseHeader: {}", sendMessageResponseHeader); MessageQueue messageQueue = new MessageQueue(sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getEnodeName(), sendMessageRequestHeader.getQueueId()); @@ -118,6 +117,7 @@ public class PushServiceImpl implements PushService { MessageExt messageExt = buildMessageExt(sendMessageResponseHeader, message, sendMessageRequestHeader); pushMessage.setBody(MessageDecoder.encode(messageExt, false)); for (RemotingChannel remotingChannel : consumerTable) { + log.info("Push message pushMessage:{} to remotingChannel: {}", pushMessage, remotingChannel); Client client = null; if (remotingChannel instanceof NettyChannelImpl) { Channel channel = ((NettyChannelImpl) remotingChannel).getChannel(); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java similarity index 77% rename from snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java rename to snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java index 4d1e522e1f273fd2803a403f15a9cfc63906117e..311406fdb422f8ee91a6f399aee824f1f635274b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java @@ -28,13 +28,17 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; @@ -46,7 +50,7 @@ import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.constant.SnodeConstant; import org.apache.rocketmq.snode.service.EnodeService; -public class EnodeServiceImpl implements EnodeService { +public class RemoteEnodeServiceImpl implements EnodeService { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private SnodeController snodeController; @@ -54,7 +58,7 @@ public class EnodeServiceImpl implements EnodeService { private final ConcurrentMap> enodeTable = new ConcurrentHashMap<>(); - public EnodeServiceImpl(SnodeController snodeController) { + public RemoteEnodeServiceImpl(SnodeController snodeController) { this.snodeController = snodeController; } @@ -73,7 +77,8 @@ public class EnodeServiceImpl implements EnodeService { } @Override - public CompletableFuture pullMessage(final String enodeName, final RemotingCommand request) { + public CompletableFuture pullMessage(final RemotingChannel remotingChannel, final String enodeName, + final RemotingCommand request) { CompletableFuture future = new CompletableFuture<>(); try { @@ -103,7 +108,8 @@ public class EnodeServiceImpl implements EnodeService { } @Override - public CompletableFuture sendMessage(String enodeName, RemotingCommand request) { + public CompletableFuture sendMessage(final RemotingChannel remotingChannel, String enodeName, + RemotingCommand request) { CompletableFuture future = new CompletableFuture<>(); try { String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); @@ -152,7 +158,8 @@ public class EnodeServiceImpl implements EnodeService { } @Override - public boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) { + public boolean persistSubscriptionGroupConfig( + SubscriptionGroupConfig subscriptionGroupConfig) { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null); boolean persist = false; for (Map.Entry> entry : enodeTable.entrySet()) { @@ -177,7 +184,8 @@ public class EnodeServiceImpl implements EnodeService { } @Override - public void persistOffset(String enodeName, String groupName, String topic, int queueId, long offset) { + public void persistOffset(final RemotingChannel remotingChannel, String enodeName, String groupName, String topic, + int queueId, long offset) { try { String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); @@ -194,22 +202,22 @@ public class EnodeServiceImpl implements EnodeService { } @Override - public RemotingCommand getMinOffsetInQueue(String enodeName, String topic, - int queueId) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException { - GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queueId); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader); + public long getMinOffsetInQueue(String enodeName, String topic, + int queueId, RemotingCommand request) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, RemotingCommandException { String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); - return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr), + RemotingCommand remotingCommand = this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr), request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); + GetMinOffsetResponseHeader responseHeader = + (GetMinOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); + return responseHeader.getOffset(); + } @Override - public RemotingCommand loadOffset(String enodeName, String consumerGroup, String topic, + public long queryOffset(String enodeName, String consumerGroup, String topic, int queueId) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException { + RemotingSendRequestException, RemotingConnectException, RemotingCommandException { QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader(); requestHeader.setTopic(topic); requestHeader.setConsumerGroup(consumerGroup); @@ -217,28 +225,39 @@ public class EnodeServiceImpl implements EnodeService { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader); String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); - return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(this.snodeController.getSnodeConfig().isVipChannelEnabled(), addr), + RemotingCommand remotingCommand = this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(this.snodeController.getSnodeConfig().isVipChannelEnabled(), addr), request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); + QueryConsumerOffsetResponseHeader responseHeader = + (QueryConsumerOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); + return responseHeader.getOffset(); } @Override - public RemotingCommand getMaxOffsetInQueue(String enodeName, + public long getMaxOffsetInQueue(String enodeName, String topic, + int queueId, RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, RemotingCommandException { String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); - return this.snodeController.getRemotingClient().invokeSync(address, + RemotingCommand remotingCommand = this.snodeController.getRemotingClient().invokeSync(address, request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); + GetMaxOffsetResponseHeader responseHeader = + (GetMaxOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class); + return responseHeader.getOffset(); } @Override - public RemotingCommand getOffsetByTimestamp(String enodeName, - RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + public long getOffsetByTimestamp(String enodeName, String topic, int queueId, + long timestamp, + RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, RemotingCommandException { String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); - return this.snodeController.getRemotingClient().invokeSync(address, + RemotingCommand remotingCommand = this.snodeController.getRemotingClient().invokeSync(address, request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); + SearchOffsetResponseHeader responseHeader = + (SearchOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); + return responseHeader.getOffset(); } @Override - public RemotingCommand creatRetryTopic(String enodeName, + public RemotingCommand creatRetryTopic(final RemotingChannel remotingChannel, String enodeName, RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); return this.snodeController.getRemotingClient().invokeSync(address, diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java index e6e56a826b31f34ab8f07b402e5c55c320c435f0..a97f169d04af8a939a149250b48d0b7ade3f53d8 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java @@ -74,7 +74,7 @@ public class SendMessageProcessorTest { public void testSendMessageV2ProcessRequest() throws RemotingCommandException { CompletableFuture future = new CompletableFuture<>(); RemotingCommand request = createSendMesssageV2Command(); - when(this.snodeController.getEnodeService().sendMessage(anyString(), any(RemotingCommand.class))).thenReturn(future); + when(this.snodeController.getEnodeService().sendMessage(null, anyString(), any(RemotingCommand.class))).thenReturn(future); sendMessageProcessor.processRequest(remotingChannel, request); } @@ -83,7 +83,7 @@ public class SendMessageProcessorTest { snodeController.setEnodeService(enodeService); CompletableFuture future = new CompletableFuture<>(); RemotingCommand request = createSendBatchMesssageCommand(); - when(this.snodeController.getEnodeService().sendMessage(anyString(), any(RemotingCommand.class))).thenReturn(future); + when(this.snodeController.getEnodeService().sendMessage(null, anyString(), any(RemotingCommand.class))).thenReturn(future); sendMessageProcessor.processRequest(remotingChannel, request); } diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java similarity index 93% rename from snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java rename to snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java index c4075e1871423b2a13590b7b089e683d6fbc8d3c..582c16e17fa9110e47b313ed5b149fc88fad101a 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java @@ -30,7 +30,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeTestBase; -import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl; +import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; import org.junit.Before; @@ -51,7 +51,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) -public class EnodeServiceImplTest extends SnodeTestBase { +public class RemoteEnodeServiceImplTest extends SnodeTestBase { private EnodeService enodeService; @@ -74,7 +74,7 @@ public class EnodeServiceImplTest extends SnodeTestBase { public void init() { snodeController.setNnodeService(nnodeService); snodeController.setRemotingClient(remotingClient); - enodeService = new EnodeServiceImpl(snodeController); + enodeService = new RemoteEnodeServiceImpl(snodeController); } @Test @@ -93,7 +93,7 @@ public class EnodeServiceImplTest extends SnodeTestBase { return null; } }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); - RemotingCommand response = enodeService.sendMessage(enodeName, createSendMesssageCommand(group, topic)).get(3000L, TimeUnit.MILLISECONDS); + RemotingCommand response = enodeService.sendMessage(null, enodeName, createSendMesssageCommand(group, topic)).get(3000L, TimeUnit.MILLISECONDS); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } @@ -118,7 +118,7 @@ public class EnodeServiceImplTest extends SnodeTestBase { return null; } }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); - RemotingCommand response = enodeService.pullMessage(enodeName, createPullMessage()).get(3000L, TimeUnit.MILLISECONDS); + RemotingCommand response = enodeService.pullMessage(null, enodeName, createPullMessage()).get(3000L, TimeUnit.MILLISECONDS); assertThat(response).isNotNull(); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); }