Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
openanolis
dragonwell8_jdk
提交
70bb8ffd
D
dragonwell8_jdk
项目概览
openanolis
/
dragonwell8_jdk
通知
4
Star
2
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
D
dragonwell8_jdk
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
70bb8ffd
编写于
6月 24, 2011
作者:
A
alanb
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
6965150: TEST_BUG: java/nio/channels/AsynchronousSocketChannel/Basic.java takes too long
Reviewed-by: chegar
上级
763725b1
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
584 addition
and
564 deletion
+584
-564
test/java/nio/channels/AsynchronousSocketChannel/Basic.java
test/java/nio/channels/AsynchronousSocketChannel/Basic.java
+584
-564
未找到文件。
test/java/nio/channels/AsynchronousSocketChannel/Basic.java
浏览文件 @
70bb8ffd
...
@@ -24,7 +24,7 @@
...
@@ -24,7 +24,7 @@
/* @test
/* @test
* @bug 4607272 6842687 6878369 6944810 7023403
* @bug 4607272 6842687 6878369 6944810 7023403
* @summary Unit test for AsynchronousSocketChannel
* @summary Unit test for AsynchronousSocketChannel
* @run main
/timeout=600 Basic
* @run main
Basic -skipSlowConnectTest
*/
*/
import
java.nio.ByteBuffer
;
import
java.nio.ByteBuffer
;
...
@@ -34,12 +34,25 @@ import java.net.*;
...
@@ -34,12 +34,25 @@ import java.net.*;
import
java.util.Random
;
import
java.util.Random
;
import
java.util.concurrent.*
;
import
java.util.concurrent.*
;
import
java.util.concurrent.atomic.*
;
import
java.util.concurrent.atomic.*
;
import
java.io.Closeable
;
import
java.io.IOException
;
import
java.io.IOException
;
public
class
Basic
{
public
class
Basic
{
static
final
Random
rand
=
new
Random
();
static
final
Random
rand
=
new
Random
();
static
boolean
skipSlowConnectTest
=
false
;
public
static
void
main
(
String
[]
args
)
throws
Exception
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
for
(
String
arg:
args
)
{
switch
(
arg
)
{
case
"-skipSlowConnectTest"
:
skipSlowConnectTest
=
true
;
break
;
default
:
throw
new
RuntimeException
(
"Unrecognized argument: "
+
arg
);
}
}
testBind
();
testBind
();
testSocketOptions
();
testSocketOptions
();
testConnect
();
testConnect
();
...
@@ -54,7 +67,7 @@ public class Basic {
...
@@ -54,7 +67,7 @@ public class Basic {
testShutdown
();
testShutdown
();
}
}
static
class
Server
{
static
class
Server
implements
Closeable
{
private
final
ServerSocketChannel
ssc
;
private
final
ServerSocketChannel
ssc
;
private
final
InetSocketAddress
address
;
private
final
InetSocketAddress
address
;
...
@@ -74,10 +87,8 @@ public class Basic {
...
@@ -74,10 +87,8 @@ public class Basic {
return
ssc
.
accept
();
return
ssc
.
accept
();
}
}
void
close
()
{
public
void
close
()
throws
IOException
{
try
{
ssc
.
close
();
ssc
.
close
();
}
catch
(
IOException
ignore
)
{
}
}
}
}
}
...
@@ -85,28 +96,28 @@ public class Basic {
...
@@ -85,28 +96,28 @@ public class Basic {
static
void
testBind
()
throws
Exception
{
static
void
testBind
()
throws
Exception
{
System
.
out
.
println
(
"-- bind --"
);
System
.
out
.
println
(
"-- bind --"
);
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
try
(
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
())
{
if
(
ch
.
getLocalAddress
()
!=
null
)
if
(
ch
.
getLocalAddress
()
!=
null
)
throw
new
RuntimeException
(
"Local address should be 'null'"
);
throw
new
RuntimeException
(
"Local address should be 'null'"
);
ch
.
bind
(
new
InetSocketAddress
(
0
));
// check local address after binding
InetSocketAddress
local
=
(
InetSocketAddress
)
ch
.
getLocalAddress
();
if
(
local
.
getPort
()
==
0
)
throw
new
RuntimeException
(
"Unexpected port"
);
if
(!
local
.
getAddress
().
isAnyLocalAddress
())
throw
new
RuntimeException
(
"Not bound to a wildcard address"
);
// try to re-bind
try
{
ch
.
bind
(
new
InetSocketAddress
(
0
));
ch
.
bind
(
new
InetSocketAddress
(
0
));
throw
new
RuntimeException
(
"AlreadyBoundException expected"
);
}
catch
(
AlreadyBoundException
x
)
{
// check local address after binding
InetSocketAddress
local
=
(
InetSocketAddress
)
ch
.
getLocalAddress
();
if
(
local
.
getPort
()
==
0
)
throw
new
RuntimeException
(
"Unexpected port"
);
if
(!
local
.
getAddress
().
isAnyLocalAddress
())
throw
new
RuntimeException
(
"Not bound to a wildcard address"
);
// try to re-bind
try
{
ch
.
bind
(
new
InetSocketAddress
(
0
));
throw
new
RuntimeException
(
"AlreadyBoundException expected"
);
}
catch
(
AlreadyBoundException
x
)
{
}
}
}
ch
.
close
();
// check ClosedChannelException
// check ClosedChannelException
ch
=
AsynchronousSocketChannel
.
open
();
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
ch
.
close
();
ch
.
close
();
try
{
try
{
ch
.
bind
(
new
InetSocketAddress
(
0
));
ch
.
bind
(
new
InetSocketAddress
(
0
));
...
@@ -118,109 +129,124 @@ public class Basic {
...
@@ -118,109 +129,124 @@ public class Basic {
static
void
testSocketOptions
()
throws
Exception
{
static
void
testSocketOptions
()
throws
Exception
{
System
.
out
.
println
(
"-- socket options --"
);
System
.
out
.
println
(
"-- socket options --"
);
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
()
try
(
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
())
{
.
setOption
(
SO_RCVBUF
,
128
*
1024
)
ch
.
setOption
(
SO_RCVBUF
,
128
*
1024
)
.
setOption
(
SO_SNDBUF
,
128
*
1024
)
.
setOption
(
SO_SNDBUF
,
128
*
1024
)
.
setOption
(
SO_REUSEADDR
,
true
);
.
setOption
(
SO_REUSEADDR
,
true
);
// check SO_SNDBUF/SO_RCVBUF limits
// check SO_SNDBUF/SO_RCVBUF limits
int
before
,
after
;
int
before
,
after
;
before
=
ch
.
getOption
(
SO_SNDBUF
);
before
=
ch
.
getOption
(
SO_SNDBUF
);
after
=
ch
.
setOption
(
SO_SNDBUF
,
Integer
.
MAX_VALUE
).
getOption
(
SO_SNDBUF
);
after
=
ch
.
setOption
(
SO_SNDBUF
,
Integer
.
MAX_VALUE
).
getOption
(
SO_SNDBUF
);
if
(
after
<
before
)
if
(
after
<
before
)
throw
new
RuntimeException
(
"setOption caused SO_SNDBUF to decrease"
);
throw
new
RuntimeException
(
"setOption caused SO_SNDBUF to decrease"
);
before
=
ch
.
getOption
(
SO_RCVBUF
);
before
=
ch
.
getOption
(
SO_RCVBUF
);
after
=
ch
.
setOption
(
SO_RCVBUF
,
Integer
.
MAX_VALUE
).
getOption
(
SO_RCVBUF
);
after
=
ch
.
setOption
(
SO_RCVBUF
,
Integer
.
MAX_VALUE
).
getOption
(
SO_RCVBUF
);
if
(
after
<
before
)
if
(
after
<
before
)
throw
new
RuntimeException
(
"setOption caused SO_RCVBUF to decrease"
);
throw
new
RuntimeException
(
"setOption caused SO_RCVBUF to decrease"
);
ch
.
bind
(
new
InetSocketAddress
(
0
));
// default values
if
((
Boolean
)
ch
.
getOption
(
SO_KEEPALIVE
))
throw
new
RuntimeException
(
"Default of SO_KEEPALIVE should be 'false'"
);
if
((
Boolean
)
ch
.
getOption
(
TCP_NODELAY
))
throw
new
RuntimeException
(
"Default of TCP_NODELAY should be 'false'"
);
// set and check
if
(!(
Boolean
)
ch
.
setOption
(
SO_KEEPALIVE
,
true
).
getOption
(
SO_KEEPALIVE
))
throw
new
RuntimeException
(
"SO_KEEPALIVE did not change"
);
if
(!(
Boolean
)
ch
.
setOption
(
TCP_NODELAY
,
true
).
getOption
(
TCP_NODELAY
))
throw
new
RuntimeException
(
"SO_KEEPALIVE did not change"
);
// read others (can't check as actual value is implementation dependent)
ch
.
getOption
(
SO_RCVBUF
);
ch
.
getOption
(
SO_SNDBUF
);
ch
.
close
();
ch
.
bind
(
new
InetSocketAddress
(
0
));
// default values
if
(
ch
.
getOption
(
SO_KEEPALIVE
))
throw
new
RuntimeException
(
"Default of SO_KEEPALIVE should be 'false'"
);
if
(
ch
.
getOption
(
TCP_NODELAY
))
throw
new
RuntimeException
(
"Default of TCP_NODELAY should be 'false'"
);
// set and check
if
(!
ch
.
setOption
(
SO_KEEPALIVE
,
true
).
getOption
(
SO_KEEPALIVE
))
throw
new
RuntimeException
(
"SO_KEEPALIVE did not change"
);
if
(!
ch
.
setOption
(
TCP_NODELAY
,
true
).
getOption
(
TCP_NODELAY
))
throw
new
RuntimeException
(
"SO_KEEPALIVE did not change"
);
// read others (can't check as actual value is implementation dependent)
ch
.
getOption
(
SO_RCVBUF
);
ch
.
getOption
(
SO_SNDBUF
);
}
}
}
static
void
testConnect
()
throws
Exception
{
static
void
testConnect
()
throws
Exception
{
System
.
out
.
println
(
"-- connect --"
);
System
.
out
.
println
(
"-- connect --"
);
Server
server
=
new
Server
();
SocketAddress
address
;
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
ch
.
connect
(
server
.
address
()).
get
();
try
(
Server
server
=
new
Server
())
{
address
=
server
.
address
();
// check local address
if
(
ch
.
getLocalAddress
()
==
null
)
// connect to server and check local/remote addresses
throw
new
RuntimeException
(
"Not bound to local address"
);
try
(
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
())
{
ch
.
connect
(
address
).
get
();
// check remote address
// check local address
InetSocketAddress
remote
=
(
InetSocketAddress
)
ch
.
getRemoteAddress
();
if
(
ch
.
getLocalAddress
()
==
null
)
if
(
remote
.
getPort
()
!=
server
.
address
().
getPort
())
throw
new
RuntimeException
(
"Not bound to local address"
);
throw
new
RuntimeException
(
"Connected to unexpected port"
);
if
(!
remote
.
getAddress
().
equals
(
server
.
address
().
getAddress
()))
// check remote address
throw
new
RuntimeException
(
"Connected to unexpected address"
);
InetSocketAddress
remote
=
(
InetSocketAddress
)
ch
.
getRemoteAddress
();
if
(
remote
.
getPort
()
!=
server
.
address
().
getPort
())
throw
new
RuntimeException
(
"Connected to unexpected port"
);
if
(!
remote
.
getAddress
().
equals
(
server
.
address
().
getAddress
()))
throw
new
RuntimeException
(
"Connected to unexpected address"
);
// try to connect again
try
{
ch
.
connect
(
server
.
address
()).
get
();
throw
new
RuntimeException
(
"AlreadyConnectedException expected"
);
}
catch
(
AlreadyConnectedException
x
)
{
}
// try to connect again
// clean-up
try
{
server
.
accept
().
close
();
ch
.
connect
(
server
.
address
()).
get
();
}
throw
new
RuntimeException
(
"AlreadyConnectedException expected"
);
}
catch
(
AlreadyConnectedException
x
)
{
}
ch
.
close
();
// check that connect fails with ClosedChannelException)
// check that connect fails with ClosedChannelException
ch
=
AsynchronousSocketChannel
.
open
();
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
ch
.
close
();
ch
.
close
();
try
{
try
{
ch
.
connect
(
server
.
address
()).
get
();
ch
.
connect
(
server
.
address
()).
get
();
throw
new
RuntimeException
(
"ExecutionException expected"
);
throw
new
RuntimeException
(
"ExecutionException expected"
);
}
catch
(
ExecutionException
x
)
{
}
catch
(
ExecutionException
x
)
{
if
(!(
x
.
getCause
()
instanceof
ClosedChannelException
))
if
(!(
x
.
getCause
()
instanceof
ClosedChannelException
))
throw
new
RuntimeException
(
"Cause of ClosedChannelException expected"
);
throw
new
RuntimeException
(
"Cause of ClosedChannelException expected"
);
}
final
AtomicReference
<
Throwable
>
connectException
=
new
AtomicReference
<
Throwable
>();
ch
.
connect
(
server
.
address
(),
(
Void
)
null
,
new
CompletionHandler
<
Void
,
Void
>()
{
public
void
completed
(
Void
result
,
Void
att
)
{
}
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
final
AtomicReference
<
Throwable
>
connectException
=
new
AtomicReference
<>();
connectException
.
set
(
exc
);
ch
.
connect
(
server
.
address
(),
(
Void
)
null
,
new
CompletionHandler
<
Void
,
Void
>()
{
public
void
completed
(
Void
result
,
Void
att
)
{
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
connectException
.
set
(
exc
);
}
});
while
(
connectException
.
get
()
==
null
)
{
Thread
.
sleep
(
100
);
}
}
});
if
(!(
connectException
.
get
()
instanceof
ClosedChannelException
))
while
(
connectException
.
get
()
==
null
)
{
throw
new
RuntimeException
(
"ClosedChannelException expected"
);
Thread
.
sleep
(
100
);
}
}
if
(!(
connectException
.
get
()
instanceof
ClosedChannelException
))
throw
new
RuntimeException
(
"ClosedChannelException expected"
);
System
.
out
.
println
(
"-- connect to non-existent host --"
);
// test that failure to connect closes the channel
// test that failure to connect closes the channel
ch
=
AsynchronousSocketChannel
.
open
();
try
(
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
())
{
try
{
try
{
ch
.
connect
(
genSocketAddress
()).
get
();
ch
.
connect
(
address
).
get
();
}
catch
(
ExecutionException
x
)
{
}
catch
(
ExecutionException
x
)
{
// failed to establish connection
// failed to establish connection
if
(
ch
.
isOpen
())
if
(
ch
.
isOpen
())
throw
new
RuntimeException
(
"Channel should be closed"
);
throw
new
RuntimeException
(
"Channel should be closed"
);
}
finally
{
}
ch
.
close
();
}
}
server
.
close
();
// repeat test by connecting to a (probably) non-existent host. This
// improves the chance that the connect will not fail immediately.
if
(!
skipSlowConnectTest
)
{
try
(
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
())
{
try
{
ch
.
connect
(
genSocketAddress
()).
get
();
}
catch
(
ExecutionException
x
)
{
// failed to establish connection
if
(
ch
.
isOpen
())
throw
new
RuntimeException
(
"Channel should be closed"
);
}
}
}
}
}
static
void
testCloseWhenPending
()
throws
Exception
{
static
void
testCloseWhenPending
()
throws
Exception
{
...
@@ -249,466 +275,460 @@ public class Basic {
...
@@ -249,466 +275,460 @@ public class Basic {
System
.
out
.
println
(
"-- asynchronous close when reading --"
);
System
.
out
.
println
(
"-- asynchronous close when reading --"
);
Server
server
=
new
Server
();
try
(
Server
server
=
new
Server
())
{
ch
=
AsynchronousSocketChannel
.
open
();
ch
=
AsynchronousSocketChannel
.
open
();
ch
.
connect
(
server
.
address
()).
get
();
ch
.
connect
(
server
.
address
()).
get
();
ByteBuffer
dst
=
ByteBuffer
.
allocateDirect
(
100
);
ByteBuffer
dst
=
ByteBuffer
.
allocateDirect
(
100
);
Future
<
Integer
>
result
=
ch
.
read
(
dst
);
Future
<
Integer
>
result
=
ch
.
read
(
dst
);
// attempt a second read - should fail with ReadPendingException
// attempt a second read - should fail with ReadPendingException
ByteBuffer
buf
=
ByteBuffer
.
allocateDirect
(
100
);
ByteBuffer
buf
=
ByteBuffer
.
allocateDirect
(
100
);
try
{
try
{
ch
.
read
(
buf
);
ch
.
read
(
buf
);
throw
new
RuntimeException
(
"ReadPendingException expected"
);
throw
new
RuntimeException
(
"ReadPendingException expected"
);
}
catch
(
ReadPendingException
x
)
{
}
catch
(
ReadPendingException
x
)
{
}
}
// close channel (should cause initial read to complete)
// close channel (should cause initial read to complete)
ch
.
close
();
ch
.
close
();
server
.
accept
().
close
();
// check that AsynchronousCloseException is thrown
// check that AsynchronousCloseException is thrown
try
{
try
{
result
.
get
();
result
.
get
();
throw
new
RuntimeException
(
"Should not read"
);
throw
new
RuntimeException
(
"Should not read"
);
}
catch
(
ExecutionException
x
)
{
}
catch
(
ExecutionException
x
)
{
if
(!(
x
.
getCause
()
instanceof
AsynchronousCloseException
))
if
(!(
x
.
getCause
()
instanceof
AsynchronousCloseException
))
throw
new
RuntimeException
(
x
);
throw
new
RuntimeException
(
x
);
}
}
System
.
out
.
println
(
"-- asynchronous close when writing --"
);
System
.
out
.
println
(
"-- asynchronous close when writing --"
);
ch
=
AsynchronousSocketChannel
.
open
();
ch
=
AsynchronousSocketChannel
.
open
();
ch
.
connect
(
server
.
address
()).
get
();
ch
.
connect
(
server
.
address
()).
get
();
final
AtomicReference
<
Throwable
>
writeException
=
final
AtomicReference
<
Throwable
>
writeException
=
new
AtomicReference
<
Throwable
>();
new
AtomicReference
<
Throwable
>();
// write bytes to fill socket buffer
// write bytes to fill socket buffer
ch
.
write
(
genBuffer
(),
ch
,
new
CompletionHandler
<
Integer
,
AsynchronousSocketChannel
>()
{
ch
.
write
(
genBuffer
(),
ch
,
new
CompletionHandler
<
Integer
,
AsynchronousSocketChannel
>()
{
public
void
completed
(
Integer
result
,
AsynchronousSocketChannel
ch
)
{
public
void
completed
(
Integer
result
,
AsynchronousSocketChannel
ch
)
{
ch
.
write
(
genBuffer
(),
ch
,
this
);
ch
.
write
(
genBuffer
(),
ch
,
this
);
}
}
public
void
failed
(
Throwable
x
,
AsynchronousSocketChannel
ch
)
{
public
void
failed
(
Throwable
x
,
AsynchronousSocketChannel
ch
)
{
writeException
.
set
(
x
);
writeException
.
set
(
x
);
}
}
});
});
// give time for socket buffer to fill up.
// give time for socket buffer to fill up.
Thread
.
sleep
(
5
*
1000
);
Thread
.
sleep
(
5
*
1000
);
// attempt a concurrent write - should fail with WritePendingException
// attempt a concurrent write - should fail with WritePendingException
try
{
try
{
ch
.
write
(
genBuffer
());
ch
.
write
(
genBuffer
());
throw
new
RuntimeException
(
"WritePendingException expected"
);
throw
new
RuntimeException
(
"WritePendingException expected"
);
}
catch
(
WritePendingException
x
)
{
}
catch
(
WritePendingException
x
)
{
}
}
// close channel - should cause initial write to complete
// close channel - should cause initial write to complete
ch
.
close
();
ch
.
close
();
server
.
accept
().
close
();
// wait for exception
// wait for exception
while
(
writeException
.
get
()
==
null
)
{
while
(
writeException
.
get
()
==
null
)
{
Thread
.
sleep
(
100
);
Thread
.
sleep
(
100
);
}
if
(!(
writeException
.
get
()
instanceof
AsynchronousCloseException
))
throw
new
RuntimeException
(
"AsynchronousCloseException expected"
);
}
}
if
(!(
writeException
.
get
()
instanceof
AsynchronousCloseException
))
throw
new
RuntimeException
(
"AsynchronousCloseException expected"
);
server
.
close
();
}
}
static
void
testCancel
()
throws
Exception
{
static
void
testCancel
()
throws
Exception
{
System
.
out
.
println
(
"-- cancel --"
);
System
.
out
.
println
(
"-- cancel --"
);
Server
server
=
new
Server
();
try
(
Server
server
=
new
Server
())
{
for
(
int
i
=
0
;
i
<
2
;
i
++)
{
for
(
int
i
=
0
;
i
<
2
;
i
++)
{
boolean
mayInterruptIfRunning
=
(
i
==
0
)
?
false
:
true
;
boolean
mayInterruptIfRunning
=
(
i
==
0
)
?
false
:
true
;
// establish loopback connection
// establish loopback connection
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
ch
.
connect
(
server
.
address
()).
get
();
ch
.
connect
(
server
.
address
()).
get
();
SocketChannel
peer
=
server
.
accept
();
SocketChannel
peer
=
server
.
accept
();
// start read operation
// start read operation
ByteBuffer
buf
=
ByteBuffer
.
allocate
(
1
);
ByteBuffer
buf
=
ByteBuffer
.
allocate
(
1
);
Future
<
Integer
>
res
=
ch
.
read
(
buf
);
Future
<
Integer
>
res
=
ch
.
read
(
buf
);
// cancel operation
// cancel operation
boolean
cancelled
=
res
.
cancel
(
mayInterruptIfRunning
);
boolean
cancelled
=
res
.
cancel
(
mayInterruptIfRunning
);
// check post-conditions
if
(!
res
.
isDone
())
throw
new
RuntimeException
(
"isDone should return true"
);
if
(
res
.
isCancelled
()
!=
cancelled
)
throw
new
RuntimeException
(
"isCancelled not consistent"
);
try
{
res
.
get
();
throw
new
RuntimeException
(
"CancellationException expected"
);
}
catch
(
CancellationException
x
)
{
}
try
{
res
.
get
(
1
,
TimeUnit
.
SECONDS
);
throw
new
RuntimeException
(
"CancellationException expected"
);
}
catch
(
CancellationException
x
)
{
}
// check post-conditions
// check that the cancel doesn't impact writing to the channel
if
(!
res
.
isDone
())
if
(!
mayInterruptIfRunning
)
{
throw
new
RuntimeException
(
"isDone should return true"
);
buf
=
ByteBuffer
.
wrap
(
"a"
.
getBytes
());
if
(
res
.
isCancelled
()
!=
cancelled
)
ch
.
write
(
buf
).
get
();
throw
new
RuntimeException
(
"isCancelled not consistent"
);
}
try
{
res
.
get
();
throw
new
RuntimeException
(
"CancellationException expected"
);
}
catch
(
CancellationException
x
)
{
}
try
{
res
.
get
(
1
,
TimeUnit
.
SECONDS
);
throw
new
RuntimeException
(
"CancellationException expected"
);
}
catch
(
CancellationException
x
)
{
}
// check that the cancel doesn't impact writing to the channel
ch
.
close
();
if
(!
mayInterruptIfRunning
)
{
peer
.
close
();
buf
=
ByteBuffer
.
wrap
(
"a"
.
getBytes
());
ch
.
write
(
buf
).
get
();
}
}
ch
.
close
();
peer
.
close
();
}
}
server
.
close
();
}
}
static
void
testRead1
()
throws
Exception
{
static
void
testRead1
()
throws
Exception
{
System
.
out
.
println
(
"-- read (1) --"
);
System
.
out
.
println
(
"-- read (1) --"
);
Server
server
=
new
Server
();
try
(
Server
server
=
new
Server
())
{
final
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
final
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
ch
.
connect
(
server
.
address
()).
get
();
ch
.
connect
(
server
.
address
()).
get
();
// read with 0 bytes remaining should complete immediately
// read with 0 bytes remaining should complete immediately
ByteBuffer
buf
=
ByteBuffer
.
allocate
(
1
);
ByteBuffer
buf
=
ByteBuffer
.
allocate
(
1
);
buf
.
put
((
byte
)
0
);
buf
.
put
((
byte
)
0
);
int
n
=
ch
.
read
(
buf
).
get
();
int
n
=
ch
.
read
(
buf
).
get
();
if
(
n
!=
0
)
if
(
n
!=
0
)
throw
new
RuntimeException
(
"0 expected"
);
throw
new
RuntimeException
(
"0 expected"
);
// write bytes and close connection
// write bytes and close connection
SocketChannel
sc
=
server
.
accept
();
ByteBuffer
src
=
genBuffer
();
ByteBuffer
src
=
genBuffer
();
try
(
SocketChannel
sc
=
server
.
accept
())
{
sc
.
setOption
(
StandardSocketOptions
.
SO_SNDBUF
,
src
.
remaining
());
sc
.
setOption
(
SO_SNDBUF
,
src
.
remaining
());
while
(
src
.
hasRemaining
())
while
(
src
.
hasRemaining
())
sc
.
write
(
src
);
sc
.
write
(
src
);
sc
.
close
();
// reads should complete immediately
final
ByteBuffer
dst
=
ByteBuffer
.
allocateDirect
(
src
.
capacity
()
+
100
);
final
CountDownLatch
latch
=
new
CountDownLatch
(
1
);
ch
.
read
(
dst
,
(
Void
)
null
,
new
CompletionHandler
<
Integer
,
Void
>()
{
public
void
completed
(
Integer
result
,
Void
att
)
{
int
n
=
result
;
if
(
n
>
0
)
{
ch
.
read
(
dst
,
(
Void
)
null
,
this
);
}
else
{
latch
.
countDown
();
}
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
}
}
});
latch
.
await
();
// reads should complete immediately
final
ByteBuffer
dst
=
ByteBuffer
.
allocateDirect
(
src
.
capacity
()
+
100
);
final
CountDownLatch
latch
=
new
CountDownLatch
(
1
);
ch
.
read
(
dst
,
(
Void
)
null
,
new
CompletionHandler
<
Integer
,
Void
>()
{
public
void
completed
(
Integer
result
,
Void
att
)
{
int
n
=
result
;
if
(
n
>
0
)
{
ch
.
read
(
dst
,
(
Void
)
null
,
this
);
}
else
{
latch
.
countDown
();
}
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
}
});
// check buffers
latch
.
await
();
src
.
flip
();
dst
.
flip
();
if
(!
src
.
equals
(
dst
))
{
throw
new
RuntimeException
(
"Contents differ"
);
}
// close channel
// check buffers
ch
.
close
();
src
.
flip
();
dst
.
flip
();
if
(!
src
.
equals
(
dst
))
{
throw
new
RuntimeException
(
"Contents differ"
);
}
// check read fails with ClosedChannelException
// close channel
try
{
ch
.
close
();
ch
.
read
(
dst
).
get
();
throw
new
RuntimeException
(
"ExecutionException expected"
);
}
catch
(
ExecutionException
x
)
{
if
(!(
x
.
getCause
()
instanceof
ClosedChannelException
))
throw
new
RuntimeException
(
"Cause of ClosedChannelException expected"
);
}
server
.
close
();
// check read fails with ClosedChannelException
try
{
ch
.
read
(
dst
).
get
();
throw
new
RuntimeException
(
"ExecutionException expected"
);
}
catch
(
ExecutionException
x
)
{
if
(!(
x
.
getCause
()
instanceof
ClosedChannelException
))
throw
new
RuntimeException
(
"Cause of ClosedChannelException expected"
);
}
}
}
}
static
void
testRead2
()
throws
Exception
{
static
void
testRead2
()
throws
Exception
{
System
.
out
.
println
(
"-- read (2) --"
);
System
.
out
.
println
(
"-- read (2) --"
);
Server
server
=
new
Server
();
try
(
Server
server
=
new
Server
())
{
final
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
final
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
ch
.
connect
(
server
.
address
()).
get
();
ch
.
connect
(
server
.
address
()).
ge
t
();
SocketChannel
sc
=
server
.
accep
t
();
SocketChannel
sc
=
server
.
accept
();
ByteBuffer
src
=
genBuffer
();
ByteBuffer
src
=
genBuffer
();
// read until the buffer is full
// read until the buffer is full
final
ByteBuffer
dst
=
ByteBuffer
.
allocateDirect
(
src
.
capacity
());
final
ByteBuffer
dst
=
ByteBuffer
.
allocateDirect
(
src
.
capacity
()
);
final
CountDownLatch
latch
=
new
CountDownLatch
(
1
);
final
CountDownLatch
latch
=
new
CountDownLatch
(
1
);
ch
.
read
(
dst
,
(
Void
)
null
,
new
CompletionHandler
<
Integer
,
Void
>()
{
ch
.
read
(
dst
,
(
Void
)
null
,
new
CompletionHandler
<
Integer
,
Void
>(
)
{
public
void
completed
(
Integer
result
,
Void
att
)
{
public
void
completed
(
Integer
result
,
Void
att
)
{
if
(
dst
.
hasRemaining
()
)
{
if
(
dst
.
hasRemaining
())
{
ch
.
read
(
dst
,
(
Void
)
null
,
this
);
ch
.
read
(
dst
,
(
Void
)
null
,
this
);
}
else
{
}
else
{
latch
.
countDown
();
latch
.
countDown
();
}
}
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
}
});
// trickle the writing
do
{
int
rem
=
src
.
remaining
();
int
size
=
(
rem
<=
100
)
?
rem
:
50
+
rand
.
nextInt
(
rem
-
100
);
ByteBuffer
buf
=
ByteBuffer
.
allocate
(
size
);
for
(
int
i
=
0
;
i
<
size
;
i
++)
buf
.
put
(
src
.
get
());
buf
.
flip
();
Thread
.
sleep
(
50
+
rand
.
nextInt
(
1500
));
while
(
buf
.
hasRemaining
())
sc
.
write
(
buf
);
}
while
(
src
.
hasRemaining
());
// wait until ascynrhonous reading has completed
latch
.
await
();
// check buffers
src
.
flip
();
dst
.
flip
();
if
(!
src
.
equals
(
dst
))
{
throw
new
RuntimeException
(
"Contents differ"
);
}
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
}
});
// trickle the writing
do
{
int
rem
=
src
.
remaining
();
int
size
=
(
rem
<=
100
)
?
rem
:
50
+
rand
.
nextInt
(
rem
-
100
);
ByteBuffer
buf
=
ByteBuffer
.
allocate
(
size
);
for
(
int
i
=
0
;
i
<
size
;
i
++)
buf
.
put
(
src
.
get
());
buf
.
flip
();
Thread
.
sleep
(
50
+
rand
.
nextInt
(
1500
));
while
(
buf
.
hasRemaining
())
sc
.
write
(
buf
);
}
while
(
src
.
hasRemaining
());
// wait until ascynrhonous reading has completed
latch
.
await
();
// check buffers
src
.
flip
();
dst
.
flip
();
if
(!
src
.
equals
(
dst
))
{
throw
new
RuntimeException
(
"Contents differ"
);
}
sc
.
close
();
sc
.
close
();
ch
.
close
();
ch
.
close
();
server
.
close
();
}
}
}
// exercise scattering read
// exercise scattering read
static
void
testRead3
()
throws
Exception
{
static
void
testRead3
()
throws
Exception
{
System
.
out
.
println
(
"-- read (3) --"
);
System
.
out
.
println
(
"-- read (3) --"
);
Server
server
=
new
Server
();
try
(
Server
server
=
new
Server
())
{
final
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
final
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
ch
.
connect
(
server
.
address
()).
get
();
ch
.
connect
(
server
.
address
()).
get
();
SocketChannel
sc
=
server
.
accept
();
SocketChannel
sc
=
server
.
accept
();
ByteBuffer
[]
dsts
=
new
ByteBuffer
[
3
];
ByteBuffer
[]
dsts
=
new
ByteBuffer
[
3
];
for
(
int
i
=
0
;
i
<
dsts
.
length
;
i
++)
{
for
(
int
i
=
0
;
i
<
dsts
.
length
;
i
++)
{
dsts
[
i
]
=
ByteBuffer
.
allocateDirect
(
100
);
dsts
[
i
]
=
ByteBuffer
.
allocateDirect
(
100
);
}
}
// scattering read that completes ascynhronously
// scattering read that completes ascynhronously
final
CountDownLatch
l1
=
new
CountDownLatch
(
1
);
final
CountDownLatch
l1
=
new
CountDownLatch
(
1
);
ch
.
read
(
dsts
,
0
,
dsts
.
length
,
0L
,
TimeUnit
.
SECONDS
,
(
Void
)
null
,
ch
.
read
(
dsts
,
0
,
dsts
.
length
,
0L
,
TimeUnit
.
SECONDS
,
(
Void
)
null
,
new
CompletionHandler
<
Long
,
Void
>()
{
new
CompletionHandler
<
Long
,
Void
>()
{
public
void
completed
(
Long
result
,
Void
att
)
{
public
void
completed
(
Long
result
,
Void
att
)
{
long
n
=
result
;
long
n
=
result
;
if
(
n
<=
0
)
if
(
n
<=
0
)
throw
new
RuntimeException
(
"No bytes read"
);
throw
new
RuntimeException
(
"No bytes read"
);
l1
.
countDown
();
l1
.
countDown
();
}
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
public
void
failed
(
Throwable
exc
,
Void
att
)
{
}
}
});
});
// write some bytes
// write some bytes
sc
.
write
(
genBuffer
());
sc
.
write
(
genBuffer
());
// read should now complete
// read should now complete
l1
.
await
();
l1
.
await
();
// write more bytes
// write more bytes
sc
.
write
(
genBuffer
());
sc
.
write
(
genBuffer
());
// read should complete immediately
// read should complete immediately
for
(
int
i
=
0
;
i
<
dsts
.
length
;
i
++)
{
for
(
int
i
=
0
;
i
<
dsts
.
length
;
i
++)
{
dsts
[
i
].
rewind
();
dsts
[
i
].
rewind
();
}
}
final
CountDownLatch
l2
=
new
CountDownLatch
(
1
);
final
CountDownLatch
l2
=
new
CountDownLatch
(
1
);
ch
.
read
(
dsts
,
0
,
dsts
.
length
,
0L
,
TimeUnit
.
SECONDS
,
(
Void
)
null
,
ch
.
read
(
dsts
,
0
,
dsts
.
length
,
0L
,
TimeUnit
.
SECONDS
,
(
Void
)
null
,
new
CompletionHandler
<
Long
,
Void
>()
{
new
CompletionHandler
<
Long
,
Void
>()
{
public
void
completed
(
Long
result
,
Void
att
)
{
public
void
completed
(
Long
result
,
Void
att
)
{
long
n
=
result
;
long
n
=
result
;
if
(
n
<=
0
)
if
(
n
<=
0
)
throw
new
RuntimeException
(
"No bytes read"
);
throw
new
RuntimeException
(
"No bytes read"
);
l2
.
countDown
();
l2
.
countDown
();
}
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
public
void
failed
(
Throwable
exc
,
Void
att
)
{
}
}
});
});
l2
.
await
();
l2
.
await
();
ch
.
close
();
ch
.
close
();
sc
.
close
();
sc
.
close
();
server
.
close
();
}
}
}
static
void
testWrite1
()
throws
Exception
{
static
void
testWrite1
()
throws
Exception
{
System
.
out
.
println
(
"-- write (1) --"
);
System
.
out
.
println
(
"-- write (1) --"
);
Server
server
=
new
Server
();
try
(
Server
server
=
new
Server
())
{
final
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
final
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
ch
.
connect
(
server
.
address
()).
get
();
ch
.
connect
(
server
.
address
()).
get
();
SocketChannel
sc
=
server
.
accept
();
SocketChannel
sc
=
server
.
accept
();
// write with 0 bytes remaining should complete immediately
// write with 0 bytes remaining should complete immediately
ByteBuffer
buf
=
ByteBuffer
.
allocate
(
1
);
ByteBuffer
buf
=
ByteBuffer
.
allocate
(
1
);
buf
.
put
((
byte
)
0
);
buf
.
put
((
byte
)
0
);
int
n
=
ch
.
write
(
buf
).
get
();
int
n
=
ch
.
write
(
buf
).
get
();
if
(
n
!=
0
)
if
(
n
!=
0
)
throw
new
RuntimeException
(
"0 expected"
);
throw
new
RuntimeException
(
"0 expected"
);
// write all bytes and close connection when done
// write all bytes and close connection when done
final
ByteBuffer
src
=
genBuffer
();
final
ByteBuffer
src
=
genBuffer
();
ch
.
write
(
src
,
(
Void
)
null
,
new
CompletionHandler
<
Integer
,
Void
>()
{
ch
.
write
(
src
,
(
Void
)
null
,
new
CompletionHandler
<
Integer
,
Void
>()
{
public
void
completed
(
Integer
result
,
Void
att
)
{
public
void
completed
(
Integer
result
,
Void
att
)
{
if
(
src
.
hasRemaining
())
{
if
(
src
.
hasRemaining
())
{
ch
.
write
(
src
,
(
Void
)
null
,
this
);
ch
.
write
(
src
,
(
Void
)
null
,
this
);
}
else
{
}
else
{
try
{
try
{
ch
.
close
();
ch
.
close
();
}
catch
(
IOException
ignore
)
{
}
}
catch
(
IOException
ignore
)
{
}
}
}
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
}
});
// read to EOF or buffer full
ByteBuffer
dst
=
ByteBuffer
.
allocateDirect
(
src
.
capacity
()
+
100
);
do
{
n
=
sc
.
read
(
dst
);
}
while
(
n
>
0
);
sc
.
close
();
// check buffers
src
.
flip
();
dst
.
flip
();
if
(!
src
.
equals
(
dst
))
{
throw
new
RuntimeException
(
"Contents differ"
);
}
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
}
});
// read to EOF or buffer full
ByteBuffer
dst
=
ByteBuffer
.
allocateDirect
(
src
.
capacity
()
+
100
);
do
{
n
=
sc
.
read
(
dst
);
}
while
(
n
>
0
);
sc
.
close
();
// check buffers
src
.
flip
();
dst
.
flip
();
if
(!
src
.
equals
(
dst
))
{
throw
new
RuntimeException
(
"Contents differ"
);
}
// check write fails with ClosedChannelException
// check write fails with ClosedChannelException
try
{
try
{
ch
.
read
(
dst
).
get
();
ch
.
read
(
dst
).
get
();
throw
new
RuntimeException
(
"ExecutionException expected"
);
throw
new
RuntimeException
(
"ExecutionException expected"
);
}
catch
(
ExecutionException
x
)
{
}
catch
(
ExecutionException
x
)
{
if
(!(
x
.
getCause
()
instanceof
ClosedChannelException
))
if
(!(
x
.
getCause
()
instanceof
ClosedChannelException
))
throw
new
RuntimeException
(
"Cause of ClosedChannelException expected"
);
throw
new
RuntimeException
(
"Cause of ClosedChannelException expected"
);
}
}
}
server
.
close
();
}
}
// exercise gathering write
// exercise gathering write
static
void
testWrite2
()
throws
Exception
{
static
void
testWrite2
()
throws
Exception
{
System
.
out
.
println
(
"-- write (2) --"
);
System
.
out
.
println
(
"-- write (2) --"
);
Server
server
=
new
Server
();
try
(
Server
server
=
new
Server
())
{
final
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
final
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
ch
.
connect
(
server
.
address
()).
get
();
ch
.
connect
(
server
.
address
()).
get
();
SocketChannel
sc
=
server
.
accept
();
SocketChannel
sc
=
server
.
accept
();
// number of bytes written
// number of bytes written
final
AtomicLong
bytesWritten
=
new
AtomicLong
(
0
);
final
AtomicLong
bytesWritten
=
new
AtomicLong
(
0
);
// write buffers (should complete immediately)
// write buffers (should complete immediately)
ByteBuffer
[]
srcs
=
genBuffers
(
1
);
ByteBuffer
[]
srcs
=
genBuffers
(
1
);
final
CountDownLatch
l1
=
new
CountDownLatch
(
1
);
final
CountDownLatch
l1
=
new
CountDownLatch
(
1
);
ch
.
write
(
srcs
,
0
,
srcs
.
length
,
0L
,
TimeUnit
.
SECONDS
,
(
Void
)
null
,
ch
.
write
(
srcs
,
0
,
srcs
.
length
,
0L
,
TimeUnit
.
SECONDS
,
(
Void
)
null
,
new
CompletionHandler
<
Long
,
Void
>()
{
new
CompletionHandler
<
Long
,
Void
>()
{
public
void
completed
(
Long
result
,
Void
att
)
{
public
void
completed
(
Long
result
,
Void
att
)
{
long
n
=
result
;
long
n
=
result
;
if
(
n
<=
0
)
if
(
n
<=
0
)
throw
new
RuntimeException
(
"No bytes read"
);
throw
new
RuntimeException
(
"No bytes read"
);
bytesWritten
.
addAndGet
(
n
);
bytesWritten
.
addAndGet
(
n
);
l1
.
countDown
();
l1
.
countDown
();
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
}
});
l1
.
await
();
// set to true to signal that no more buffers should be written
final
AtomicBoolean
continueWriting
=
new
AtomicBoolean
(
true
);
// write until socket buffer is full so as to create the conditions
// for when a write does not complete immediately
srcs
=
genBuffers
(
1
);
ch
.
write
(
srcs
,
0
,
srcs
.
length
,
0L
,
TimeUnit
.
SECONDS
,
(
Void
)
null
,
new
CompletionHandler
<
Long
,
Void
>()
{
public
void
completed
(
Long
result
,
Void
att
)
{
long
n
=
result
;
if
(
n
<=
0
)
throw
new
RuntimeException
(
"No bytes written"
);
bytesWritten
.
addAndGet
(
n
);
if
(
continueWriting
.
get
())
{
ByteBuffer
[]
srcs
=
genBuffers
(
8
);
ch
.
write
(
srcs
,
0
,
srcs
.
length
,
0L
,
TimeUnit
.
SECONDS
,
(
Void
)
null
,
this
);
}
}
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
public
void
failed
(
Throwable
exc
,
Void
att
)
{
}
}
});
});
l1
.
await
();
// set to true to signal that no more buffers should be written
final
AtomicBoolean
continueWriting
=
new
AtomicBoolean
(
true
);
// write until socket buffer is full so as to create the conditions
// for when a write does not complete immediately
srcs
=
genBuffers
(
1
);
ch
.
write
(
srcs
,
0
,
srcs
.
length
,
0L
,
TimeUnit
.
SECONDS
,
(
Void
)
null
,
new
CompletionHandler
<
Long
,
Void
>()
{
public
void
completed
(
Long
result
,
Void
att
)
{
long
n
=
result
;
if
(
n
<=
0
)
throw
new
RuntimeException
(
"No bytes written"
);
bytesWritten
.
addAndGet
(
n
);
if
(
continueWriting
.
get
())
{
ByteBuffer
[]
srcs
=
genBuffers
(
8
);
ch
.
write
(
srcs
,
0
,
srcs
.
length
,
0L
,
TimeUnit
.
SECONDS
,
(
Void
)
null
,
this
);
}
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
}
});
// give time for socket buffer to fill up.
// give time for socket buffer to fill up.
Thread
.
sleep
(
5
*
1000
);
Thread
.
sleep
(
5
*
1000
);
// signal handler to stop further writing
// signal handler to stop further writing
continueWriting
.
set
(
false
);
continueWriting
.
set
(
false
);
// read until done
// read until done
ByteBuffer
buf
=
ByteBuffer
.
allocateDirect
(
4096
);
ByteBuffer
buf
=
ByteBuffer
.
allocateDirect
(
4096
);
long
total
=
0L
;
long
total
=
0L
;
do
{
do
{
int
n
=
sc
.
read
(
buf
);
int
n
=
sc
.
read
(
buf
);
if
(
n
<=
0
)
if
(
n
<=
0
)
throw
new
RuntimeException
(
"No bytes read"
);
throw
new
RuntimeException
(
"No bytes read"
);
buf
.
rewind
();
buf
.
rewind
();
total
+=
n
;
total
+=
n
;
}
while
(
total
<
bytesWritten
.
get
());
}
while
(
total
<
bytesWritten
.
get
());
ch
.
close
();
ch
.
close
();
sc
.
close
();
sc
.
close
();
server
.
close
();
}
}
}
static
void
testShutdown
()
throws
Exception
{
static
void
testShutdown
()
throws
Exception
{
System
.
out
.
println
(
"-- shutdown--"
);
System
.
out
.
println
(
"-- shutdown--"
);
Server
server
=
new
Server
();
try
(
Server
server
=
new
Server
();
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
())
ch
.
connect
(
server
.
address
()).
get
();
{
SocketChannel
sc
=
server
.
accept
();
ch
.
connect
(
server
.
address
()).
get
();
try
(
SocketChannel
peer
=
server
.
accept
())
{
ByteBuffer
buf
=
ByteBuffer
.
allocateDirect
(
1000
);
ByteBuffer
buf
=
ByteBuffer
.
allocateDirect
(
1000
);
int
n
;
int
n
;
// check read
// check read
ch
.
shutdownInput
();
ch
.
shutdownInput
();
n
=
ch
.
read
(
buf
).
get
();
n
=
ch
.
read
(
buf
).
get
();
if
(
n
!=
-
1
)
if
(
n
!=
-
1
)
throw
new
RuntimeException
(
"-1 expected"
);
throw
new
RuntimeException
(
"-1 expected"
);
// check full with full buffer
// check full with full buffer
buf
.
put
(
new
byte
[
100
]);
buf
.
put
(
new
byte
[
100
]);
n
=
ch
.
read
(
buf
).
get
();
n
=
ch
.
read
(
buf
).
get
();
if
(
n
!=
-
1
)
if
(
n
!=
-
1
)
throw
new
RuntimeException
(
"-1 expected"
);
throw
new
RuntimeException
(
"-1 expected"
);
// check write
// check write
ch
.
shutdownOutput
();
ch
.
shutdownOutput
();
try
{
try
{
ch
.
write
(
buf
).
get
();
ch
.
write
(
buf
).
get
();
throw
new
RuntimeException
(
"ClosedChannelException expected"
);
throw
new
RuntimeException
(
"ClosedChannelException expected"
);
}
catch
(
ExecutionException
x
)
{
}
catch
(
ExecutionException
x
)
{
if
(!(
x
.
getCause
()
instanceof
ClosedChannelException
))
if
(!(
x
.
getCause
()
instanceof
ClosedChannelException
))
throw
new
RuntimeException
(
"ClosedChannelException expected"
);
throw
new
RuntimeException
(
"ClosedChannelException expected"
);
}
}
}
}
sc
.
close
();
ch
.
close
();
server
.
close
();
}
}
static
void
testTimeout
()
throws
Exception
{
static
void
testTimeout
()
throws
Exception
{
...
@@ -720,88 +740,88 @@ public class Basic {
...
@@ -720,88 +740,88 @@ public class Basic {
}
}
static
void
testTimeout
(
final
long
timeout
,
final
TimeUnit
unit
)
throws
Exception
{
static
void
testTimeout
(
final
long
timeout
,
final
TimeUnit
unit
)
throws
Exception
{
Server
server
=
new
Server
();
try
(
Server
server
=
new
Server
())
{
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
AsynchronousSocketChannel
ch
=
AsynchronousSocketChannel
.
open
();
ch
.
connect
(
server
.
address
()).
get
();
ch
.
connect
(
server
.
address
()).
get
();
ByteBuffer
dst
=
ByteBuffer
.
allocate
(
512
);
final
AtomicReference
<
Throwable
>
readException
=
new
AtomicReference
<
Throwable
>(
);
ByteBuffer
dst
=
ByteBuffer
.
allocate
(
512
);
// this read should timeout if value is > 0
final
AtomicReference
<
Throwable
>
readException
=
new
AtomicReference
<
Throwable
>();
ch
.
read
(
dst
,
timeout
,
unit
,
null
,
new
CompletionHandler
<
Integer
,
Void
>()
{
public
void
completed
(
Integer
result
,
Void
att
)
{
readException
.
set
(
new
RuntimeException
(
"Should not complete"
));
}
public
void
failed
(
Throwable
exc
,
Void
att
)
{
readException
.
set
(
exc
);
}
});
if
(
timeout
>
0L
)
{
// wait for exception
while
(
readException
.
get
()
==
null
)
{
Thread
.
sleep
(
100
);
}
if
(!(
readException
.
get
()
instanceof
InterruptedByTimeoutException
))
throw
new
RuntimeException
(
"InterruptedByTimeoutException expected"
);
// after a timeout then further reading should throw unspecified runtime exception
// this read should timeout if value is > 0
boolean
exceptionThrown
=
false
;
ch
.
read
(
dst
,
timeout
,
unit
,
null
,
new
CompletionHandler
<
Integer
,
Void
>()
{
try
{
public
void
completed
(
Integer
result
,
Void
att
)
{
ch
.
read
(
dst
);
readException
.
set
(
new
RuntimeException
(
"Should not complete"
));
}
catch
(
RuntimeException
x
)
{
}
exceptionThrown
=
true
;
public
void
failed
(
Throwable
exc
,
Void
att
)
{
readException
.
set
(
exc
);
}
});
if
(
timeout
>
0L
)
{
// wait for exception
while
(
readException
.
get
()
==
null
)
{
Thread
.
sleep
(
100
);
}
if
(!(
readException
.
get
()
instanceof
InterruptedByTimeoutException
))
throw
new
RuntimeException
(
"InterruptedByTimeoutException expected"
);
// after a timeout then further reading should throw unspecified runtime exception
boolean
exceptionThrown
=
false
;
try
{
ch
.
read
(
dst
);
}
catch
(
RuntimeException
x
)
{
exceptionThrown
=
true
;
}
if
(!
exceptionThrown
)
throw
new
RuntimeException
(
"RuntimeException expected after timeout."
);
}
else
{
Thread
.
sleep
(
1000
);
Throwable
exc
=
readException
.
get
();
if
(
exc
!=
null
)
throw
new
RuntimeException
(
exc
);
}
}
if
(!
exceptionThrown
)
throw
new
RuntimeException
(
"RuntimeException expected after timeout."
);
}
else
{
Thread
.
sleep
(
1000
);
Throwable
exc
=
readException
.
get
();
if
(
exc
!=
null
)
throw
new
RuntimeException
(
exc
);
}
final
AtomicReference
<
Throwable
>
writeException
=
new
AtomicReference
<
Throwable
>();
final
AtomicReference
<
Throwable
>
writeException
=
new
AtomicReference
<
Throwable
>();
// write bytes to fill socket buffer
// write bytes to fill socket buffer
ch
.
write
(
genBuffer
(),
timeout
,
unit
,
ch
,
ch
.
write
(
genBuffer
(),
timeout
,
unit
,
ch
,
new
CompletionHandler
<
Integer
,
AsynchronousSocketChannel
>()
new
CompletionHandler
<
Integer
,
AsynchronousSocketChannel
>()
{
{
public
void
completed
(
Integer
result
,
AsynchronousSocketChannel
ch
)
{
public
void
completed
(
Integer
result
,
AsynchronousSocketChannel
ch
)
{
ch
.
write
(
genBuffer
(),
timeout
,
unit
,
ch
,
this
);
ch
.
write
(
genBuffer
(),
timeout
,
unit
,
ch
,
this
);
}
}
public
void
failed
(
Throwable
exc
,
AsynchronousSocketChannel
ch
)
{
public
void
failed
(
Throwable
exc
,
AsynchronousSocketChannel
ch
)
{
writeException
.
set
(
exc
);
writeException
.
set
(
exc
);
}
}
});
});
if
(
timeout
>
0
)
{
if
(
timeout
>
0
)
{
// wait for exception
// wait for exception
while
(
writeException
.
get
()
==
null
)
{
while
(
writeException
.
get
()
==
null
)
{
Thread
.
sleep
(
100
);
Thread
.
sleep
(
100
);
}
if
(!(
writeException
.
get
()
instanceof
InterruptedByTimeoutException
))
throw
new
RuntimeException
(
"InterruptedByTimeoutException expected"
);
// after a timeout then further writing should throw unspecified runtime exception
boolean
exceptionThrown
=
false
;
try
{
ch
.
write
(
genBuffer
());
}
catch
(
RuntimeException
x
)
{
exceptionThrown
=
true
;
}
if
(!
exceptionThrown
)
throw
new
RuntimeException
(
"RuntimeException expected after timeout."
);
}
else
{
Thread
.
sleep
(
1000
);
Throwable
exc
=
writeException
.
get
();
if
(
exc
!=
null
)
throw
new
RuntimeException
(
exc
);
}
}
if
(!(
writeException
.
get
()
instanceof
InterruptedByTimeoutException
))
throw
new
RuntimeException
(
"InterruptedByTimeoutException expected"
);
// after a timeout then further writing should throw unspecified runtime exception
// clean-up
boolean
exceptionThrown
=
false
;
server
.
accept
().
close
();
try
{
ch
.
close
();
ch
.
write
(
genBuffer
());
}
catch
(
RuntimeException
x
)
{
exceptionThrown
=
true
;
}
if
(!
exceptionThrown
)
throw
new
RuntimeException
(
"RuntimeException expected after timeout."
);
}
else
{
Thread
.
sleep
(
1000
);
Throwable
exc
=
writeException
.
get
();
if
(
exc
!=
null
)
throw
new
RuntimeException
(
exc
);
}
}
// clean-up
server
.
accept
().
close
();
ch
.
close
();
server
.
close
();
}
}
// returns ByteBuffer with random bytes
// returns ByteBuffer with random bytes
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录