提交 902ea32d 编写于 作者: M msheppar

8144144: ORB destroy() leaks filedescriptors after unsuccessful connection

Reviewed-by: chegar, coffeys
上级 a1ef1e2a
...@@ -54,11 +54,17 @@ public class CorbaInboundConnectionCacheImpl ...@@ -54,11 +54,17 @@ public class CorbaInboundConnectionCacheImpl
{ {
protected Collection connectionCache; protected Collection connectionCache;
private Acceptor acceptor;
public CorbaInboundConnectionCacheImpl(ORB orb, Acceptor acceptor) public CorbaInboundConnectionCacheImpl(ORB orb, Acceptor acceptor)
{ {
super(orb, acceptor.getConnectionCacheType(), super(orb, acceptor.getConnectionCacheType(),
((CorbaAcceptor)acceptor).getMonitoringName()); ((CorbaAcceptor)acceptor).getMonitoringName());
this.connectionCache = new ArrayList(); this.connectionCache = new ArrayList();
this.acceptor = acceptor;
if (orb.transportDebugFlag) {
dprint(": " + acceptor );
}
} }
//////////////////////////////////////////////////// ////////////////////////////////////////////////////
...@@ -66,11 +72,25 @@ public class CorbaInboundConnectionCacheImpl ...@@ -66,11 +72,25 @@ public class CorbaInboundConnectionCacheImpl
// pept.transport.InboundConnectionCache // pept.transport.InboundConnectionCache
// //
public void close () {
super.close();
if (orb.transportDebugFlag) {
dprint(".close: " + acceptor );
}
this.acceptor.close();
}
public Connection get(Acceptor acceptor) public Connection get(Acceptor acceptor)
{ {
throw wrapper.methodShouldNotBeCalled(); throw wrapper.methodShouldNotBeCalled();
} }
public Acceptor getAcceptor () {
return acceptor;
}
public void put(Acceptor acceptor, Connection connection) public void put(Acceptor acceptor, Connection connection)
{ {
if (orb.transportDebugFlag) { if (orb.transportDebugFlag) {
......
...@@ -188,8 +188,9 @@ public class CorbaTransportManagerImpl ...@@ -188,8 +188,9 @@ public class CorbaTransportManagerImpl
for (Object cc : outboundConnectionCaches.values()) { for (Object cc : outboundConnectionCaches.values()) {
((ConnectionCache)cc).close() ; ((ConnectionCache)cc).close() ;
} }
for (Object cc : inboundConnectionCaches.values()) { for (Object icc : inboundConnectionCaches.values()) {
((ConnectionCache)cc).close() ; ((ConnectionCache)icc).close() ;
unregisterAcceptor(((InboundConnectionCache)icc).getAcceptor());
} }
getSelector(0).close(); getSelector(0).close();
} finally { } finally {
......
...@@ -26,16 +26,20 @@ ...@@ -26,16 +26,20 @@
package com.sun.corba.se.impl.transport; package com.sun.corba.se.impl.transport;
import java.io.IOException; import java.io.IOException;
import java.net.ServerSocket;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.ClosedSelectorException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import com.sun.corba.se.pept.broker.Broker; import com.sun.corba.se.pept.broker.Broker;
import com.sun.corba.se.pept.transport.Acceptor; import com.sun.corba.se.pept.transport.Acceptor;
import com.sun.corba.se.pept.transport.Connection; import com.sun.corba.se.pept.transport.Connection;
...@@ -111,8 +115,17 @@ class SelectorImpl ...@@ -111,8 +115,17 @@ class SelectorImpl
interestOpsList.add(keyAndOp); interestOpsList.add(keyAndOp);
} }
// tell Selector Thread there's an update to a SelectorKey's Ops // tell Selector Thread there's an update to a SelectorKey's Ops
try {
if (selector != null) {
// wakeup Selector thread to process close request
selector.wakeup(); selector.wakeup();
} }
} catch (Throwable t) {
if (orb.transportDebugFlag) {
dprint(".registerInterestOps: selector.wakeup: ", t);
}
}
}
else { else {
wrapper.selectionKeyInvalid(eventHandler.toString()); wrapper.selectionKeyInvalid(eventHandler.toString());
if (orb.transportDebugFlag) { if (orb.transportDebugFlag) {
...@@ -186,7 +199,9 @@ class SelectorImpl ...@@ -186,7 +199,9 @@ class SelectorImpl
if (selectionKey != null) { if (selectionKey != null) {
selectionKey.cancel(); selectionKey.cancel();
} }
if (selector != null) {
selector.wakeup(); selector.wakeup();
}
return; return;
} }
...@@ -239,6 +254,8 @@ class SelectorImpl ...@@ -239,6 +254,8 @@ class SelectorImpl
readerThread.close(); readerThread.close();
} }
clearDeferredRegistrations();
// Selector // Selector
try { try {
...@@ -248,7 +265,7 @@ class SelectorImpl ...@@ -248,7 +265,7 @@ class SelectorImpl
} }
} catch (Throwable t) { } catch (Throwable t) {
if (orb.transportDebugFlag) { if (orb.transportDebugFlag) {
dprint(".close: selector.close: " + t); dprint(".close: selector.wakeup: ", t);
} }
} }
} }
...@@ -273,15 +290,16 @@ class SelectorImpl ...@@ -273,15 +290,16 @@ class SelectorImpl
n = selector.select(timeout); n = selector.select(timeout);
} catch (IOException e) { } catch (IOException e) {
if (orb.transportDebugFlag) { if (orb.transportDebugFlag) {
dprint(".run: selector.select: " + e); dprint(".run: selector.select: ", e);
} }
} } catch (ClosedSelectorException csEx) {
if (closed) {
selector.close();
if (orb.transportDebugFlag) { if (orb.transportDebugFlag) {
dprint(".run: closed - .run return"); dprint(".run: selector.select: ", csEx);
} }
return; break;
}
if (closed) {
break;
} }
/* /*
if (timeout == 0 && orb.transportDebugFlag) { if (timeout == 0 && orb.transportDebugFlag) {
...@@ -321,6 +339,18 @@ class SelectorImpl ...@@ -321,6 +339,18 @@ class SelectorImpl
} }
} }
} }
try {
if (selector != null) {
if (orb.transportDebugFlag) {
dprint(".run: selector.close ");
}
selector.close();
}
} catch (Throwable t) {
if (orb.transportDebugFlag) {
dprint(".run: selector.close: ", t);
}
}
} }
///////////////////////////////////////////////////// /////////////////////////////////////////////////////
...@@ -328,6 +358,44 @@ class SelectorImpl ...@@ -328,6 +358,44 @@ class SelectorImpl
// Implementation. // Implementation.
// //
private void clearDeferredRegistrations() {
synchronized (deferredRegistrations) {
int deferredListSize = deferredRegistrations.size();
if (orb.transportDebugFlag) {
dprint(".clearDeferredRegistrations:deferred list size == " + deferredListSize);
}
for (int i = 0; i < deferredListSize; i++) {
EventHandler eventHandler =
(EventHandler)deferredRegistrations.get(i);
if (orb.transportDebugFlag) {
dprint(".clearDeferredRegistrations: " + eventHandler);
}
SelectableChannel channel = eventHandler.getChannel();
SelectionKey selectionKey = null;
try {
if (orb.transportDebugFlag) {
dprint(".clearDeferredRegistrations:close channel == "
+ channel);
dprint(".clearDeferredRegistrations:close channel class == "
+ channel.getClass().getName());
}
channel.close();
selectionKey = eventHandler.getSelectionKey();
if (selectionKey != null) {
selectionKey.cancel();
selectionKey.attach(null);
}
} catch (IOException ioEx) {
if (orb.transportDebugFlag) {
dprint(".clearDeferredRegistrations: ", ioEx);
}
}
}
deferredRegistrations.clear();
}
}
private synchronized boolean isClosed () private synchronized boolean isClosed ()
{ {
return closed; return closed;
...@@ -344,7 +412,7 @@ class SelectorImpl ...@@ -344,7 +412,7 @@ class SelectorImpl
selector = Selector.open(); selector = Selector.open();
} catch (IOException e) { } catch (IOException e) {
if (orb.transportDebugFlag) { if (orb.transportDebugFlag) {
dprint(".startSelector: Selector.open: IOException: " + e); dprint(".startSelector: Selector.open: IOException: ", e);
} }
// REVISIT - better handling/reporting // REVISIT - better handling/reporting
RuntimeException rte = RuntimeException rte =
...@@ -379,7 +447,7 @@ class SelectorImpl ...@@ -379,7 +447,7 @@ class SelectorImpl
(Object)eventHandler); (Object)eventHandler);
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
if (orb.transportDebugFlag) { if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: " + e); dprint(".handleDeferredRegistrations: ", e);
} }
} }
eventHandler.setSelectionKey(selectionKey); eventHandler.setSelectionKey(selectionKey);
......
...@@ -264,8 +264,13 @@ public class SocketOrChannelAcceptorImpl ...@@ -264,8 +264,13 @@ public class SocketOrChannelAcceptorImpl
if (connection.shouldRegisterServerReadEvent()) { if (connection.shouldRegisterServerReadEvent()) {
Selector selector = orb.getTransportManager().getSelector(0); Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
if (orb.transportDebugFlag) {
dprint(".accept: registerForEvent: " + connection);
}
selector.registerForEvent(connection.getEventHandler()); selector.registerForEvent(connection.getEventHandler());
} }
}
getConnectionCache().reclaim(); getConnectionCache().reclaim();
...@@ -273,14 +278,17 @@ public class SocketOrChannelAcceptorImpl ...@@ -273,14 +278,17 @@ public class SocketOrChannelAcceptorImpl
if (orb.transportDebugFlag) { if (orb.transportDebugFlag) {
dprint(".accept:", e); dprint(".accept:", e);
} }
orb.getTransportManager().getSelector(0).unregisterForEvent(this); Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.unregisterForEvent(this);
// REVISIT - need to close - recreate - then register new one. // REVISIT - need to close - recreate - then register new one.
orb.getTransportManager().getSelector(0).registerForEvent(this); selector.registerForEvent(this);
// NOTE: if register cycling we do not want to shut down ORB // NOTE: if register cycling we do not want to shut down ORB
// since local beans will still work. Instead one will see // since local beans will still work. Instead one will see
// a growing log file to alert admin of problem. // a growing log file to alert admin of problem.
} }
} }
}
public void close () public void close ()
{ {
...@@ -289,7 +297,9 @@ public class SocketOrChannelAcceptorImpl ...@@ -289,7 +297,9 @@ public class SocketOrChannelAcceptorImpl
dprint(".close->:"); dprint(".close->:");
} }
Selector selector = orb.getTransportManager().getSelector(0); Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.unregisterForEvent(this); selector.unregisterForEvent(this);
}
if (serverSocketChannel != null) { if (serverSocketChannel != null) {
serverSocketChannel.close(); serverSocketChannel.close();
} }
...@@ -480,7 +490,9 @@ public class SocketOrChannelAcceptorImpl ...@@ -480,7 +490,9 @@ public class SocketOrChannelAcceptorImpl
// of calling SelectionKey.interestOps(<interest op>). // of calling SelectionKey.interestOps(<interest op>).
Selector selector = orb.getTransportManager().getSelector(0); Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.registerInterestOps(this); selector.registerInterestOps(this);
}
if (orb.transportDebugFlag) { if (orb.transportDebugFlag) {
dprint(".doWork<-:" + this); dprint(".doWork<-:" + this);
......
...@@ -367,7 +367,10 @@ public class SocketOrChannelConnectionImpl ...@@ -367,7 +367,10 @@ public class SocketOrChannelConnectionImpl
} }
} }
// REVISIT - make sure reader thread is killed. // REVISIT - make sure reader thread is killed.
orb.getTransportManager().getSelector(0).unregisterForEvent(this); Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.unregisterForEvent(this);
}
// Notify anyone waiting. // Notify anyone waiting.
purgeCalls(wrapper.connectionAbort(ex), true, false); purgeCalls(wrapper.connectionAbort(ex), true, false);
// REVISIT // REVISIT
...@@ -801,7 +804,9 @@ public class SocketOrChannelConnectionImpl ...@@ -801,7 +804,9 @@ public class SocketOrChannelConnectionImpl
} }
try { try {
Selector selector = orb.getTransportManager().getSelector(0); Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.unregisterForEvent(this); selector.unregisterForEvent(this);
}
if (socketChannel != null) { if (socketChannel != null) {
socketChannel.close(); socketChannel.close();
} }
...@@ -824,7 +829,9 @@ public class SocketOrChannelConnectionImpl ...@@ -824,7 +829,9 @@ public class SocketOrChannelConnectionImpl
dprint(".closeConnectionResources->: " + this); dprint(".closeConnectionResources->: " + this);
} }
Selector selector = orb.getTransportManager().getSelector(0); Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.unregisterForEvent(this); selector.unregisterForEvent(this);
}
try { try {
if (socketChannel != null) if (socketChannel != null)
socketChannel.close() ; socketChannel.close() ;
......
...@@ -36,6 +36,8 @@ public interface InboundConnectionCache ...@@ -36,6 +36,8 @@ public interface InboundConnectionCache
public void put(Acceptor acceptor, Connection connection); public void put(Acceptor acceptor, Connection connection);
public void remove(Connection connection); public void remove(Connection connection);
public Acceptor getAcceptor();
} }
// End of file. // End of file.
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册