提交 2f46644e 编写于 作者: C chengxiangwang

reformat code to rmq_codestyle

上级 5f72d0d3
......@@ -23,7 +23,7 @@ public class WillMessage {
private byte[] body;
private boolean isRetain;
private boolean isRetain;
private int qos;
......@@ -58,6 +58,7 @@ public class WillMessage {
public void setQos(int qos) {
this.qos = qos;
}
public String getString() {
return new String(body);
}
......
......@@ -29,7 +29,7 @@ public class MqttSubscriptionData extends SubscriptionData {
}
public MqttSubscriptionData(int qos, String clientId, String topicFilter) {
super(topicFilter,null);
super(topicFilter, null);
this.qos = qos;
this.clientId = clientId;
}
......
......@@ -67,7 +67,7 @@ import org.apache.rocketmq.remoting.util.ThreadUtils;
public class MqttRemotingServer extends NettyRemotingServerAbstract implements RemotingServer {
private static final InternalLogger log = InternalLoggerFactory
.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private ServerBootstrap serverBootstrap;
private EventLoopGroup eventLoopGroupSelector;
private EventLoopGroup eventLoopGroupBoss;
......@@ -94,16 +94,16 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
}
public MqttRemotingServer(final ServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
final ChannelEventListener channelEventListener) {
init(nettyServerConfig, channelEventListener);
}
@Override
public RemotingServer init(ServerConfig serverConfig,
ChannelEventListener channelEventListener) {
ChannelEventListener channelEventListener) {
this.nettyServerConfig = serverConfig;
super.init(nettyServerConfig.getServerOnewaySemaphoreValue(),
nettyServerConfig.getServerAsyncSemaphoreValue());
nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.channelEventListener = channelEventListener;
......@@ -112,33 +112,33 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
publicThreadNums = 4;
}
this.publicExecutor = ThreadUtils.newFixedThreadPool(
publicThreadNums,
10000, "MqttRemoting-PublicExecutor", true);
publicThreadNums,
10000, "MqttRemoting-PublicExecutor", true);
if (JvmUtils.isUseEpoll() && this.nettyServerConfig.isUseEpollNativeSelector()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(
serverConfig.getServerSelectorThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyEpollIoThreads",
serverConfig.getServerSelectorThreads()));
serverConfig.getServerSelectorThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyEpollIoThreads",
serverConfig.getServerSelectorThreads()));
this.eventLoopGroupBoss = new EpollEventLoopGroup(
serverConfig.getServerAcceptorThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyBossThreads",
serverConfig.getServerAcceptorThreads()));
serverConfig.getServerAcceptorThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.socketChannelClass = EpollServerSocketChannel.class;
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyBossThreads",
serverConfig.getServerAcceptorThreads()));
ThreadUtils.newGenericThreadFactory("MqttNettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.eventLoopGroupSelector = new NioEventLoopGroup(
serverConfig.getServerSelectorThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyNioIoThreads",
serverConfig.getServerSelectorThreads()));
serverConfig.getServerSelectorThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyNioIoThreads",
serverConfig.getServerSelectorThreads()));
this.socketChannelClass = NioServerSocketChannel.class;
}
this.port = nettyServerConfig.getMqttListenPort();
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyWorkerThreads",
serverConfig.getServerWorkerThreads()));
serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("MqttNettyWorkerThreads",
serverConfig.getServerWorkerThreads()));
loadSslContext();
return this;
}
......@@ -162,40 +162,40 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
public void start() {
super.start();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(socketChannelClass)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF,
nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF,
nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new MqttDecoder(),
MqttEncoder.INSTANCE,
new MqttMessage2RemotingCommandHandler(),
new RemotingCommand2MqttMessageHandler(),
new IdleStateHandler(nettyServerConfig
.getConnectionChannelReaderIdleSeconds(),
nettyServerConfig
.getConnectionChannelWriterIdleSeconds(),
nettyServerConfig
.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
}
});
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(socketChannelClass)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF,
nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF,
nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new MqttDecoder(),
MqttEncoder.INSTANCE,
new MqttMessage2RemotingCommandHandler(),
new RemotingCommand2MqttMessageHandler(),
new IdleStateHandler(nettyServerConfig
.getConnectionChannelReaderIdleSeconds(),
nettyServerConfig
.getConnectionChannelWriterIdleSeconds(),
nettyServerConfig
.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
......@@ -207,7 +207,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException",
e1);
e1);
}
startUpHouseKeepingService();
registerMessageHandler();
......@@ -216,6 +216,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
private void registerMessageHandler() {
}
@Override
public void shutdown() {
try {
......@@ -269,28 +270,28 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
@Override
public RemotingCommand invokeSync(final RemotingChannel remotingChannel,
final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
return this.invokeSyncImpl(((NettyChannelImpl) remotingChannel).getChannel(), request,
timeoutMillis);
timeoutMillis);
}
@Override
public void invokeAsync(RemotingChannel remotingChannel, RemotingCommand request,
long timeoutMillis,
InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
long timeoutMillis,
InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeAsyncImpl(((NettyChannelImpl) remotingChannel).getChannel(), request,
timeoutMillis, invokeCallback);
timeoutMillis, invokeCallback);
}
@Override
public void invokeOneway(RemotingChannel remotingChannel, RemotingCommand request,
long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeOnewayImpl(((NettyChannelImpl) remotingChannel).getChannel(), request,
timeoutMillis);
timeoutMillis);
}
@Override
......@@ -305,7 +306,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
@Override
protected RemotingChannel getAndCreateChannel(String addr, long timeout)
throws InterruptedException {
throws InterruptedException {
return null;
}
......@@ -338,23 +339,23 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
case DISABLED:
ctx.close();
log.warn(
"Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
"Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
break;
case PERMISSIVE:
case ENFORCING:
if (null != sslContext) {
ctx.pipeline()
.addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
TLS_HANDLER_NAME,
sslContext.newHandler(ctx.channel().alloc()))
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME,
FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
.addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
TLS_HANDLER_NAME,
sslContext.newHandler(ctx.channel().alloc()))
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME,
FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
log.info(
"Handlers prepended to channel pipeline to establish SSL connection");
"Handlers prepended to channel pipeline to establish SSL connection");
} else {
ctx.close();
log.error(
"Trying to establish a SSL connection but sslContext is null");
"Trying to establish a SSL connection but sslContext is null");
}
break;
......@@ -365,7 +366,7 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
} else if (tlsMode == TlsMode.ENFORCING) {
ctx.close();
log.warn(
"Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
"Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
}
// reset the reader index so that handshake negotiation may proceed as normal.
......@@ -385,8 +386,8 @@ public class MqttRemotingServer extends NettyRemotingServerAbstract implements R
@Override
public void push(RemotingChannel remotingChannel, RemotingCommand request,
long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeOneway(remotingChannel, request, timeoutMillis);
}
}
......@@ -51,6 +51,7 @@ public final class RocketMQMqttSubAckPayload extends RemotingSerializable {
public MqttSubAckPayload toMqttSubAckPayload() throws UnsupportedEncodingException {
return new MqttSubAckPayload(this.grantedQoSLevels);
}
@Override
public String toString() {
return StringUtil.simpleClassName(this) + '[' + "grantedQoSLevels=" + this.grantedQoSLevels + ']';
......
......@@ -42,9 +42,10 @@ public final class RocketMQMqttSubscribePayload extends RemotingSerializable {
}
public void setTopicSubscriptions(
List<MqttTopicSubscription> topicSubscriptions) {
List<MqttTopicSubscription> topicSubscriptions) {
this.topicSubscriptions = topicSubscriptions;
}
public static RocketMQMqttSubscribePayload fromMqttSubscribePayload(MqttSubscribePayload payload) {
return new RocketMQMqttSubscribePayload(payload.topicSubscriptions());
}
......@@ -52,6 +53,7 @@ public final class RocketMQMqttSubscribePayload extends RemotingSerializable {
public MqttSubscribePayload toMqttSubscribePayload() throws UnsupportedEncodingException {
return new MqttSubscribePayload(this.topicSubscriptions);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
......
......@@ -41,6 +41,7 @@ public class RocketMQMqttUnSubscribePayload extends RemotingSerializable {
public void setTopics(List<String> topics) {
this.topics = Collections.unmodifiableList(topics);
}
public static RocketMQMqttUnSubscribePayload fromMqttUnSubscribePayload(MqttUnsubscribePayload payload) {
return new RocketMQMqttUnSubscribePayload(payload.topics());
}
......@@ -48,6 +49,7 @@ public class RocketMQMqttUnSubscribePayload extends RemotingSerializable {
public MqttUnsubscribePayload toMqttUnsubscribePayload() throws UnsupportedEncodingException {
return new MqttUnsubscribePayload(this.topics);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
......@@ -55,7 +57,7 @@ public class RocketMQMqttUnSubscribePayload extends RemotingSerializable {
builder.append("topicName = ").append(topics.get(i)).append(", ");
}
builder.append("topicName = ").append(topics.get(topics.size() - 1))
.append(']');
.append(']');
return builder.toString();
}
}
......@@ -38,13 +38,13 @@ public class MqttConnectackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand
.decodeCommandCustomHeader(MqttHeader.class);
.decodeCommandCustomHeader(MqttHeader.class);
return new MqttConnAckMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()), new MqttConnAckVariableHeader(
MqttConnectReturnCode.valueOf(mqttHeader.getConnectReturnCode()),
mqttHeader.isSessionPresent()));
new MqttFixedHeader(MqttMessageType.CONNACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()), new MqttConnAckVariableHeader(
MqttConnectReturnCode.valueOf(mqttHeader.getConnectReturnCode()),
mqttHeader.isSessionPresent()));
}
}
......@@ -37,12 +37,12 @@ public class MqttPubackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand
.decodeCommandCustomHeader(MqttHeader.class);
.decodeCommandCustomHeader(MqttHeader.class);
return new MqttPubAckMessage(
new MqttFixedHeader(MqttMessageType.PUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()));
new MqttFixedHeader(MqttMessageType.PUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()));
}
}
......@@ -37,7 +37,7 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode {
RemotingCommand requestCommand = null;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttConnAckVariableHeader variableHeader = (MqttConnAckVariableHeader) mqttMessage
.variableHeader();
.variableHeader();
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(mqttFixedHeader.messageType().value());
......@@ -50,7 +50,7 @@ public class MqttPublishEncodeDecode implements Message2MessageEncodeDecode {
mqttHeader.setSessionPresent(variableHeader.isSessionPresent());
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
.createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand);
requestCommand.setBody(payload);
......
......@@ -38,12 +38,12 @@ public class MqttSubackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand
.decodeCommandCustomHeader(MqttHeader.class);
.decodeCommandCustomHeader(MqttHeader.class);
return new MqttSubAckMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()),new MqttSubAckPayload());
new MqttFixedHeader(MqttMessageType.SUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()), new MqttSubAckPayload());
}
}
......@@ -31,12 +31,12 @@ public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode {
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
RocketMQMqttSubscribePayload payload = RocketMQMqttSubscribePayload
.fromMqttSubscribePayload(((MqttSubscribeMessage) mqttMessage).payload());
.fromMqttSubscribePayload(((MqttSubscribeMessage) mqttMessage).payload());
RemotingCommand requestCommand = null;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader();
.variableHeader();
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(mqttFixedHeader.messageType().value());
......@@ -48,7 +48,7 @@ public class MqttSubscribeEncodeDecode implements Message2MessageEncodeDecode {
mqttHeader.setMessageId(variableHeader.messageId());
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
.createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand);
requestCommand.setBody(payload.encode());
......
......@@ -37,12 +37,12 @@ public class MqttUnSubackEncodeDecode implements Message2MessageEncodeDecode {
@Override
public MqttMessage encode(RemotingCommand remotingCommand) throws RemotingCommandException {
MqttHeader mqttHeader = (MqttHeader) remotingCommand
.decodeCommandCustomHeader(MqttHeader.class);
.decodeCommandCustomHeader(MqttHeader.class);
return new MqttUnsubAckMessage(
new MqttFixedHeader(MqttMessageType.UNSUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()));
new MqttFixedHeader(MqttMessageType.UNSUBACK, mqttHeader.isDup(),
MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength()),
MqttMessageIdVariableHeader.from(mqttHeader.getMessageId()));
}
}
......@@ -31,12 +31,12 @@ public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode
@Override
public RemotingCommand decode(MqttMessage mqttMessage) {
RocketMQMqttUnSubscribePayload payload = RocketMQMqttUnSubscribePayload
.fromMqttUnSubscribePayload(((MqttUnsubscribeMessage) mqttMessage).payload());
.fromMqttUnSubscribePayload(((MqttUnsubscribeMessage) mqttMessage).payload());
RemotingCommand requestCommand = null;
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader();
.variableHeader();
MqttHeader mqttHeader = new MqttHeader();
mqttHeader.setMessageType(mqttFixedHeader.messageType().value());
......@@ -48,7 +48,7 @@ public class MqttUnSubscribeEncodeDecode implements Message2MessageEncodeDecode
mqttHeader.setMessageId(variableHeader.messageId());
requestCommand = RemotingCommand
.createRequestCommand(1000, mqttHeader);
.createRequestCommand(1000, mqttHeader);
CodecHelper.makeCustomHeaderToNet(requestCommand);
requestCommand.setBody(payload.encode());
......
......@@ -79,7 +79,7 @@ import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl;
public class SnodeController {
private static final InternalLogger log = InternalLoggerFactory
.getLogger(LoggerName.SNODE_LOGGER_NAME);
.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeConfig snodeConfig;
private final ServerConfig nettyServerConfig;
......@@ -118,12 +118,12 @@ public class SnodeController {
private WillMessageService willMessageService;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
public SnodeController(ServerConfig nettyServerConfig,
ClientConfig nettyClientConfig,
SnodeConfig snodeConfig) {
ClientConfig nettyClientConfig,
SnodeConfig snodeConfig) {
this.nettyClientConfig = nettyClientConfig;
this.nettyServerConfig = nettyServerConfig;
this.snodeConfig = snodeConfig;
......@@ -131,37 +131,37 @@ public class SnodeController {
this.nnodeService = new NnodeServiceImpl(this);
this.scheduledService = new ScheduledServiceImpl(this);
this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient()
.init(this.getNettyClientConfig(), null);
.init(this.getNettyClientConfig(), null);
this.mqttRemotingClient = RemotingClientFactory.getInstance()
.createRemotingClient(RemotingUtil.MQTT_PROTOCOL)
.init(this.getNettyClientConfig(), null);
.createRemotingClient(RemotingUtil.MQTT_PROTOCOL)
.init(this.getNettyClientConfig(), null);
this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodeSendMessageThread",
false);
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodeSendMessageThread",
false);
this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeHeartBeatCorePoolSize(),
snodeConfig.getSnodeHeartBeatMaxPoolSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
"SnodeHeartbeatThread",
true);
snodeConfig.getSnodeHeartBeatCorePoolSize(),
snodeConfig.getSnodeHeartBeatMaxPoolSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
"SnodeHeartbeatThread",
true);
// this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor(
// snodeConfig.getSnodeSendMessageMinPoolSize(),
......@@ -173,27 +173,27 @@ public class SnodeController {
// false);
this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"ConsumerManagerThread",
false);
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"ConsumerManagerThread",
false);
this.handleMqttMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeHandleMqttMessageMinPoolSize(),
snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()),
"SnodeHandleMqttMessageThread",
false);
snodeConfig.getSnodeHandleMqttMessageMinPoolSize(),
snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()),
"SnodeHandleMqttMessageThread",
false);
if (this.snodeConfig.getNamesrvAddr() != null) {
this.nnodeService.updateNnodeAddressList(this.snodeConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}",
this.snodeConfig.getNamesrvAddr());
this.snodeConfig.getNamesrvAddr());
}
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
......@@ -210,7 +210,7 @@ public class SnodeController {
this.consumerManager = new ConsumerManagerImpl(this);
this.iotClientManager = new IOTClientManagerImpl(this);
this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager,
this.consumerManager, this.iotClientManager);
this.consumerManager, this.iotClientManager);
this.slowConsumerService = new SlowConsumerServiceImpl(this);
this.metricsService = new MetricsServiceImpl();
this.willMessageService = new WillMessageServiceImpl(this);
......@@ -222,7 +222,7 @@ public class SnodeController {
private void initRemotingServerInterceptorGroup() {
List<Interceptor> remotingServerInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath());
.loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath());
if (remotingServerInterceptors != null && remotingServerInterceptors.size() > 0) {
if (this.remotingServerInterceptorGroup == null) {
this.remotingServerInterceptorGroup = new InterceptorGroup();
......@@ -230,17 +230,17 @@ public class SnodeController {
for (Interceptor interceptor : remotingServerInterceptors) {
this.remotingServerInterceptorGroup.registerInterceptor(interceptor);
log.warn("Remoting server interceptor: {} registered!",
interceptor.interceptorName());
interceptor.interceptorName());
}
}
}
public boolean initialize() {
this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer()
.init(this.nettyServerConfig, this.clientHousekeepingService);
.init(this.nettyServerConfig, this.clientHousekeepingService);
this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer(
RemotingUtil.MQTT_PROTOCOL)
.init(this.nettyServerConfig, this.clientHousekeepingService);
RemotingUtil.MQTT_PROTOCOL)
.init(this.nettyServerConfig, this.clientHousekeepingService);
this.registerProcessor();
initSnodeInterceptorGroup();
initRemotingServerInterceptorGroup();
......@@ -258,7 +258,7 @@ public class SnodeController {
}
List<AccessValidator> accessValidators = ServiceProvider
.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
if (accessValidators == null || accessValidators.isEmpty()) {
log.info("The snode dose not load the AccessValidator");
return;
......@@ -278,7 +278,7 @@ public class SnodeController {
//Do not catch the exception
RemotingCommand request = requestContext.getRequest();
String remoteAddr = RemotingUtil.socketAddress2IpString(
requestContext.getRemotingChannel().remoteAddress());
requestContext.getRemotingChannel().remoteAddress());
validator.validate(validator.parse(request, remoteAddr));
}
......@@ -296,17 +296,17 @@ public class SnodeController {
private void initSnodeInterceptorGroup() {
List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath());
.loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath());
if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) {
this.consumeMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : consumeMessageInterceptors) {
this.consumeMessageInterceptorGroup.registerInterceptor(interceptor);
log.warn("Consume message interceptor: {} registered!",
interceptor.interceptorName());
interceptor.interceptorName());
}
}
List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance()
.loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath());
.loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath());
if (sendMessageInterceptors != null && sendMessageInterceptors.size() > 0) {
this.sendMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : sendMessageInterceptors) {
......@@ -319,37 +319,37 @@ public class SnodeController {
public void registerProcessor() {
this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor,
this.sendMessageExecutor);
this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor,
this.sendMessageExecutor);
this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor,
this.heartbeatExecutor);
this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor,
this.heartbeatExecutor);
this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor,
this.heartbeatExecutor);
this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor,
this.pullMessageExecutor);
this.pullMessageExecutor);
this.snodeServer
.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor,
this.consumerManageExecutor);
.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor,
.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor,
this.consumerManageExecutor);
this.consumerManageExecutor);
this.snodeServer
.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor,
.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor,
this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor,
this.consumerManageExecutor);
this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE,
defaultMqttMessageProcessor, handleMqttMessageExecutor);
defaultMqttMessageProcessor, handleMqttMessageExecutor);
}
......@@ -476,7 +476,7 @@ public class SnodeController {
}
public void setRemotingServerInterceptorGroup(
InterceptorGroup remotingServerInterceptorGroup) {
InterceptorGroup remotingServerInterceptorGroup) {
this.remotingServerInterceptorGroup = remotingServerInterceptorGroup;
}
......@@ -545,7 +545,7 @@ public class SnodeController {
}
public void setWillMessageService(
WillMessageService willMessageService) {
WillMessageService willMessageService) {
this.willMessageService = willMessageService;
}
}
......@@ -62,39 +62,39 @@ public class DefaultMqttMessageProcessor implements RequestProcessor {
public DefaultMqttMessageProcessor(SnodeController snodeController) {
this.snodeController = snodeController;
registerMessageHandler(MqttMessageType.CONNECT,
new MqttConnectMessageHandler(this.snodeController));
new MqttConnectMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.DISCONNECT,
new MqttDisconnectMessageHandler(this.snodeController));
new MqttDisconnectMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.PINGREQ,
new MqttPingreqMessageHandler(this.snodeController));
new MqttPingreqMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.PUBLISH,
new MqttPublishMessageHandler(this.snodeController));
new MqttPublishMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.PUBCOMP,
new MqttPubcompMessageHandler(this.snodeController));
new MqttPubcompMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.SUBSCRIBE,
new MqttSubscribeMessageHandler(this.snodeController));
new MqttSubscribeMessageHandler(this.snodeController));
registerMessageHandler(MqttMessageType.UNSUBSCRIBE,
new MqttUnsubscribeMessagHandler(this.snodeController));
new MqttUnsubscribeMessagHandler(this.snodeController));
}
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand message)
throws RemotingCommandException, UnsupportedEncodingException {
throws RemotingCommandException, UnsupportedEncodingException {
MqttHeader mqttHeader = (MqttHeader) message.decodeCommandCustomHeader(MqttHeader.class);
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.valueOf(mqttHeader.getMessageType()),
mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength());
mqttHeader.isDup(), MqttQoS.valueOf(mqttHeader.getQosLevel()), mqttHeader.isRetain(),
mqttHeader.getRemainingLength());
MqttMessage mqttMessage = null;
switch (fixedHeader.messageType()) {
case CONNECT:
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(),
mqttHeader.isHasPassword(), mqttHeader.isWillRetain(),
mqttHeader.getWillQos(), mqttHeader.isWillFlag(),
mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds());
mqttHeader.getName(), mqttHeader.getVersion(), mqttHeader.isHasUserName(),
mqttHeader.isHasPassword(), mqttHeader.isWillRetain(),
mqttHeader.getWillQos(), mqttHeader.isWillFlag(),
mqttHeader.isCleanSession(), mqttHeader.getKeepAliveTimeSeconds());
RocketMQMqttConnectPayload payload = decode(message.getBody(), RocketMQMqttConnectPayload.class);
mqttMessage = new MqttConnectMessage(fixedHeader, variableHeader, payload.toMqttConnectPayload());
case DISCONNECT:
......
......@@ -31,39 +31,33 @@ import org.apache.rocketmq.snode.client.impl.IOTClientManagerImpl;
public class MqttDisconnectMessageHandler implements MessageHandler {
private static final InternalLogger log = InternalLoggerFactory
.getLogger(LoggerName.SNODE_LOGGER_NAME);
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController;
public MqttDisconnectMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
/**
* handle the DISCONNECT message from the client
* <ol>
* <li>discard the Will Message and Will Topic</li>
* <li>remove the client from the IOTClientManager</li>
* <li>disconnect the connection</li>
* </ol>
* handle the DISCONNECT message from the client <ol> <li>discard the Will Message and Will Topic</li> <li>remove
* the client from the IOTClientManager</li> <li>disconnect the connection</li> </ol>
*/
@Override
public RemotingCommand handleMessage(MqttMessage message, RemotingChannel remotingChannel) {
// TODO discard the Will Message and Will Topic
MqttFixedHeader fixedHeader = message.fixedHeader();
if (fixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE || !fixedHeader.isDup() || !fixedHeader
.isRetain()) {
.isRetain()) {
log.error(
"The reserved bits(qos/isDup/isRetain) are not zero. Qos={}, isDup={}, isRetain={}",
fixedHeader.qosLevel(), fixedHeader.isDup(), fixedHeader.isRetain());
"The reserved bits(qos/isDup/isRetain) are not zero. Qos={}, isDup={}, isRetain={}",
fixedHeader.qosLevel(), fixedHeader.isDup(), fixedHeader.isRetain());
remotingChannel.close();
return null;
}
//discard will message associated with the current connection(client)
Client client = snodeController.getIotClientManager()
.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
if (client != null) {
snodeController.getWillMessageService().deleteWillMessage(client.getClientId());
}
......
......@@ -97,6 +97,7 @@ public class DefaultMqttMessageProcessorTest {
CodecHelper.makeCustomHeaderToNet(request);
return request;
}
private byte[] encode(Object obj) {
String json = JSON.toJSONString(obj, false);
if (json != null) {
......
......@@ -44,16 +44,16 @@ public class MqttDisconnectMessageHandlerTest {
public void testHandlerMessage() throws Exception {
SnodeController snodeController = new SnodeController(new ServerConfig(),
new ClientConfig(), new SnodeConfig());
new ClientConfig(), new SnodeConfig());
MqttDisconnectMessageHandler mqttDisconnectMessageHandler = new MqttDisconnectMessageHandler(
snodeController);
snodeController);
Client client = new Client();
client.setRemotingChannel(remotingChannel);
client.setClientId("123456");
snodeController.getIotClientManager().register(IOTClientManagerImpl.IOT_GROUP, client);
snodeController.getWillMessageService().saveWillMessage("123456", new WillMessage());
MqttMessage mqttDisconnectMessage = new MqttMessage(new MqttFixedHeader(
MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200));
MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 200));
mqttDisconnectMessageHandler.handleMessage(mqttDisconnectMessage, remotingChannel);
}
......
......@@ -34,7 +34,7 @@ public class WillMessageServiceImplTest extends SnodeTestBase {
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(),
new ClientConfig(), new SnodeConfig());
new ClientConfig(), new SnodeConfig());
private WillMessageService willMessageService;
......
......@@ -27,16 +27,14 @@ import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.After;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
......@@ -62,7 +60,6 @@ public class DefaultMessageStoreTest {
messageStore.start();
}
@Ignore
@Test(expected = OverlappingFileLockException.class)
public void test_repate_restart() throws Exception {
QUEUE_TOTAL = 1;
......@@ -294,7 +291,7 @@ public class DefaultMessageStoreTest {
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
byte[] filterBitMap, Map<String, String> properties) {
byte[] filterBitMap, Map<String, String> properties) {
}
}
}
......@@ -16,12 +16,10 @@ import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
public class DLedgerCommitlogTest extends MessageStoreTestBase {
@Ignore
@Test
public void testTruncateCQ() throws Exception {
String base = createBaseDir();
......@@ -77,7 +75,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
}
@Ignore
@Test
public void testRecover() throws Exception {
String base = createBaseDir();
......@@ -118,7 +115,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
}
@Ignore
@Test
public void testPutAndGetMessage() throws Exception {
String base = createBaseDir();
......@@ -159,7 +155,6 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
messageStore.shutdown();
}
@Ignore
@Test
public void testCommittedPos() throws Exception {
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
......
......@@ -5,13 +5,11 @@ import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.StoreTestBase;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
public class MixCommitlogTest extends MessageStoreTestBase {
@Ignore
@Test
public void testFallBehindCQ() throws Exception {
String base = createBaseDir();
......@@ -51,7 +49,6 @@ public class MixCommitlogTest extends MessageStoreTestBase {
}
@Ignore
@Test
public void testPutAndGet() throws Exception {
String base = createBaseDir();
......@@ -112,7 +109,6 @@ public class MixCommitlogTest extends MessageStoreTestBase {
recoverDledgerStore.shutdown();
}
}
@Ignore
@Test
public void testDeleteExpiredFiles() throws Exception {
String base = createBaseDir();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册