Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
武汉红喜
whatsmars
提交
554d66bf
W
whatsmars
项目概览
武汉红喜
/
whatsmars
通知
3
Star
0
Fork
1
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
W
whatsmars
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
554d66bf
编写于
8月 04, 2019
作者:
武汉红喜
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
netty
上级
1ad512d8
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
204 addition
and
0 deletion
+204
-0
whatsmars-rpc/whatsmars-netty/README.md
whatsmars-rpc/whatsmars-netty/README.md
+204
-0
未找到文件。
whatsmars-rpc/whatsmars-netty/README.md
浏览文件 @
554d66bf
...
...
@@ -8,3 +8,207 @@ NioEventLoop,就是基于Nio的实现。在这个类中有一个亮点,就是
方法的空轮询,核心思想是,如果连续多少次(默认为512)在没有超时的情况就返回,并且已经准备就绪的键的数量为0,
则认为发生了空轮询,如果发生了空轮询,就新建一个新的Selector,并重新将通道,关心的事件注册到新的Selector,
并关闭旧的Selector
### ServerBootstrap bind
```
java
private
ChannelFuture
doBind
(
final
SocketAddress
localAddress
)
{
final
ChannelFuture
regFuture
=
initAndRegister
();
final
Channel
channel
=
regFuture
.
channel
();
if
(
regFuture
.
cause
()
!=
null
)
{
return
regFuture
;
}
if
(
regFuture
.
isDone
())
{
// At this point we know that the registration was complete and successful.
ChannelPromise
promise
=
channel
.
newPromise
();
doBind0
(
regFuture
,
channel
,
localAddress
,
promise
);
return
promise
;
}
else
{
// Registration future is almost always fulfilled already, but just in case it's not.
final
PendingRegistrationPromise
promise
=
new
PendingRegistrationPromise
(
channel
);
regFuture
.
addListener
(
new
ChannelFutureListener
()
{
@Override
public
void
operationComplete
(
ChannelFuture
future
)
throws
Exception
{
Throwable
cause
=
future
.
cause
();
if
(
cause
!=
null
)
{
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise
.
setFailure
(
cause
);
}
else
{
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise
.
registered
();
doBind0
(
regFuture
,
channel
,
localAddress
,
promise
);
}
}
});
return
promise
;
}
}
```
-
初始化一个通道,并注册,如果注册失败,直接返回。
-
如果初始化并立即注册成功,执行doBind0方法,进行绑定
-
如果未立即注册成功,则添加监听事件,等初始化并注册成功后,执行doBind0方法,这里是Netty对jdk
Future模式进行的扩展,引入事件机制
### group().register(channel)
```
java
final
ChannelFuture
initAndRegister
()
{
Channel
channel
=
null
;
try
{
channel
=
channelFactory
.
newChannel
();
init
(
channel
);
// NioServerSocketChannel
}
catch
(
Throwable
t
)
{
if
(
channel
!=
null
)
{
channel
.
unsafe
().
closeForcibly
();
return
new
DefaultChannelPromise
(
channel
,
GlobalEventExecutor
.
INSTANCE
).
setFailure
(
t
);
}
return
new
DefaultChannelPromise
(
new
FailedChannel
(),
GlobalEventExecutor
.
INSTANCE
).
setFailure
(
t
);
}
ChannelFuture
regFuture
=
config
().
group
().
register
(
channel
);
if
(
regFuture
.
cause
()
!=
null
)
{
if
(
channel
.
isRegistered
())
{
channel
.
close
();
}
else
{
channel
.
unsafe
().
closeForcibly
();
}
}
return
regFuture
;
}
```
根据Netty线程模型,group()返回的是EventLoopGroup,然后Netty会从该EventLoopGroup中获取下一个
EventLoop来执行。由于程序入口使用的是NioServerSocketChannel,故本例最终会使用NioEventLoop
来作为事件处理器来服务,这里的register方法,其实现在NioEventLoop的父类SingleThreadEventLoop中。
```
java
@Override
public
ChannelFuture
register
(
Channel
channel
)
{
return
register
(
new
DefaultChannelPromise
(
channel
,
this
));
}
@Override
public
ChannelFuture
register
(
final
ChannelPromise
promise
)
{
ObjectUtil
.
checkNotNull
(
promise
,
"promise"
);
promise
.
channel
().
unsafe
().
register
(
this
,
promise
);
return
promise
;
}
```
有关Channel的IO类操作,最终都会转发给Unsafe类去执行,直接进入到AbstractUnsafe中
```
java
@Override
public
final
void
register
(
EventLoop
eventLoop
,
final
ChannelPromise
promise
)
{
if
(
eventLoop
==
null
)
{
throw
new
NullPointerException
(
"eventLoop"
);
}
if
(
isRegistered
())
{
promise
.
setFailure
(
new
IllegalStateException
(
"registered to an event loop already"
));
return
;
}
if
(!
isCompatible
(
eventLoop
))
{
promise
.
setFailure
(
new
IllegalStateException
(
"incompatible event loop type: "
+
eventLoop
.
getClass
().
getName
()));
return
;
}
AbstractChannel
.
this
.
eventLoop
=
eventLoop
;
if
(
eventLoop
.
inEventLoop
())
{
register0
(
promise
);
}
else
{
try
{
eventLoop
.
execute
(
new
Runnable
()
{
@Override
public
void
run
()
{
register0
(
promise
);
}
});
}
catch
(
Throwable
t
)
{
logger
.
warn
(
"Force-closing a channel whose registration task was not accepted by an event loop: {}"
,
AbstractChannel
.
this
,
t
);
closeForcibly
();
closeFuture
.
setClosed
();
safeSetFailure
(
promise
,
t
);
}
}
}
private
void
register0
(
ChannelPromise
promise
)
{
try
{
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if
(!
promise
.
setUncancellable
()
||
!
ensureOpen
(
promise
))
{
return
;
}
boolean
firstRegistration
=
neverRegistered
;
doRegister
();
// @@@@@ 1
neverRegistered
=
false
;
registered
=
true
;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline
.
invokeHandlerAddedIfNeeded
();
safeSetSuccess
(
promise
);
pipeline
.
fireChannelRegistered
();
// @@@@@ 2
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if
(
isActive
())
{
if
(
firstRegistration
)
{
pipeline
.
fireChannelActive
();
}
else
if
(
config
().
isAutoRead
())
{
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead
();
}
}
}
catch
(
Throwable
t
)
{
// Close the channel directly to avoid FD leak.
closeForcibly
();
closeFuture
.
setClosed
();
safeSetFailure
(
promise
,
t
);
}
}
```
完成Channel的注册外,需要调用管道的pipline.fireChannelRegistered,跟踪进去,最终将执行
DefaultChannelHandlerInvoker的invokeChannelRegistered方法,该方法会执行ChannelInitializer的init方法。
### ChannelPipeline
设计理念:ChannelPipeline管道,提供相应的API,增加ChannelHander形成处理链条,在DefaultChannelPipeline
中并不是用一个LikedList
<ChannelHander>
来实现链表,而是在其自身就是一个链表结构,链表的节点是
AbstractChannelHandlerContext,里面有next,与perv分别指向下一个或上一个节点。
在DefaultChannelHanderPipeline中持有tail与head引用。
```
java
public
ChannelPipeline
fireChannelRegistered
()
{
head
.
fireChannelRegistered
();
return
this
;
}
@Override
public
ChannelPipeline
read
()
{
tail
.
read
();
return
this
;
}
@Override
public
ChannelFuture
write
(
Object
msg
)
{
return
tail
.
write
(
msg
);
}
```
从上述方法,不难看出,ChannelPipeline的实现源码,就是沿着调用链向上或向下传播事件并执行之。
### ChannelHandlers的执行顺序
pipeline里的handlers分为两类,分别实现了ChannelInboundHandler和ChannelOutboundHandler接口。
ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;
ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。
ChannelInboundHandler按照注册的先后顺序执行,ChannelOutboundHandler按照注册的先后顺序逆序执行。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录