提交 f7c55b9f 编写于 作者: D duhenglucky

Fix unit test ensure mvn install pass

上级 ac25bccc
...@@ -16,12 +16,11 @@ ...@@ -16,12 +16,11 @@
*/ */
package org.apache.rocketmq.broker.client; package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.serialize.LanguageCode; import org.apache.rocketmq.remoting.serialize.LanguageCode;
public class ClientChannelInfo { public class ClientChannelInfo {
private final RemotingChannel channel; private final RemotingChannel remotingChannel;
private final String clientId; private final String clientId;
private final LanguageCode language; private final LanguageCode language;
private final int version; private final int version;
...@@ -31,15 +30,15 @@ public class ClientChannelInfo { ...@@ -31,15 +30,15 @@ public class ClientChannelInfo {
this(channel, null, null, 0); this(channel, null, null, 0);
} }
public ClientChannelInfo(RemotingChannel channel, String clientId, LanguageCode language, int version) { public ClientChannelInfo(RemotingChannel remotingChannel, String clientId, LanguageCode language, int version) {
this.channel = channel; this.remotingChannel = remotingChannel;
this.clientId = clientId; this.clientId = clientId;
this.language = language; this.language = language;
this.version = version; this.version = version;
} }
public RemotingChannel getChannel() { public RemotingChannel getRemotingChannel() {
return channel; return remotingChannel;
} }
public String getClientId() { public String getClientId() {
...@@ -66,7 +65,7 @@ public class ClientChannelInfo { ...@@ -66,7 +65,7 @@ public class ClientChannelInfo {
public int hashCode() { public int hashCode() {
final int prime = 31; final int prime = 31;
int result = 1; int result = 1;
result = prime * result + ((channel == null) ? 0 : channel.hashCode()); result = prime * result + ((remotingChannel == null) ? 0 : remotingChannel.hashCode());
result = prime * result + ((clientId == null) ? 0 : clientId.hashCode()); result = prime * result + ((clientId == null) ? 0 : clientId.hashCode());
result = prime * result + ((language == null) ? 0 : language.hashCode()); result = prime * result + ((language == null) ? 0 : language.hashCode());
result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32)); result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32));
...@@ -83,10 +82,10 @@ public class ClientChannelInfo { ...@@ -83,10 +82,10 @@ public class ClientChannelInfo {
if (getClass() != obj.getClass()) if (getClass() != obj.getClass())
return false; return false;
ClientChannelInfo other = (ClientChannelInfo) obj; ClientChannelInfo other = (ClientChannelInfo) obj;
if (channel == null) { if (remotingChannel == null) {
if (other.channel != null) if (other.remotingChannel != null)
return false; return false;
} else if (this.channel != other.channel) { } else if (this.remotingChannel != other.remotingChannel) {
return false; return false;
} }
...@@ -95,7 +94,7 @@ public class ClientChannelInfo { ...@@ -95,7 +94,7 @@ public class ClientChannelInfo {
@Override @Override
public String toString() { public String toString() {
return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language return "ClientChannelInfo [remotingChannel=" + remotingChannel + ", clientId=" + clientId + ", language=" + language
+ ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]"; + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]";
} }
} }
...@@ -74,28 +74,28 @@ public class ClientHousekeepingService implements ChannelEventListener { ...@@ -74,28 +74,28 @@ public class ClientHousekeepingService implements ChannelEventListener {
log.info("Remoting channel closed: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); log.info("Remoting channel closed: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
Channel channel = nettyChannel.getChannel(); Channel channel = nettyChannel.getChannel();
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, remotingChannel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, remotingChannel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
} }
@Override @Override
public void onChannelException(String remoteAddr, RemotingChannel remotingChannel) { public void onChannelException(String remoteAddr, RemotingChannel remotingChannel) {
log.info("Remoting channel exception: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
Channel channel = nettyChannel.getChannel(); Channel channel = nettyChannel.getChannel();
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); log.info("Remoting channel exception: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, remotingChannel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, remotingChannel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
} }
@Override @Override
public void onChannelIdle(String remoteAddr, RemotingChannel remotingChannel) { public void onChannelIdle(String remoteAddr, RemotingChannel remotingChannel) {
log.info("Remoting channel idle: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
Channel channel = nettyChannel.getChannel(); Channel channel = nettyChannel.getChannel();
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); log.info("Remoting channel idle: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, remotingChannel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, remotingChannel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
} }
} }
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
*/ */
package org.apache.rocketmq.broker.client; package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
...@@ -26,11 +25,11 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -26,11 +25,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
public class ConsumerGroupInfo { public class ConsumerGroupInfo {
...@@ -99,13 +98,13 @@ public class ConsumerGroupInfo { ...@@ -99,13 +98,13 @@ public class ConsumerGroupInfo {
} }
public void unregisterChannel(final ClientChannelInfo clientChannelInfo) { public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {
ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel()); ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getRemotingChannel());
if (old != null) { if (old != null) {
log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString()); log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString());
} }
} }
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { public boolean doChannelCloseEvent(final String remoteAddr, final RemotingChannel channel) {
final ClientChannelInfo info = this.channelInfoTable.remove(channel); final ClientChannelInfo info = this.channelInfoTable.remove(channel);
if (info != null) { if (info != null) {
log.warn( log.warn(
...@@ -124,9 +123,9 @@ public class ConsumerGroupInfo { ...@@ -124,9 +123,9 @@ public class ConsumerGroupInfo {
this.messageModel = messageModel; this.messageModel = messageModel;
this.consumeFromWhere = consumeFromWhere; this.consumeFromWhere = consumeFromWhere;
ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel()); ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getRemotingChannel());
if (null == infoOld) { if (null == infoOld) {
ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew); ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getRemotingChannel(), infoNew);
if (null == prev) { if (null == prev) {
log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType, log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
messageModel, infoNew.toString()); messageModel, infoNew.toString());
...@@ -140,7 +139,7 @@ public class ConsumerGroupInfo { ...@@ -140,7 +139,7 @@ public class ConsumerGroupInfo {
this.groupName, this.groupName,
infoOld.toString(), infoOld.toString(),
infoNew.toString()); infoNew.toString());
this.channelInfoTable.put(infoNew.getChannel(), infoNew); this.channelInfoTable.put(infoNew.getRemotingChannel(), infoNew);
} }
} }
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
*/ */
package org.apache.rocketmq.broker.client; package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map.Entry; import java.util.Map.Entry;
...@@ -25,14 +24,13 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -25,14 +24,13 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
public class ConsumerManager { public class ConsumerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
...@@ -75,7 +73,7 @@ public class ConsumerManager { ...@@ -75,7 +73,7 @@ public class ConsumerManager {
return 0; return 0;
} }
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { public void doChannelCloseEvent(final String remoteAddr, final RemotingChannel channel) {
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> next = it.next(); Entry<String, ConsumerGroupInfo> next = it.next();
...@@ -159,9 +157,9 @@ public class ConsumerManager { ...@@ -159,9 +157,9 @@ public class ConsumerManager {
if (diff > CHANNEL_EXPIRED_TIMEOUT) { if (diff > CHANNEL_EXPIRED_TIMEOUT) {
log.warn( log.warn(
"SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}", "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel().remoteAddress()), group); RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getRemotingChannel().remoteAddress()), group);
clientChannelInfo.getChannel().close(); clientChannelInfo.getRemotingChannel().close();
itChannel.remove(); itChannel.remove();
} }
} }
......
...@@ -16,11 +16,8 @@ ...@@ -16,11 +16,8 @@
*/ */
package org.apache.rocketmq.broker.client; package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
......
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
*/ */
package org.apache.rocketmq.broker.client; package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
...@@ -27,14 +25,12 @@ import java.util.Map.Entry; ...@@ -27,14 +25,12 @@ import java.util.Map.Entry;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.broker.util.PositiveAtomicCounter; import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
public class ProducerManager { public class ProducerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
...@@ -86,8 +82,8 @@ public class ProducerManager { ...@@ -86,8 +82,8 @@ public class ProducerManager {
it.remove(); it.remove();
log.warn( log.warn(
"SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel().remoteAddress()), group); RemotingHelper.parseChannelRemoteAddr(info.getRemotingChannel().remoteAddress()), group);
info.getChannel().close(); info.getRemotingChannel().close();
} }
} }
} }
...@@ -102,7 +98,7 @@ public class ProducerManager { ...@@ -102,7 +98,7 @@ public class ProducerManager {
} }
} }
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { public void doChannelCloseEvent(final String remoteAddr, final RemotingChannel channel) {
if (channel != null) { if (channel != null) {
try { try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
...@@ -110,16 +106,12 @@ public class ProducerManager { ...@@ -110,16 +106,12 @@ public class ProducerManager {
for (final Map.Entry<String, HashMap<RemotingChannel, ClientChannelInfo>> entry : this.groupChannelTable for (final Map.Entry<String, HashMap<RemotingChannel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) { .entrySet()) {
final String group = entry.getKey(); final String group = entry.getKey();
final HashMap<RemotingChannel, ClientChannelInfo> clientChannelInfoTable = final HashMap<RemotingChannel, ClientChannelInfo> clientChannelInfoTable = entry.getValue();
entry.getValue(); final ClientChannelInfo clientChannelInfo = clientChannelInfoTable.remove(channel);
final ClientChannelInfo clientChannelInfo =
clientChannelInfoTable.remove(channel);
if (clientChannelInfo != null) { if (clientChannelInfo != null) {
log.info( log.info("NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
clientChannelInfo.toString(), remoteAddr, group); clientChannelInfo.toString(), remoteAddr, group);
} }
} }
} finally { } finally {
this.groupChannelLock.unlock(); this.groupChannelLock.unlock();
...@@ -145,9 +137,9 @@ public class ProducerManager { ...@@ -145,9 +137,9 @@ public class ProducerManager {
this.groupChannelTable.put(group, channelTable); this.groupChannelTable.put(group, channelTable);
} }
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel()); clientChannelInfoFound = channelTable.get(clientChannelInfo.getRemotingChannel());
if (null == clientChannelInfoFound) { if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo); channelTable.put(clientChannelInfo.getRemotingChannel(), clientChannelInfo);
log.info("new producer connected, group: {} channel: {}", group, log.info("new producer connected, group: {} channel: {}", group,
clientChannelInfo.toString()); clientChannelInfo.toString());
} }
...@@ -172,7 +164,7 @@ public class ProducerManager { ...@@ -172,7 +164,7 @@ public class ProducerManager {
try { try {
HashMap<RemotingChannel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group); HashMap<RemotingChannel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) { if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel()); ClientChannelInfo old = channelTable.remove(clientChannelInfo.getRemotingChannel());
if (old != null) { if (old != null) {
log.info("unregister a producer[{}] from groupChannelTable {}", group, log.info("unregister a producer[{}] from groupChannelTable {}", group,
clientChannelInfo.toString()); clientChannelInfo.toString());
......
...@@ -16,7 +16,12 @@ ...@@ -16,7 +16,12 @@
*/ */
package org.apache.rocketmq.broker.client.net; package org.apache.rocketmq.broker.client.net;
import io.netty.channel.Channel; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
...@@ -45,13 +50,6 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; ...@@ -45,13 +50,6 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
public class Broker2Client { public class Broker2Client {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController; private final BrokerController brokerController;
......
...@@ -47,6 +47,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponse ...@@ -47,6 +47,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponse
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RemotingClientFactory; import org.apache.rocketmq.remoting.RemotingClientFactory;
...@@ -55,7 +56,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; ...@@ -55,7 +56,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class BrokerOuterAPI { public class BrokerOuterAPI {
...@@ -193,9 +193,6 @@ public class BrokerOuterAPI { ...@@ -193,9 +193,6 @@ public class BrokerOuterAPI {
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null; assert response != null;
if (response == null){
System.out.println("ssssssssssssss");
}
switch (response.getCode()) { switch (response.getCode()) {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader = RegisterBrokerResponseHeader responseHeader =
......
...@@ -603,7 +603,7 @@ public class AdminBrokerProcessor implements RequestProcessor { ...@@ -603,7 +603,7 @@ public class AdminBrokerProcessor implements RequestProcessor {
connection.setClientId(info.getClientId()); connection.setClientId(info.getClientId());
connection.setLanguage(info.getLanguage()); connection.setLanguage(info.getLanguage());
connection.setVersion(info.getVersion()); connection.setVersion(info.getVersion());
connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel().remoteAddress())); connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getRemotingChannel().remoteAddress()));
bodydata.getConnectionSet().add(connection); bodydata.getConnectionSet().add(connection);
} }
...@@ -638,7 +638,7 @@ public class AdminBrokerProcessor implements RequestProcessor { ...@@ -638,7 +638,7 @@ public class AdminBrokerProcessor implements RequestProcessor {
connection.setClientId(info.getClientId()); connection.setClientId(info.getClientId());
connection.setLanguage(info.getLanguage()); connection.setLanguage(info.getLanguage());
connection.setVersion(info.getVersion()); connection.setVersion(info.getVersion());
connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel().remoteAddress())); connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getRemotingChannel().remoteAddress()));
bodydata.getConnectionSet().add(connection); bodydata.getConnectionSet().add(connection);
} }
...@@ -1278,7 +1278,7 @@ public class AdminBrokerProcessor implements RequestProcessor { ...@@ -1278,7 +1278,7 @@ public class AdminBrokerProcessor implements RequestProcessor {
newRequest.setExtFields(request.getExtFields()); newRequest.setExtFields(request.getExtFields());
newRequest.setBody(request.getBody()); newRequest.setBody(request.getBody());
return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest); return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getRemotingChannel(), newRequest);
} catch (RemotingTimeoutException e) { } catch (RemotingTimeoutException e) {
response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT); response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT);
response response
......
...@@ -16,7 +16,11 @@ ...@@ -16,7 +16,11 @@
*/ */
package org.apache.rocketmq.broker.transaction; package org.apache.rocketmq.broker.transaction;
import io.netty.channel.Channel; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
...@@ -24,12 +28,6 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -24,12 +28,6 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
public abstract class AbstractTransactionalMessageCheckListener { public abstract class AbstractTransactionalMessageCheckListener {
......
...@@ -21,6 +21,7 @@ import io.netty.channel.ChannelFuture; ...@@ -21,6 +21,7 @@ import io.netty.channel.ChannelFuture;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.HashMap; import java.util.HashMap;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
...@@ -37,35 +38,38 @@ public class ProducerManagerTest { ...@@ -37,35 +38,38 @@ public class ProducerManagerTest {
private String group = "FooBar"; private String group = "FooBar";
private ClientChannelInfo clientInfo; private ClientChannelInfo clientInfo;
private RemotingChannel remotingChannel;
@Mock @Mock
private RemotingChannel channel; private Channel channel;
@Before @Before
public void init() { public void init() {
producerManager = new ProducerManager(); producerManager = new ProducerManager();
clientInfo = new ClientChannelInfo(channel); remotingChannel = new NettyChannelImpl(channel);
clientInfo = new ClientChannelInfo(remotingChannel);
} }
@Test @Test
public void scanNotActiveChannel() throws Exception { public void scanNotActiveChannel() throws Exception {
producerManager.registerProducer(group, clientInfo); producerManager.registerProducer(group, clientInfo);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull(); assertThat(producerManager.getGroupChannelTable().get(group).get(remotingChannel)).isNotNull();
Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT"); Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT");
field.setAccessible(true); field.setAccessible(true);
long CHANNEL_EXPIRED_TIMEOUT = field.getLong(producerManager); long CHANNEL_EXPIRED_TIMEOUT = field.getLong(producerManager);
clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - CHANNEL_EXPIRED_TIMEOUT - 10); clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - CHANNEL_EXPIRED_TIMEOUT - 10);
// when(channel.close()).thenReturn(mock(ChannelFuture.class)); when(channel.close()).thenReturn(mock(ChannelFuture.class));
producerManager.scanNotActiveChannel(); producerManager.scanNotActiveChannel();
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull(); assertThat(producerManager.getGroupChannelTable().get(group).get(remotingChannel)).isNull();
} }
@Test @Test
public void doChannelCloseEvent() throws Exception { public void doChannelCloseEvent() throws Exception {
producerManager.registerProducer(group, clientInfo); producerManager.registerProducer(group, clientInfo);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull(); assertThat(producerManager.getGroupChannelTable().get(group).get(remotingChannel)).isNotNull();
// producerManager.doChannelCloseEvent("127.0.0.1", channel); producerManager.doChannelCloseEvent("127.0.0.1", remotingChannel);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull(); assertThat(producerManager.getGroupChannelTable().get(group).get(remotingChannel)).isNull();
} }
@Test @Test
...@@ -73,7 +77,7 @@ public class ProducerManagerTest { ...@@ -73,7 +77,7 @@ public class ProducerManagerTest {
producerManager.registerProducer(group, clientInfo); producerManager.registerProducer(group, clientInfo);
HashMap<RemotingChannel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group); HashMap<RemotingChannel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull(); assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo); assertThat(channelMap.get(remotingChannel)).isEqualTo(clientInfo);
} }
@Test @Test
...@@ -81,7 +85,7 @@ public class ProducerManagerTest { ...@@ -81,7 +85,7 @@ public class ProducerManagerTest {
producerManager.registerProducer(group, clientInfo); producerManager.registerProducer(group, clientInfo);
HashMap<RemotingChannel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group); HashMap<RemotingChannel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull(); assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo); assertThat(channelMap.get(remotingChannel)).isEqualTo(clientInfo);
producerManager.unregisterProducer(group, clientInfo); producerManager.unregisterProducer(group, clientInfo);
channelMap = producerManager.getGroupChannelTable().get(group); channelMap = producerManager.getGroupChannelTable().get(group);
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.broker.processor; package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap; import java.util.HashMap;
import java.util.UUID; import java.util.UUID;
...@@ -27,14 +28,15 @@ import org.apache.rocketmq.common.protocol.RequestCode; ...@@ -27,14 +28,15 @@ import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -45,6 +47,7 @@ import org.mockito.junit.MockitoJUnitRunner; ...@@ -45,6 +47,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData; import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class ClientManageProcessorTest { public class ClientManageProcessorTest {
...@@ -52,9 +55,15 @@ public class ClientManageProcessorTest { ...@@ -52,9 +55,15 @@ public class ClientManageProcessorTest {
@Spy @Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(), new MessageStoreConfig()); private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(), new MessageStoreConfig());
@Mock @Mock
private ChannelHandlerContext handlerContext; private NettyChannelHandlerContextImpl handlerContext;
@Mock @Mock
private NettyChannelImpl channel; private ChannelHandlerContext channelHandlerContext;
private RemotingChannel remotingChannel;
@Mock
private Channel channel;
private ClientChannelInfo clientChannelInfo; private ClientChannelInfo clientChannelInfo;
private String clientId = UUID.randomUUID().toString(); private String clientId = UUID.randomUUID().toString();
...@@ -63,11 +72,12 @@ public class ClientManageProcessorTest { ...@@ -63,11 +72,12 @@ public class ClientManageProcessorTest {
@Before @Before
public void init() { public void init() {
// when(handlerContext.channel()).thenReturn(channel); when(handlerContext.getChannelHandlerContext()).thenReturn(channelHandlerContext);
when(channelHandlerContext.channel()).thenReturn(channel);
clientManageProcessor = new ClientManageProcessor(brokerController); clientManageProcessor = new ClientManageProcessor(brokerController);
clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100); remotingChannel = new NettyChannelImpl(channel);
clientChannelInfo = new ClientChannelInfo(remotingChannel, clientId, LanguageCode.JAVA, 100);
brokerController.getProducerManager().registerProducer(group, clientChannelInfo); brokerController.getProducerManager().registerProducer(group, clientChannelInfo);
ConsumerData consumerData = createConsumerData(group, topic); ConsumerData consumerData = createConsumerData(group, topic);
brokerController.getConsumerManager().registerConsumer( brokerController.getConsumerManager().registerConsumer(
consumerData.getGroupName(), consumerData.getGroupName(),
...@@ -84,11 +94,10 @@ public class ClientManageProcessorTest { ...@@ -84,11 +94,10 @@ public class ClientManageProcessorTest {
brokerController.getProducerManager().registerProducer(group, clientChannelInfo); brokerController.getProducerManager().registerProducer(group, clientChannelInfo);
HashMap<RemotingChannel, ClientChannelInfo> channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group); HashMap<RemotingChannel, ClientChannelInfo> channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull(); assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientChannelInfo); assertThat(channelMap.get(remotingChannel)).isEqualTo(clientChannelInfo);
RemotingCommand request = createUnRegisterProducerCommand(); RemotingCommand request = createUnRegisterProducerCommand();
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); RemotingCommand response = clientManageProcessor.processRequest(handlerContext, request);
RemotingCommand response = clientManageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull(); assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
...@@ -102,9 +111,8 @@ public class ClientManageProcessorTest { ...@@ -102,9 +111,8 @@ public class ClientManageProcessorTest {
assertThat(consumerGroupInfo).isNotNull(); assertThat(consumerGroupInfo).isNotNull();
RemotingCommand request = createUnRegisterConsumerCommand(); RemotingCommand request = createUnRegisterConsumerCommand();
RemotingCommand response = clientManageProcessor.processRequest(handlerContext, request);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); assertThat(response).isNotNull();
RemotingCommand response = clientManageProcessor.processRequest(nettyChannelHandlerContext, request); assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
consumerGroupInfo = brokerController.getConsumerManager().getConsumerGroupInfo(group); consumerGroupInfo = brokerController.getConsumerManager().getConsumerGroupInfo(group);
...@@ -118,7 +126,7 @@ public class ClientManageProcessorTest { ...@@ -118,7 +126,7 @@ public class ClientManageProcessorTest {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
request.setLanguage(LanguageCode.JAVA); request.setLanguage(LanguageCode.JAVA);
request.setVersion(100); request.setVersion(100);
// request.makeCustomHeaderToNet(); CodecHelper.makeCustomHeaderToNet(request);
return request; return request;
} }
...@@ -129,7 +137,7 @@ public class ClientManageProcessorTest { ...@@ -129,7 +137,7 @@ public class ClientManageProcessorTest {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
request.setLanguage(LanguageCode.JAVA); request.setLanguage(LanguageCode.JAVA);
request.setVersion(100); request.setVersion(100);
// request.makeCustomHeaderToNet(); CodecHelper.makeCustomHeaderToNet(request);
return request; return request;
} }
} }
\ No newline at end of file
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.broker.processor; package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.transaction.OperationResult; import org.apache.rocketmq.broker.transaction.OperationResult;
...@@ -28,10 +29,11 @@ import org.apache.rocketmq.common.protocol.RequestCode; ...@@ -28,10 +29,11 @@ import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.AppendMessageStatus;
...@@ -49,6 +51,7 @@ import org.mockito.junit.MockitoJUnitRunner; ...@@ -49,6 +51,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
...@@ -57,7 +60,10 @@ public class EndTransactionProcessorTest { ...@@ -57,7 +60,10 @@ public class EndTransactionProcessorTest {
private EndTransactionProcessor endTransactionProcessor; private EndTransactionProcessor endTransactionProcessor;
@Mock @Mock
private ChannelHandlerContext handlerContext; private NettyChannelHandlerContextImpl handlerContext;
@Mock
private ChannelHandlerContext channelHandlerContext;
@Spy @Spy
private BrokerController private BrokerController
...@@ -74,10 +80,13 @@ public class EndTransactionProcessorTest { ...@@ -74,10 +80,13 @@ public class EndTransactionProcessorTest {
public void init() { public void init() {
brokerController.setMessageStore(messageStore); brokerController.setMessageStore(messageStore);
brokerController.setTransactionalMessageService(transactionMsgService); brokerController.setTransactionalMessageService(transactionMsgService);
Channel mockChannel = mock(Channel.class);
when(handlerContext.getChannelHandlerContext()).thenReturn(channelHandlerContext);
when(channelHandlerContext.channel()).thenReturn(mockChannel);
endTransactionProcessor = new EndTransactionProcessor(brokerController); endTransactionProcessor = new EndTransactionProcessor(brokerController);
} }
private OperationResult createResponse(int status) { private OperationResult createResponse(int status){
OperationResult response = new OperationResult(); OperationResult response = new OperationResult();
response.setPrepareMessage(createDefaultMessageExt()); response.setPrepareMessage(createDefaultMessageExt());
response.setResponseCode(status); response.setResponseCode(status);
...@@ -91,9 +100,7 @@ public class EndTransactionProcessorTest { ...@@ -91,9 +100,7 @@ public class EndTransactionProcessorTest {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); (PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, false); RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, false);
RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
RemotingCommand response = endTransactionProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
} }
...@@ -103,16 +110,14 @@ public class EndTransactionProcessorTest { ...@@ -103,16 +110,14 @@ public class EndTransactionProcessorTest {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); (PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, true); RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, true);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
RemotingCommand response = endTransactionProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
} }
@Test @Test
public void testProcessRequest_NotType() throws RemotingCommandException { public void testProcessRequest_NotType() throws RemotingCommandException {
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_NOT_TYPE, true); RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_NOT_TYPE, true);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
RemotingCommand response = endTransactionProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNull(); assertThat(response).isNull();
} }
...@@ -120,8 +125,7 @@ public class EndTransactionProcessorTest { ...@@ -120,8 +125,7 @@ public class EndTransactionProcessorTest {
public void testProcessRequest_RollBack() throws RemotingCommandException { public void testProcessRequest_RollBack() throws RemotingCommandException {
when(transactionMsgService.rollbackMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS)); when(transactionMsgService.rollbackMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS));
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE, true); RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE, true);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
RemotingCommand response = endTransactionProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
} }
...@@ -152,7 +156,7 @@ public class EndTransactionProcessorTest { ...@@ -152,7 +156,7 @@ public class EndTransactionProcessorTest {
private RemotingCommand createEndTransactionMsgCommand(int status, boolean isCheckMsg) { private RemotingCommand createEndTransactionMsgCommand(int status, boolean isCheckMsg) {
EndTransactionRequestHeader header = createEndTransactionRequestHeader(status, isCheckMsg); EndTransactionRequestHeader header = createEndTransactionRequestHeader(status, isCheckMsg);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, header); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, header);
// request.makeCustomHeaderToNet(); CodecHelper.makeCustomHeaderToNet(request);
return request; return request;
} }
} }
...@@ -38,11 +38,12 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; ...@@ -38,11 +38,12 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.GetMessageStatus;
...@@ -69,9 +70,11 @@ public class PullMessageProcessorTest { ...@@ -69,9 +70,11 @@ public class PullMessageProcessorTest {
@Spy @Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(), new MessageStoreConfig()); private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(), new MessageStoreConfig());
@Mock @Mock
private ChannelHandlerContext handlerContext; private NettyChannelHandlerContextImpl handlerContext;
@Mock @Mock
private Channel channel; private ChannelHandlerContext channelHandlerContext;
@Mock @Mock
private MessageStore messageStore; private MessageStore messageStore;
private ClientChannelInfo clientChannelInfo; private ClientChannelInfo clientChannelInfo;
...@@ -82,11 +85,13 @@ public class PullMessageProcessorTest { ...@@ -82,11 +85,13 @@ public class PullMessageProcessorTest {
public void init() { public void init() {
brokerController.setMessageStore(messageStore); brokerController.setMessageStore(messageStore);
pullMessageProcessor = new PullMessageProcessor(brokerController); pullMessageProcessor = new PullMessageProcessor(brokerController);
RemotingChannel mockChannel = mock(RemotingChannel.class); Channel mockChannel = mock(Channel.class);
RemotingChannel remotingChannel = mock(RemotingChannel.class);
when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024)); when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
// when(handlerContext.channel()).thenReturn(mockChannel); when(handlerContext.getChannelHandlerContext()).thenReturn(channelHandlerContext);
when(channelHandlerContext.channel()).thenReturn(mockChannel);
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig()); brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig());
clientChannelInfo = new ClientChannelInfo(mockChannel); clientChannelInfo = new ClientChannelInfo(remotingChannel);
ConsumerData consumerData = createConsumerData(group, topic); ConsumerData consumerData = createConsumerData(group, topic);
brokerController.getConsumerManager().registerConsumer( brokerController.getConsumerManager().registerConsumer(
consumerData.getGroupName(), consumerData.getGroupName(),
...@@ -102,8 +107,7 @@ public class PullMessageProcessorTest { ...@@ -102,8 +107,7 @@ public class PullMessageProcessorTest {
public void testProcessRequest_TopicNotExist() throws RemotingCommandException { public void testProcessRequest_TopicNotExist() throws RemotingCommandException {
brokerController.getTopicConfigManager().getTopicConfigTable().remove(topic); brokerController.getTopicConfigManager().getTopicConfigTable().remove(topic);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull(); assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST); assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
assertThat(response.getRemark()).contains("topic[" + topic + "] not exist"); assertThat(response.getRemark()).contains("topic[" + topic + "] not exist");
...@@ -113,9 +117,7 @@ public class PullMessageProcessorTest { ...@@ -113,9 +117,7 @@ public class PullMessageProcessorTest {
public void testProcessRequest_SubNotExist() throws RemotingCommandException { public void testProcessRequest_SubNotExist() throws RemotingCommandException {
brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, false); brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, false);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull(); assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_NOT_EXIST); assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_NOT_EXIST);
assertThat(response.getRemark()).contains("consumer's group info not exist"); assertThat(response.getRemark()).contains("consumer's group info not exist");
...@@ -125,8 +127,7 @@ public class PullMessageProcessorTest { ...@@ -125,8 +127,7 @@ public class PullMessageProcessorTest {
public void testProcessRequest_SubNotLatest() throws RemotingCommandException { public void testProcessRequest_SubNotLatest() throws RemotingCommandException {
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
request.addExtField("subVersion", String.valueOf(101)); request.addExtField("subVersion", String.valueOf(101));
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull(); assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_NOT_LATEST); assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_NOT_LATEST);
assertThat(response.getRemark()).contains("subscription not latest"); assertThat(response.getRemark()).contains("subscription not latest");
...@@ -138,9 +139,7 @@ public class PullMessageProcessorTest { ...@@ -138,9 +139,7 @@ public class PullMessageProcessorTest {
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult); when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull(); assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
} }
...@@ -169,9 +168,7 @@ public class PullMessageProcessorTest { ...@@ -169,9 +168,7 @@ public class PullMessageProcessorTest {
consumeMessageHookList.add(consumeMessageHook); consumeMessageHookList.add(consumeMessageHook);
pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull(); assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(messageContext[0]).isNotNull(); assertThat(messageContext[0]).isNotNull();
...@@ -187,9 +184,7 @@ public class PullMessageProcessorTest { ...@@ -187,9 +184,7 @@ public class PullMessageProcessorTest {
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult); when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull(); assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_RETRY_IMMEDIATELY); assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_RETRY_IMMEDIATELY);
} }
...@@ -201,9 +196,7 @@ public class PullMessageProcessorTest { ...@@ -201,9 +196,7 @@ public class PullMessageProcessorTest {
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult); when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull(); assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_OFFSET_MOVED); assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_OFFSET_MOVED);
} }
...@@ -220,7 +213,7 @@ public class PullMessageProcessorTest { ...@@ -220,7 +213,7 @@ public class PullMessageProcessorTest {
requestHeader.setSysFlag(0); requestHeader.setSysFlag(0);
requestHeader.setSubVersion(100L); requestHeader.setSubVersion(100L);
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
// request.makeCustomHeaderToNet(); CodecHelper.makeCustomHeaderToNet(request);
return request; return request;
} }
......
...@@ -18,6 +18,10 @@ package org.apache.rocketmq.broker.processor; ...@@ -18,6 +18,10 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook; import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
...@@ -32,10 +36,11 @@ import org.apache.rocketmq.common.protocol.ResponseCode; ...@@ -32,10 +36,11 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.AppendMessageStatus;
...@@ -53,11 +58,6 @@ import org.mockito.invocation.InvocationOnMock; ...@@ -53,11 +58,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
...@@ -69,7 +69,10 @@ import static org.mockito.Mockito.when; ...@@ -69,7 +69,10 @@ import static org.mockito.Mockito.when;
public class SendMessageProcessorTest { public class SendMessageProcessorTest {
private SendMessageProcessor sendMessageProcessor; private SendMessageProcessor sendMessageProcessor;
@Mock @Mock
private ChannelHandlerContext handlerContext; private NettyChannelHandlerContextImpl handlerContext;
@Mock
private ChannelHandlerContext channelHandlerContext;
@Spy @Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(), new MessageStoreConfig()); private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(), new MessageStoreConfig());
@Mock @Mock
...@@ -87,7 +90,8 @@ public class SendMessageProcessorTest { ...@@ -87,7 +90,8 @@ public class SendMessageProcessorTest {
when(messageStore.now()).thenReturn(System.currentTimeMillis()); when(messageStore.now()).thenReturn(System.currentTimeMillis());
Channel mockChannel = mock(Channel.class); Channel mockChannel = mock(Channel.class);
when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024)); when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
when(handlerContext.channel()).thenReturn(mockChannel); when(handlerContext.getChannelHandlerContext()).thenReturn(channelHandlerContext);
when(channelHandlerContext.channel()).thenReturn(mockChannel);
when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt()); when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
sendMessageProcessor = new SendMessageProcessor(brokerController); sendMessageProcessor = new SendMessageProcessor(brokerController);
} }
...@@ -182,9 +186,7 @@ public class SendMessageProcessorTest { ...@@ -182,9 +186,7 @@ public class SendMessageProcessorTest {
final RemotingCommand request = createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK); final RemotingCommand request = createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK);
sendMessageProcessor = new SendMessageProcessor(brokerController); sendMessageProcessor = new SendMessageProcessor(brokerController);
final RemotingCommand response = sendMessageProcessor.processRequest(handlerContext, request);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
RemotingCommand response = sendMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull(); assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
} }
...@@ -201,11 +203,8 @@ public class SendMessageProcessorTest { ...@@ -201,11 +203,8 @@ public class SendMessageProcessorTest {
response[0] = invocation.getArgument(0); response[0] = invocation.getArgument(0);
return null; return null;
} }
}).when(handlerContext).writeAndFlush(any(Object.class)); }).when(channelHandlerContext).writeAndFlush(any(Object.class));
RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
RemotingCommand responseToReturn = sendMessageProcessor.processRequest(nettyChannelHandlerContext, request);
if (responseToReturn != null) { if (responseToReturn != null) {
assertThat(response[0]).isNull(); assertThat(response[0]).isNull();
response[0] = responseToReturn; response[0] = responseToReturn;
...@@ -213,7 +212,6 @@ public class SendMessageProcessorTest { ...@@ -213,7 +212,6 @@ public class SendMessageProcessorTest {
assertThat(response[0].getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response[0].getCode()).isEqualTo(ResponseCode.SUCCESS);
} }
private RemotingCommand createSendTransactionMsgCommand(int requestCode) { private RemotingCommand createSendTransactionMsgCommand(int requestCode) {
SendMessageRequestHeader header = createSendMsgRequestHeader(); SendMessageRequestHeader header = createSendMsgRequestHeader();
int sysFlag = header.getSysFlag(); int sysFlag = header.getSysFlag();
...@@ -224,7 +222,7 @@ public class SendMessageProcessorTest { ...@@ -224,7 +222,7 @@ public class SendMessageProcessorTest {
header.setSysFlag(sysFlag); header.setSysFlag(sysFlag);
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, header); RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, header);
request.setBody(new byte[] {'a'}); request.setBody(new byte[] {'a'});
// request.makeCustomHeaderToNet(); CodecHelper.makeCustomHeaderToNet(request);
return request; return request;
} }
...@@ -247,7 +245,7 @@ public class SendMessageProcessorTest { ...@@ -247,7 +245,7 @@ public class SendMessageProcessorTest {
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
request.setBody(new byte[] {'a'}); request.setBody(new byte[] {'a'});
// request.makeCustomHeaderToNet(); CodecHelper.makeCustomHeaderToNet(request);
return request; return request;
} }
...@@ -260,7 +258,7 @@ public class SendMessageProcessorTest { ...@@ -260,7 +258,7 @@ public class SendMessageProcessorTest {
requestHeader.setOffset(123L); requestHeader.setOffset(123L);
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
// request.makeCustomHeaderToNet(); CodecHelper.makeCustomHeaderToNet(request);
return request; return request;
} }
...@@ -273,10 +271,8 @@ public class SendMessageProcessorTest { ...@@ -273,10 +271,8 @@ public class SendMessageProcessorTest {
response[0] = invocation.getArgument(0); response[0] = invocation.getArgument(0);
return null; return null;
} }
}).when(handlerContext).writeAndFlush(any(Object.class)); }).when(channelHandlerContext).writeAndFlush(any(Object.class));
RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
RemotingCommand responseToReturn = sendMessageProcessor.processRequest(nettyChannelHandlerContext, request);
if (responseToReturn != null) { if (responseToReturn != null) {
assertThat(response[0]).isNull(); assertThat(response[0]).isNull();
response[0] = responseToReturn; response[0] = responseToReturn;
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
*/ */
package org.apache.rocketmq.client.consumer.store; package org.apache.rocketmq.client.consumer.store;
import com.sun.org.apache.regexp.internal.RE;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
...@@ -31,10 +30,10 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; ...@@ -31,10 +30,10 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
/** /**
......
...@@ -135,8 +135,8 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; ...@@ -135,8 +135,8 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
...@@ -144,12 +144,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException; ...@@ -144,12 +144,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable; import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
......
...@@ -62,7 +62,6 @@ import org.apache.rocketmq.common.ServiceState; ...@@ -62,7 +62,6 @@ import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
...@@ -75,11 +74,10 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; ...@@ -75,11 +74,10 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......
...@@ -147,7 +147,6 @@ public class DefaultMQPushConsumerTest { ...@@ -147,7 +147,6 @@ public class DefaultMQPushConsumerTest {
}); });
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>(); Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue()); messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
......
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
*/ */
package org.apache.rocketmq.common; package org.apache.rocketmq.common;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.util.TypeUtils;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
...@@ -31,12 +29,10 @@ import java.net.NetworkInterface; ...@@ -31,12 +29,10 @@ import java.net.NetworkInterface;
import java.text.NumberFormat; import java.text.NumberFormat;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.zip.CRC32; import java.util.zip.CRC32;
import java.util.zip.DeflaterOutputStream; import java.util.zip.DeflaterOutputStream;
......
...@@ -37,14 +37,11 @@ public class FlowControlConfig { ...@@ -37,14 +37,11 @@ public class FlowControlConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private String flowControlFileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); private String flowControlFileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private static final String DEFAULT_FLOW_CONTROL_FILE = "conf/flowControl.yml"; private static final String DEFAULT_FLOW_CONTROL_FILE = "conf/flowControl.yml";
private String flowControlFileName = System.getProperty("rocketmq.flow.control.file", DEFAULT_FLOW_CONTROL_FILE); private String flowControlFileName = System.getProperty("rocketmq.flow.control.file", DEFAULT_FLOW_CONTROL_FILE);
private List<FlowControlRule> rules;
public static final String defaultResourceName = "overallFlowControl";
private Map<String/*server name*/, Map<String/*flowControlType*/, List<FlowControlRule>>> plainFlowControlRules; private Map<String/*server name*/, Map<String/*flowControlType*/, List<FlowControlRule>>> plainFlowControlRules;
public FlowControlConfig() { public FlowControlConfig() {
......
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
*/ */
package org.apache.rocketmq.common.flowcontrol; package org.apache.rocketmq.common.flowcontrol;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
public class FlowControlRule { public class FlowControlRule {
private String flowControlResourceName; private String flowControlResourceName;
private Integer flowControlGrade; private Integer flowControlGrade;
......
...@@ -24,7 +24,6 @@ import java.util.TimerTask; ...@@ -24,7 +24,6 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
...@@ -33,8 +32,8 @@ import org.apache.rocketmq.client.exception.MQBrokerException; ...@@ -33,8 +32,8 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
......
...@@ -20,7 +20,6 @@ import org.apache.rocketmq.client.exception.MQClientException; ...@@ -20,7 +20,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
/** /**
......
...@@ -295,10 +295,9 @@ public class DefaultRequestProcessor implements RequestProcessor { ...@@ -295,10 +295,9 @@ public class DefaultRequestProcessor implements RequestProcessor {
public RemotingCommand registerBroker(ChannelHandlerContext ctx, public RemotingCommand registerBroker(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException { RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader = final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
log.info("requestHeader: " + requestHeader); final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
if (!checksum(ctx, request, requestHeader)) { if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR); response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match"); response.setRemark("crc32 not match");
......
...@@ -39,22 +39,28 @@ public class BrokerHousekeepingService implements ChannelEventListener { ...@@ -39,22 +39,28 @@ public class BrokerHousekeepingService implements ChannelEventListener {
@Override @Override
public void onChannelClose(String remoteAddr, RemotingChannel remotingChannel) { public void onChannelClose(String remoteAddr, RemotingChannel remotingChannel) {
NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; if (remotingChannel != null) {
Channel channel = nettyChannel.getChannel(); NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); Channel channel = nettyChannel.getChannel();
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
} }
@Override @Override
public void onChannelException(String remoteAddr, RemotingChannel remotingChannel) { public void onChannelException(String remoteAddr, RemotingChannel remotingChannel) {
NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; if (remotingChannel != null) {
Channel channel = nettyChannel.getChannel(); NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); Channel channel = nettyChannel.getChannel();
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
} }
@Override @Override
public void onChannelIdle(String remoteAddr, RemotingChannel remotingChannel) { public void onChannelIdle(String remoteAddr, RemotingChannel remotingChannel) {
NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; if (remotingChannel != null) {
Channel channel = nettyChannel.getChannel(); NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); Channel channel = nettyChannel.getChannel();
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
} }
} }
...@@ -25,7 +25,6 @@ import java.util.HashMap; ...@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.namesrv.NamesrvConfig; import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
...@@ -37,11 +36,12 @@ import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHea ...@@ -37,11 +36,12 @@ import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHea
import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager; import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.assertj.core.util.Maps; import org.assertj.core.util.Maps;
import org.junit.Before; import org.junit.Before;
...@@ -91,8 +91,10 @@ public class DefaultRequestProcessorTest { ...@@ -91,8 +91,10 @@ public class DefaultRequestProcessorTest {
request.addExtField("namespace", "namespace"); request.addExtField("namespace", "namespace");
request.addExtField("key", "key"); request.addExtField("key", "key");
request.addExtField("value", "value"); request.addExtField("value", "value");
NettyChannelHandlerContextImpl remotingChannel = mock(NettyChannelHandlerContextImpl.class);
RemotingCommand response = defaultRequestProcessor.processRequest(null, request); ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class);
when(remotingChannel.getChannelHandlerContext()).thenReturn(channelHandlerContext);
RemotingCommand response = defaultRequestProcessor.processRequest(remotingChannel, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(response.getRemark()).isNull(); assertThat(response.getRemark()).isNull();
...@@ -111,7 +113,10 @@ public class DefaultRequestProcessorTest { ...@@ -111,7 +113,10 @@ public class DefaultRequestProcessorTest {
request.addExtField("namespace", "namespace"); request.addExtField("namespace", "namespace");
request.addExtField("key", "key"); request.addExtField("key", "key");
RemotingCommand response = defaultRequestProcessor.processRequest(null, request); NettyChannelHandlerContextImpl remotingChannel = mock(NettyChannelHandlerContextImpl.class);
ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class);
when(remotingChannel.getChannelHandlerContext()).thenReturn(channelHandlerContext);
RemotingCommand response = defaultRequestProcessor.processRequest(remotingChannel, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(response.getRemark()).isNull(); assertThat(response.getRemark()).isNull();
...@@ -130,7 +135,10 @@ public class DefaultRequestProcessorTest { ...@@ -130,7 +135,10 @@ public class DefaultRequestProcessorTest {
request.addExtField("namespace", "namespace"); request.addExtField("namespace", "namespace");
request.addExtField("key", "key"); request.addExtField("key", "key");
RemotingCommand response = defaultRequestProcessor.processRequest(null, request); NettyChannelHandlerContextImpl remotingChannel = mock(NettyChannelHandlerContextImpl.class);
ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class);
when(remotingChannel.getChannelHandlerContext()).thenReturn(channelHandlerContext);
RemotingCommand response = defaultRequestProcessor.processRequest(remotingChannel, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.QUERY_NOT_FOUND); assertThat(response.getCode()).isEqualTo(ResponseCode.QUERY_NOT_FOUND);
assertThat(response.getRemark()).isEqualTo("No config item, Namespace: namespace Key: key"); assertThat(response.getRemark()).isEqualTo("No config item, Namespace: namespace Key: key");
...@@ -151,8 +159,11 @@ public class DefaultRequestProcessorTest { ...@@ -151,8 +159,11 @@ public class DefaultRequestProcessorTest {
request.addExtField("namespace", "namespace"); request.addExtField("namespace", "namespace");
request.addExtField("key", "key"); request.addExtField("key", "key");
RemotingCommand response = defaultRequestProcessor.processRequest(null, request); NettyChannelHandlerContextImpl remotingChannel = mock(NettyChannelHandlerContextImpl.class);
ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class);
when(remotingChannel.getChannelHandlerContext()).thenReturn(channelHandlerContext);
RemotingCommand response = defaultRequestProcessor.processRequest(remotingChannel, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(response.getRemark()).isNull(); assertThat(response.getRemark()).isNull();
......
...@@ -21,7 +21,6 @@ import org.apache.rocketmq.remoting.common.Pair; ...@@ -21,7 +21,6 @@ import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingServer extends RemotingService { public interface RemotingServer extends RemotingService {
......
...@@ -99,7 +99,7 @@ public class NettyChannelHandlerContextImpl implements RemotingChannel { ...@@ -99,7 +99,7 @@ public class NettyChannelHandlerContextImpl implements RemotingChannel {
final NettyChannelHandlerContextImpl that = (NettyChannelHandlerContextImpl) o; final NettyChannelHandlerContextImpl that = (NettyChannelHandlerContextImpl) o;
return channelHandlerContext != null ? channelHandlerContext.equals(that.channelHandlerContext) : that.channelHandlerContext == null; return channelHandlerContext.channel() != null ? channelHandlerContext.channel().equals(that.channelHandlerContext.channel()) : that.channelHandlerContext.channel() == null;
} }
......
...@@ -32,7 +32,7 @@ public class NettyChannelImpl implements RemotingChannel { ...@@ -32,7 +32,7 @@ public class NettyChannelImpl implements RemotingChannel {
private static final InternalLogger log = InternalLoggerFactory.getLogger(ROCKETMQ_REMOTING); private static final InternalLogger log = InternalLoggerFactory.getLogger(ROCKETMQ_REMOTING);
private final io.netty.channel.Channel channel; private final Channel channel;
public NettyChannelImpl(Channel channel) { public NettyChannelImpl(Channel channel) {
this.channel = channel; this.channel = channel;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.apache.rocketmq.remoting.transport.http2; package org.apache.rocketmq.remoting.transport.http2;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
...@@ -41,26 +40,21 @@ import javax.net.ssl.SSLException; ...@@ -41,26 +40,21 @@ import javax.net.ssl.SSLException;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.interceptor.InterceptorInvoker;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.NettyRemotingClientAbstract; import org.apache.rocketmq.remoting.transport.NettyRemotingClientAbstract;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder;
import org.apache.rocketmq.remoting.util.ThreadUtils; import org.apache.rocketmq.remoting.util.ThreadUtils;
public class Http2ClientImpl extends NettyRemotingClientAbstract implements RemotingClient { public class Http2ClientImpl extends NettyRemotingClientAbstract implements RemotingClient {
...@@ -248,7 +242,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -248,7 +242,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
@Override @Override
public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) { public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) {
executor = (executor == null ? this.publicExecutor : executor); executor = executor == null ? this.publicExecutor : executor;
registerNettyProcessor(requestCode, processor, executor); registerNettyProcessor(requestCode, processor, executor);
} }
......
...@@ -48,24 +48,22 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -48,24 +48,22 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.netty.ChannelStatisticsHandler; import org.apache.rocketmq.remoting.netty.ChannelStatisticsHandler;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.NettyRemotingServerAbstract; import org.apache.rocketmq.remoting.transport.NettyRemotingServerAbstract;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder;
import org.apache.rocketmq.remoting.util.JvmUtils; import org.apache.rocketmq.remoting.util.JvmUtils;
import org.apache.rocketmq.remoting.util.ThreadUtils; import org.apache.rocketmq.remoting.util.ThreadUtils;
...@@ -139,7 +137,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo ...@@ -139,7 +137,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
@Override @Override
public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) { public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) {
executor = (executor == null ? this.publicExecutor : executor); executor = executor == null ? this.publicExecutor : executor;
registerNettyProcessor(requestCode, processor, executor); registerNettyProcessor(requestCode, processor, executor);
} }
...@@ -204,25 +202,23 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo ...@@ -204,25 +202,23 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
public void start() { public void start() {
super.start(); super.start();
final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
this.serverBootstrap.group(this.bossGroup, this.ioGroup). this.serverBootstrap.group(this.bossGroup, this.ioGroup).channel(socketChannelClass).childHandler(new ChannelInitializer<SocketChannel>() {
channel(socketChannelClass).childHandler(new ChannelInitializer<SocketChannel>() { @Override
@Override public void initChannel(SocketChannel ch) {
public void initChannel(SocketChannel ch) throws Exception { channels.add(ch);
channels.add(ch); ChannelPipeline cp = ch.pipeline();
cp.addLast(ChannelStatisticsHandler.NAME, new ChannelStatisticsHandler(channels));
ChannelPipeline cp = ch.pipeline(); cp.addLast(workerGroup,
cp.addLast(ChannelStatisticsHandler.NAME, new ChannelStatisticsHandler(channels)); Http2Handler.newHandler(true),
cp.addLast(workerGroup, new NettyEncoder(),
Http2Handler.newHandler(true), new NettyDecoder(),
new NettyEncoder(), new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(),
new NettyDecoder(), serverConfig.getConnectionChannelWriterIdleSeconds(),
new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(), serverConfig.getServerChannelMaxIdleTimeSeconds()),
serverConfig.getConnectionChannelWriterIdleSeconds(), new NettyConnectManageHandler(),
serverConfig.getServerChannelMaxIdleTimeSeconds()), new NettyServerHandler());
new NettyConnectManageHandler(), }
new NettyServerHandler()); });
}
});
applyOptions(serverBootstrap); applyOptions(serverBootstrap);
ChannelFuture channelFuture = this.serverBootstrap.bind(this.port).syncUninterruptibly(); ChannelFuture channelFuture = this.serverBootstrap.bind(this.port).syncUninterruptibly();
......
...@@ -36,10 +36,11 @@ import java.util.concurrent.ExecutorService; ...@@ -36,10 +36,11 @@ import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
...@@ -47,14 +48,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; ...@@ -47,14 +48,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.interceptor.InterceptorInvoker;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.interceptor.ResponseContext;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.netty.TlsHelper; import org.apache.rocketmq.remoting.netty.TlsHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......
...@@ -52,7 +52,6 @@ import org.apache.rocketmq.remoting.common.TlsMode; ...@@ -52,7 +52,6 @@ import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.netty.FileRegionEncoder; import org.apache.rocketmq.remoting.netty.FileRegionEncoder;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
...@@ -222,7 +221,7 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements ...@@ -222,7 +221,7 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
@Override @Override
public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) { public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) {
executor = (executor == null ? this.publicExecutor : executor); executor = executor == null ? this.publicExecutor : executor;
registerNettyProcessor(requestCode, processor, executor); registerNettyProcessor(requestCode, processor, executor);
} }
......
...@@ -31,7 +31,7 @@ public class NettyRemotingClientTest { ...@@ -31,7 +31,7 @@ public class NettyRemotingClientTest {
private NettyRemotingClient remotingClient = new NettyRemotingClient(new ClientConfig()); private NettyRemotingClient remotingClient = new NettyRemotingClient(new ClientConfig());
@Test @Test
public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException { public void testSetCallbackExecutor() {
ExecutorService customized = Executors.newCachedThreadPool(); ExecutorService customized = Executors.newCachedThreadPool();
remotingClient.setCallbackExecutor(customized); remotingClient.setCallbackExecutor(customized);
......
...@@ -109,7 +109,7 @@ public class RemotingCommandTest { ...@@ -109,7 +109,7 @@ public class RemotingCommandTest {
@Test @Test
public void testNotNullField() throws Exception { public void testNotNullField() throws Exception {
RemotingCommand remotingCommand = new RemotingCommand(); RemotingCommand remotingCommand = new RemotingCommand();
Method method = RemotingCommand.class.getDeclaredMethod("isFieldNullable", Field.class); Method method = CodecHelper.class.getDeclaredMethod("isFieldNullable", Field.class);
method.setAccessible(true); method.setAccessible(true);
Field nullString = FieldTestClass.class.getDeclaredField("nullString"); Field nullString = FieldTestClass.class.getDeclaredField("nullString");
......
...@@ -20,7 +20,9 @@ import java.util.List; ...@@ -20,7 +20,9 @@ import java.util.List;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
//TODO Filter implementation /**
* TODO Filter
*/
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener { public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
private final SnodeController snodeController; private final SnodeController snodeController;
......
...@@ -15,12 +15,13 @@ ...@@ -15,12 +15,13 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.constant; package org.apache.rocketmq.snode.constant;
public class SnodeConstant { public class SnodeConstant {
public static final long heartbeatTimeout = 3000; public static final long HEARTBEAT_TIME_OUT = 3000;
public static final long oneWaytimeout = 10; public static final long ONE_WAY_TIMEOUT = 10;
public static final long defaultTimeoutMills = 3000L; public static final long DEFAULT_TIMEOUT_MILLS = 3000L;
public static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30; public static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
*/ */
package org.apache.rocketmq.snode.flowcontrol; package org.apache.rocketmq.snode.flowcontrol;
import com.sun.org.apache.bcel.internal.generic.IF_ACMPEQ;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.flowcontrol.AbstractFlowControlService; import org.apache.rocketmq.common.flowcontrol.AbstractFlowControlService;
......
...@@ -61,7 +61,7 @@ public class HeartbeatProcessor implements RequestProcessor { ...@@ -61,7 +61,7 @@ public class HeartbeatProcessor implements RequestProcessor {
private RemotingCommand heartbeat(RemotingChannel remotingChannel, RemotingCommand request) { private RemotingCommand heartbeat(RemotingChannel remotingChannel, RemotingCommand request) {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo( ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
new NettyChannelImpl((((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel())), new NettyChannelImpl(((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel()),
heartbeatData.getClientID(), heartbeatData.getClientID(),
request.getLanguage(), request.getLanguage(),
request.getVersion() request.getVersion()
......
...@@ -54,7 +54,6 @@ public class SendMessageProcessor implements RequestProcessor { ...@@ -54,7 +54,6 @@ public class SendMessageProcessor implements RequestProcessor {
boolean isSendBack = false; boolean isSendBack = false;
if (request.getCode() == RequestCode.SEND_MESSAGE_V2) { if (request.getCode() == RequestCode.SEND_MESSAGE_V2) {
sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
System.out.println("sendMessageRequestHeaderV2: " + sendMessageRequestHeaderV2);
enodeName = sendMessageRequestHeaderV2.getN(); enodeName = sendMessageRequestHeaderV2.getN();
} else { } else {
isSendBack = true; isSendBack = true;
......
...@@ -29,13 +29,10 @@ import org.apache.rocketmq.common.constant.LoggerName; ...@@ -29,13 +29,10 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader; import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
...@@ -71,7 +68,7 @@ public class EnodeServiceImpl implements EnodeService { ...@@ -71,7 +68,7 @@ public class EnodeServiceImpl implements EnodeService {
String enodeAddr = entry.getValue().get(MixAll.MASTER_ID); String enodeAddr = entry.getValue().get(MixAll.MASTER_ID);
if (enodeAddr != null) { if (enodeAddr != null) {
try { try {
this.snodeController.getRemotingClient().invokeSync(enodeAddr, remotingCommand, SnodeConstant.defaultTimeoutMills); this.snodeController.getRemotingClient().invokeSync(enodeAddr, remotingCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} catch (Exception ex) { } catch (Exception ex) {
log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex); log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex);
} }
...@@ -114,7 +111,7 @@ public class EnodeServiceImpl implements EnodeService { ...@@ -114,7 +111,7 @@ public class EnodeServiceImpl implements EnodeService {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try { try {
String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
this.snodeController.getRemotingClient().invokeAsync(enodeAddress, request, SnodeConstant.defaultTimeoutMills, (responseFuture) -> { this.snodeController.getRemotingClient().invokeAsync(enodeAddress, request, SnodeConstant.DEFAULT_TIMEOUT_MILLS, (responseFuture) -> {
future.complete(responseFuture.getResponseCommand()); future.complete(responseFuture.getResponseCommand());
}); });
} catch (Exception ex) { } catch (Exception ex) {
...@@ -139,7 +136,7 @@ public class EnodeServiceImpl implements EnodeService { ...@@ -139,7 +136,7 @@ public class EnodeServiceImpl implements EnodeService {
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
try { try {
this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.oneWaytimeout); this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.ONE_WAY_TIMEOUT);
} catch (Exception e) { } catch (Exception e) {
log.error("NotifyConsumerIdsChanged consumer group: {} exception ", consumerGroup, e); log.error("NotifyConsumerIdsChanged consumer group: {} exception ", consumerGroup, e);
} }
...@@ -164,7 +161,7 @@ public class EnodeServiceImpl implements EnodeService { ...@@ -164,7 +161,7 @@ public class EnodeServiceImpl implements EnodeService {
public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException, public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingSendRequestException, RemotingConnectException, MQBrokerException {
synchronized (this) { synchronized (this) {
ClusterInfo clusterInfo = getBrokerClusterInfo(SnodeConstant.defaultTimeoutMills); ClusterInfo clusterInfo = getBrokerClusterInfo(SnodeConstant.DEFAULT_TIMEOUT_MILLS);
if (clusterInfo != null) { if (clusterInfo != null) {
HashMap<String, Set<String>> enodeAddress = clusterInfo.getClusterAddrTable(); HashMap<String, Set<String>> enodeAddress = clusterInfo.getClusterAddrTable();
for (Map.Entry<String, Set<String>> entry : enodeAddress.entrySet()) { for (Map.Entry<String, Set<String>> entry : enodeAddress.entrySet()) {
...@@ -189,7 +186,7 @@ public class EnodeServiceImpl implements EnodeService { ...@@ -189,7 +186,7 @@ public class EnodeServiceImpl implements EnodeService {
String enodeAddress = entry.getValue().get(MixAll.MASTER_ID); String enodeAddress = entry.getValue().get(MixAll.MASTER_ID);
try { try {
RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(enodeAddress, RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(enodeAddress,
request, SnodeConstant.defaultTimeoutMills); request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
if (response != null && response.getCode() == ResponseCode.SUCCESS) { if (response != null && response.getCode() == ResponseCode.SUCCESS) {
persist = true; persist = true;
} else { } else {
...@@ -215,7 +212,7 @@ public class EnodeServiceImpl implements EnodeService { ...@@ -215,7 +212,7 @@ public class EnodeServiceImpl implements EnodeService {
requestHeader.setCommitOffset(offset); requestHeader.setCommitOffset(offset);
requestHeader.setEnodeName(enodeName); requestHeader.setEnodeName(enodeName);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
this.snodeController.getRemotingClient().invokeOneway(address, request, SnodeConstant.defaultTimeoutMills); this.snodeController.getRemotingClient().invokeOneway(address, request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} catch (Exception ex) { } catch (Exception ex) {
log.error("Persist offset to Enode error!"); log.error("Persist offset to Enode error!");
} }
...@@ -231,7 +228,7 @@ public class EnodeServiceImpl implements EnodeService { ...@@ -231,7 +228,7 @@ public class EnodeServiceImpl implements EnodeService {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader);
String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr), return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
request, SnodeConstant.defaultTimeoutMills); request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} }
@Override @Override
...@@ -246,7 +243,7 @@ public class EnodeServiceImpl implements EnodeService { ...@@ -246,7 +243,7 @@ public class EnodeServiceImpl implements EnodeService {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(this.snodeController.getSnodeConfig().isVipChannelEnabled(), addr), return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(this.snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
request, SnodeConstant.defaultTimeoutMills); request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} }
@Override @Override
...@@ -254,7 +251,7 @@ public class EnodeServiceImpl implements EnodeService { ...@@ -254,7 +251,7 @@ public class EnodeServiceImpl implements EnodeService {
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, RemotingCommandException { RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr), return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
request, SnodeConstant.defaultTimeoutMills); request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} }
@Override @Override
...@@ -262,7 +259,7 @@ public class EnodeServiceImpl implements EnodeService { ...@@ -262,7 +259,7 @@ public class EnodeServiceImpl implements EnodeService {
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr), return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
request, SnodeConstant.defaultTimeoutMills); request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} }
@Override @Override
...@@ -279,6 +276,6 @@ public class EnodeServiceImpl implements EnodeService { ...@@ -279,6 +276,6 @@ public class EnodeServiceImpl implements EnodeService {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(address, return this.snodeController.getRemotingClient().invokeSync(address,
request, SnodeConstant.defaultTimeoutMills); request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} }
} }
...@@ -66,7 +66,7 @@ public class NnodeServiceImpl implements NnodeService { ...@@ -66,7 +66,7 @@ public class NnodeServiceImpl implements NnodeService {
if (nnodeAddressList != null && nnodeAddressList.size() > 0) { if (nnodeAddressList != null && nnodeAddressList.size() > 0) {
for (String nodeAddress : nnodeAddressList) { for (String nodeAddress : nnodeAddressList) {
try { try {
this.snodeController.getRemotingClient().invokeSync(nodeAddress, remotingCommand, SnodeConstant.heartbeatTimeout); this.snodeController.getRemotingClient().invokeSync(nodeAddress, remotingCommand, SnodeConstant.HEARTBEAT_TIME_OUT);
} catch (Exception ex) { } catch (Exception ex) {
log.warn("Register Snode to Nnode addr: {} error, ex:{} ", nodeAddress, ex); log.warn("Register Snode to Nnode addr: {} error, ex:{} ", nodeAddress, ex);
} }
...@@ -93,7 +93,7 @@ public class NnodeServiceImpl implements NnodeService { ...@@ -93,7 +93,7 @@ public class NnodeServiceImpl implements NnodeService {
requestHeader.setTopic(topic); requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.defaultTimeoutMills); RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
log.info("GetTopicRouteInfoFromNameServer response: " + response); log.info("GetTopicRouteInfoFromNameServer response: " + response);
assert response != null; assert response != null;
switch (response.getCode()) { switch (response.getCode()) {
...@@ -167,7 +167,7 @@ public class NnodeServiceImpl implements NnodeService { ...@@ -167,7 +167,7 @@ public class NnodeServiceImpl implements NnodeService {
RemotingSendRequestException, RemotingConnectException { RemotingSendRequestException, RemotingConnectException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.defaultTimeoutMills); RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
switch (response.getCode()) { switch (response.getCode()) {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
ClusterInfo clusterInfo = ClusterInfo.decode(response.getBody(), ClusterInfo.class); ClusterInfo clusterInfo = ClusterInfo.decode(response.getBody(), ClusterInfo.class);
......
...@@ -90,7 +90,7 @@ public class PushServiceImpl implements PushService { ...@@ -90,7 +90,7 @@ public class PushServiceImpl implements PushService {
RemotingChannel remotingChannel = clientChannelInfoEntry.getValue().getChannel(); RemotingChannel remotingChannel = clientChannelInfoEntry.getValue().getChannel();
if (remotingChannel.isWritable()) { if (remotingChannel.isWritable()) {
log.warn("Push message to topic: {} queueId: {} consumer group:{}, message:{}", topic, queueId, clientChannelInfoEntry.getKey(), pushMessage); log.warn("Push message to topic: {} queueId: {} consumer group:{}, message:{}", topic, queueId, clientChannelInfoEntry.getKey(), pushMessage);
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.defaultTimeoutMills); snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} }
} }
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册