提交 21d18b35 编写于 作者: D duhenglucky

Merge branch 'snode' of github.com:apache/rocketmq into snode

...@@ -160,6 +160,12 @@ public class LocalFileOffsetStore implements OffsetStore { ...@@ -160,6 +160,12 @@ public class LocalFileOffsetStore implements OffsetStore {
} }
@Override
public void updateConsumeOffsetToSnode(final MessageQueue mq, final long offset, final boolean isOneway)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
}
@Override @Override
public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway) public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException { throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
......
...@@ -71,4 +71,12 @@ public interface OffsetStore { ...@@ -71,4 +71,12 @@ public interface OffsetStore {
*/ */
void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException; MQBrokerException, InterruptedException, MQClientException;
/**
* @param mq
* @param offset
* @param isOneway
*/
void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
} }
...@@ -87,7 +87,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -87,7 +87,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
} }
case READ_FROM_STORE: { case READ_FROM_STORE: {
try { try {
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq); long brokerOffset = this.fetchConsumeOffsetFromSnode(mq);
AtomicLong offset = new AtomicLong(brokerOffset); AtomicLong offset = new AtomicLong(brokerOffset);
this.updateOffset(mq, offset.get(), false); this.updateOffset(mq, offset.get(), false);
return brokerOffset; return brokerOffset;
...@@ -195,14 +195,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -195,14 +195,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException, private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException { MQBrokerException, InterruptedException, MQClientException {
updateConsumeOffsetToBroker(mq, offset, true); updateConsumeOffsetToSnode(mq, offset, true);
} }
/** /**
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized. * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
*/ */
@Override @Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, public void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException { MQBrokerException, InterruptedException, MQClientException {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
...@@ -226,10 +226,65 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -226,10 +226,65 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
snodeAddr, requestHeader, 1000 * 5); snodeAddr, requestHeader, 1000 * 5);
} }
} else { } else {
throw new MQClientException("Update offset to Broker[" + mq.getBrokerName() + "] failed, Snode is null.", null); throw new MQClientException("Update offset to Snode[" + mq.getBrokerName() + "] failed, Snode is null.", null);
}
}
/**
* Preserved firstly,Compatible with RocketMQ 4.X Version
*/
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset,
boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
requestHeader.setEnodeName(mq.getBrokerName());
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
} }
} }
private long fetchConsumeOffsetFromSnode(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == snodeAddr) {
this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
}
if (snodeAddr != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setEnodeName(mq.getBrokerName());
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
snodeAddr, requestHeader, 1000 * 5);
} else {
throw new MQClientException("Get Offset from Snode[" + mq.getBrokerName() + "] failed, Snode is not exist", null);
}
}
/**
* Preserved firstly,Compatible with RocketMQ 4.X Version
*/
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException { InterruptedException, MQClientException {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
......
...@@ -1194,7 +1194,7 @@ public class MQClientAPIImpl { ...@@ -1194,7 +1194,7 @@ public class MQClientAPIImpl {
public SnodeClusterInfo getSnodeClusterInfo( public SnodeClusterInfo getSnodeClusterInfo(
//Todo Redifine snode exception //Todo Redifine snode exception
final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException , MQBrokerException{ RemotingSendRequestException, RemotingConnectException , MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SNODE_CLUSTER_INFO, null); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SNODE_CLUSTER_INFO, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
......
...@@ -480,9 +480,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -480,9 +480,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup()); sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup());
} }
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, public void updateConsumeOffsetToSnode(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException { MQBrokerException, InterruptedException, MQClientException {
this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway); this.offsetStore.updateConsumeOffsetToSnode(mq, offset, isOneway);
} }
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup) public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup)
......
...@@ -77,6 +77,8 @@ public class DefaultMQPullConsumerTest { ...@@ -77,6 +77,8 @@ public class DefaultMQPullConsumerTest {
field.set(mQClientFactory, mQClientAPIImpl); field.set(mQClientFactory, mQClientAPIImpl);
when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false)); when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false));
when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911");
} }
@After @After
......
...@@ -147,6 +147,8 @@ public class DefaultMQPushConsumerTest { ...@@ -147,6 +147,8 @@ public class DefaultMQPushConsumerTest {
}); });
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
doReturn("127.0.0.1:10911").when(mQClientFactory).findSnodeAddressInPublish();
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>(); Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue()); messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
......
...@@ -20,7 +20,6 @@ import java.util.Collections; ...@@ -20,7 +20,6 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
...@@ -34,7 +33,6 @@ import org.mockito.Mock; ...@@ -34,7 +33,6 @@ import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
...@@ -58,8 +56,8 @@ public class RemoteBrokerOffsetStoreTest { ...@@ -58,8 +56,8 @@ public class RemoteBrokerOffsetStoreTest {
System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets"); System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis(); String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
when(mQClientFactory.getClientId()).thenReturn(clientId); when(mQClientFactory.getClientId()).thenReturn(clientId);
when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false));
when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI); when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI);
when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911");
} }
@Test @Test
......
...@@ -106,6 +106,8 @@ public class DefaultMQProducerTest { ...@@ -106,6 +106,8 @@ public class DefaultMQProducerTest {
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK)); .thenReturn(createSendResult(SendStatus.SEND_OK));
when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911");
} }
@After @After
......
...@@ -14,21 +14,21 @@ ...@@ -14,21 +14,21 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.config; package org.apache.rocketmq.common;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
public class SnodeConfig { public class SnodeConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
......
...@@ -24,6 +24,7 @@ import java.util.concurrent.Executors; ...@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
...@@ -55,7 +56,6 @@ import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl; ...@@ -55,7 +56,6 @@ import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl; import org.apache.rocketmq.snode.client.impl.ProducerManagerImpl;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl; import org.apache.rocketmq.snode.client.impl.SubscriptionManagerImpl;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.apache.rocketmq.snode.processor.ConsumerManageProcessor; import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor; import org.apache.rocketmq.snode.processor.DefaultMqttMessageProcessor;
......
...@@ -26,11 +26,13 @@ import java.io.FileInputStream; ...@@ -26,11 +26,13 @@ import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser; import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
...@@ -39,7 +41,6 @@ import org.apache.rocketmq.remoting.ServerConfig; ...@@ -39,7 +41,6 @@ import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -123,19 +124,31 @@ public class SnodeStartup { ...@@ -123,19 +124,31 @@ public class SnodeStartup {
nettyClientConfig, nettyClientConfig,
snodeConfig); snodeConfig);
boolean initResult = snodeController.initialize();
if (!initResult) {
snodeController.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false; private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override @Override
public void run() { public void run() {
synchronized (this) { synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) { if (!this.hasShutdown) {
this.hasShutdown = true; this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
snodeController.shutdown(); snodeController.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
} }
} }
} }
})); },"ShutdownHook"));
return snodeController; return snodeController;
} }
...@@ -143,6 +156,15 @@ public class SnodeStartup { ...@@ -143,6 +156,15 @@ public class SnodeStartup {
Option opt = new Option("c", "configFile", true, "SNode config properties file"); Option opt = new Option("c", "configFile", true, "SNode config properties file");
opt.setRequired(false); opt.setRequired(false);
options.addOption(opt); options.addOption(opt);
opt = new Option("p", "printConfigItem", false, "Print all config item");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("m", "printImportantConfig", false, "Print important config item");
opt.setRequired(false);
options.addOption(opt);
return options; return options;
} }
} }
......
...@@ -28,7 +28,7 @@ import org.apache.rocketmq.remoting.netty.NettyChannelImpl; ...@@ -28,7 +28,7 @@ import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.snode.constant.SnodeConstant; import org.apache.rocketmq.snode.constant.SnodeConstant;
public class ClientHousekeepingService implements ChannelEventListener { public class ClientHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final ClientManager producerManager; private final ClientManager producerManager;
private final ClientManager consumerManager; private final ClientManager consumerManager;
private final ClientManager iotClientManager; private final ClientManager iotClientManager;
......
...@@ -43,7 +43,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; ...@@ -43,7 +43,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
public class ConsumerManageProcessor implements RequestProcessor { public class ConsumerManageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController; private final SnodeController snodeController;
......
...@@ -28,6 +28,9 @@ import java.io.UnsupportedEncodingException; ...@@ -28,6 +28,9 @@ import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
...@@ -38,6 +41,7 @@ import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; ...@@ -38,6 +41,7 @@ import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler; import org.apache.rocketmq.snode.processor.mqtthandler.MessageHandler;
public class DefaultMqttMessageProcessor implements RequestProcessor { public class DefaultMqttMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private Map<MqttMessageType, MessageHandler> type2handler = new HashMap<>(); private Map<MqttMessageType, MessageHandler> type2handler = new HashMap<>();
private static final int MIN_AVAILABLE_VERSION = 3; private static final int MIN_AVAILABLE_VERSION = 3;
......
...@@ -18,12 +18,12 @@ package org.apache.rocketmq.snode.service; ...@@ -18,12 +18,12 @@ package org.apache.rocketmq.snode.service;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.snode.config.SnodeConfig;
public interface NnodeService { public interface NnodeService {
/** /**
......
...@@ -22,6 +22,7 @@ import java.util.Set; ...@@ -22,6 +22,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
...@@ -37,7 +38,6 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; ...@@ -37,7 +38,6 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.constant.SnodeConstant; import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.snode.service.NnodeService; import org.apache.rocketmq.snode.service.NnodeService;
......
...@@ -20,6 +20,7 @@ import java.util.concurrent.Executors; ...@@ -20,6 +20,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
...@@ -28,7 +29,6 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -28,7 +29,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.ScheduledService; import org.apache.rocketmq.snode.service.ScheduledService;
public class ScheduledServiceImpl implements ScheduledService { public class ScheduledServiceImpl implements ScheduledService {
......
...@@ -16,9 +16,9 @@ ...@@ -16,9 +16,9 @@
*/ */
package org.apache.rocketmq.snode; package org.apache.rocketmq.snode;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.junit.Test; import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
......
...@@ -22,6 +22,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType; ...@@ -22,6 +22,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
...@@ -33,7 +34,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; ...@@ -33,7 +34,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader;
import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload; import org.apache.rocketmq.remoting.transport.mqtt.RocketMQMqttConnectPayload;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
......
...@@ -22,11 +22,11 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; ...@@ -22,11 +22,11 @@ import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler; import org.apache.rocketmq.snode.processor.mqtthandler.MqttConnectMessageHandler;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
......
...@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.processor; ...@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.processor;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
...@@ -28,7 +29,6 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; ...@@ -28,7 +29,6 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.EnodeService; import org.apache.rocketmq.snode.service.EnodeService;
import org.apache.rocketmq.snode.service.NnodeService; import org.apache.rocketmq.snode.service.NnodeService;
import org.junit.Before; import org.junit.Before;
......
...@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service; ...@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
...@@ -30,7 +31,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; ...@@ -30,7 +31,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase; import org.apache.rocketmq.snode.SnodeTestBase;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.GetMessageStatus;
......
...@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service; ...@@ -18,6 +18,7 @@ package org.apache.rocketmq.snode.service;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
...@@ -27,7 +28,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; ...@@ -27,7 +28,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase; import org.apache.rocketmq.snode.SnodeTestBase;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
......
...@@ -16,12 +16,12 @@ ...@@ -16,12 +16,12 @@
*/ */
package org.apache.rocketmq.snode.service; package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.SlowConsumerService; import org.apache.rocketmq.snode.client.SlowConsumerService;
import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl; import org.apache.rocketmq.snode.client.impl.SlowConsumerServiceImpl;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册