提交 a0143c83 编写于 作者: A alanb

8003895: java/nio/channels/AsynchronousChannelGroup/Unbounded.java failing again [win64]

Reviewed-by: chegar
上级 5d320939
...@@ -230,9 +230,6 @@ java/nio/channels/DatagramChannel/ChangingAddress.java macosx-all ...@@ -230,9 +230,6 @@ java/nio/channels/DatagramChannel/ChangingAddress.java macosx-all
# 7132677 # 7132677
java/nio/channels/Selector/OutOfBand.java macosx-all java/nio/channels/Selector/OutOfBand.java macosx-all
# 8003895
java/nio/channels/AsynchronousChannelGroup/Unbounded.java windows-amd64
############################################################################ ############################################################################
# jdk_rmi # jdk_rmi
......
...@@ -43,47 +43,24 @@ public class Unbounded { ...@@ -43,47 +43,24 @@ public class Unbounded {
static volatile boolean finished; static volatile boolean finished;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// all accepted connections are added to a queue
final ArrayBlockingQueue<AsynchronousSocketChannel> queue =
new ArrayBlockingQueue<AsynchronousSocketChannel>(CONCURRENCY_COUNT);
// create listener to accept connections // create listener to accept connections
final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel listener =
AsynchronousServerSocketChannel.open() AsynchronousServerSocketChannel.open()
.bind(new InetSocketAddress(0)); .bind(new InetSocketAddress(0));
listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
public void completed(AsynchronousSocketChannel ch, Void att) {
queue.add(ch);
listener.accept((Void)null, this);
}
public void failed(Throwable exc, Void att) {
if (!finished) {
failed = true;
System.err.println("accept failed: " + exc);
}
}
});
System.out.println("Listener created.");
// establish lots of connections // establish connections
AsynchronousSocketChannel[] clients = new AsynchronousSocketChannel[CONCURRENCY_COUNT];
AsynchronousSocketChannel[] peers = new AsynchronousSocketChannel[CONCURRENCY_COUNT];
int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort(); int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port); SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);
AsynchronousSocketChannel[] channels =
new AsynchronousSocketChannel[CONCURRENCY_COUNT];
for (int i=0; i<CONCURRENCY_COUNT; i++) { for (int i=0; i<CONCURRENCY_COUNT; i++) {
int attempts = 0; clients[i] = AsynchronousSocketChannel.open();
for (;;) { Future<Void> result = clients[i].connect(sa);
try { peers[i] = listener.accept().get();
channels[i] = AsynchronousSocketChannel.open(); result.get();
channels[i].connect(sa).get();
break;
} catch (IOException x) {
// probably resource issue so back off and retry
if (++attempts >= 3)
throw x;
Thread.sleep(50);
}
}
} }
System.out.println("All connection established."); System.out.println("All connection established.");
...@@ -91,9 +68,9 @@ public class Unbounded { ...@@ -91,9 +68,9 @@ public class Unbounded {
final CyclicBarrier barrier = new CyclicBarrier(CONCURRENCY_COUNT+1); final CyclicBarrier barrier = new CyclicBarrier(CONCURRENCY_COUNT+1);
// initiate a read operation on each channel. // initiate a read operation on each channel.
for (int i=0; i<CONCURRENCY_COUNT; i++) { for (AsynchronousSocketChannel client: clients) {
ByteBuffer buf = ByteBuffer.allocateDirect(100); ByteBuffer buf = ByteBuffer.allocateDirect(100);
channels[i].read( buf, channels[i], client.read(buf, client,
new CompletionHandler<Integer,AsynchronousSocketChannel>() { new CompletionHandler<Integer,AsynchronousSocketChannel>() {
public void completed(Integer bytesRead, AsynchronousSocketChannel ch) { public void completed(Integer bytesRead, AsynchronousSocketChannel ch) {
try { try {
...@@ -113,13 +90,10 @@ public class Unbounded { ...@@ -113,13 +90,10 @@ public class Unbounded {
System.out.println("All read operations outstanding."); System.out.println("All read operations outstanding.");
// write data to each of the accepted connections // write data to each of the accepted connections
int remaining = CONCURRENCY_COUNT; for (AsynchronousSocketChannel peer: peers) {
while (remaining > 0) { peer.write(ByteBuffer.wrap("welcome".getBytes())).get();
AsynchronousSocketChannel ch = queue.take(); peer.shutdownOutput();
ch.write(ByteBuffer.wrap("welcome".getBytes())).get(); peer.close();
ch.shutdownOutput();
ch.close();
remaining--;
} }
// wait for all threads to reach the barrier // wait for all threads to reach the barrier
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册