提交 776911d4 编写于 作者: S shroman 提交者: yukon

[ROCKETMQ-34] Potential NPE in NettyConnetManageHandler#connect, closes...

[ROCKETMQ-34] Potential NPE in NettyConnetManageHandler#connect, closes apache/incubator-rocketmq#30
上级 85467dfd
...@@ -73,12 +73,12 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -73,12 +73,12 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private final Bootstrap bootstrap = new Bootstrap(); private final Bootstrap bootstrap = new Bootstrap();
private final EventLoopGroup eventLoopGroupWorker; private final EventLoopGroup eventLoopGroupWorker;
private final Lock lockChannelTables = new ReentrantLock(); private final Lock lockChannelTables = new ReentrantLock();
private final ConcurrentHashMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>(); private final ConcurrentHashMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<>();
private final Timer timer = new Timer("ClientHouseKeepingService", true); private final Timer timer = new Timer("ClientHouseKeepingService", true);
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>(); private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<>();
private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>(); private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<>();
private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex()); private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
private final Lock lockNamesrvChannel = new ReentrantLock(); private final Lock lockNamesrvChannel = new ReentrantLock();
...@@ -155,7 +155,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -155,7 +155,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
new NettyEncoder(), new NettyEncoder(),
new NettyDecoder(), new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnetManageHandler(), new NettyConnectManageHandler(),
new NettyClientHandler()); new NettyClientHandler());
} }
}); });
...@@ -527,7 +527,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -527,7 +527,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
executorThis = this.publicExecutor; executorThis = this.publicExecutor;
} }
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis); Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<>(processor, executorThis);
this.processorTable.put(requestCode, pair); this.processorTable.put(requestCode, pair);
} }
...@@ -596,17 +596,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -596,17 +596,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
} }
} }
class NettyConnetManageHandler extends ChannelDuplexHandler { class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override @Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
throws Exception { ChannelPromise promise) throws Exception {
final String local = localAddress == null ? "UNKNOW" : localAddress.toString(); final String local = localAddress == null ? "UNKNOW" : localAddress.toString();
final String remote = remoteAddress == null ? "UNKNOW" : remoteAddress.toString(); final String remote = remoteAddress == null ? "UNKNOW" : remoteAddress.toString();
log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote); log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
super.connect(ctx, remoteAddress, localAddress, promise); super.connect(ctx, remoteAddress, localAddress, promise);
if (NettyRemotingClient.this.channelEventListener != null) { if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), ctx.channel())); NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册