Commit 3130e761 authored by Vlad Ilyushchenko

NET: async disconnect. Disconnects must be done by the same thread that accepts new connections; Linux dispatcher implementation.
Parent b7af717d
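The change visible in the diff below is that `IODispatcher.disconnect()` no longer closes the socket on the calling thread. Instead it publishes the context onto a disconnect queue, which the dispatcher thread (the same thread that accepts new connections) drains via `processDisconnects()` before polling, performing the actual close in `doDisconnect()`. A minimal sketch of this hand-off pattern follows; it uses a standard `ConcurrentLinkedQueue` instead of QuestDB's `RingQueue`/`MPSequence` primitives, and the names `Dispatcher` and `ConnectionContext` are illustrative, not the project's API:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative sketch only: any thread may *request* a disconnect,
// but the socket is closed exclusively by the dispatcher thread.
final class Dispatcher {

    // Stand-in for an accepted connection; the real code wraps a native fd.
    static final class ConnectionContext {
        final long fd;
        ConnectionContext(long fd) { this.fd = fd; }
    }

    private final Queue<ConnectionContext> disconnectQueue = new ConcurrentLinkedQueue<>();

    // Called from any worker thread (analogous to IODispatcher.disconnect(context)):
    // publish only, never close() here.
    void disconnect(ConnectionContext context) {
        disconnectQueue.add(context);
    }

    // Called from the dispatcher thread's run loop, before accept()/poll().
    void processDisconnects() {
        ConnectionContext context;
        while ((context = disconnectQueue.poll()) != null) {
            doDisconnect(context);
        }
    }

    // Runs only on the dispatcher thread, mirroring doDisconnect() in the diff.
    private void doDisconnect(ConnectionContext context) {
        System.out.println("closing fd=" + context.fd); // real code: nf.close(fd), release context
    }
}
```

The commit itself uses a bounded `RingQueue<IOEvent<C>>` with `MPSequence`/`SCSequence` cursors and `nextBully()` for publication, as shown in the `IODispatcherLinux` hunks below; the sketch above only illustrates the single-thread close discipline, not the lock-free queue mechanics.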
......@@ -25,7 +25,10 @@ package com.questdb.cutlass.http;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.network.*;
import com.questdb.network.IOContext;
import com.questdb.network.IODispatcher;
import com.questdb.network.IOOperation;
import com.questdb.network.NetworkFacade;
import com.questdb.std.*;
import com.questdb.std.str.DirectByteCharSequence;
......@@ -126,7 +129,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
handleClientRecv(dispatcher, selector);
} catch (PeerDisconnectedException ignore) {
LOG.debug().$("peer disconnected").$();
dispatcher.disconnect(this, DisconnectReason.PEER);
dispatcher.disconnect(this);
} catch (PeerIsSlowToReadException ignore) {
LOG.debug().$("peer is slow writer").$();
dispatcher.registerChannel(this, IOOperation.READ);
......@@ -142,14 +145,14 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
LOG.debug().$("peer is slow reader").$();
dispatcher.registerChannel(this, IOOperation.WRITE);
} catch (PeerDisconnectedException ignore) {
dispatcher.disconnect(this, DisconnectReason.PEER);
dispatcher.disconnect(this);
}
} else {
assert false;
}
break;
default:
dispatcher.disconnect(this, DisconnectReason.SILLY);
dispatcher.disconnect(this);
break;
}
}
......@@ -169,7 +172,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
try {
processor.onRequestComplete(this, dispatcher);
} catch (PeerDisconnectedException ignore) {
dispatcher.disconnect(this, DisconnectReason.PEER);
dispatcher.disconnect(this);
} catch (PeerIsSlowToReadException e) {
dispatcher.registerChannel(this, IOOperation.WRITE);
}
......@@ -195,7 +198,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
if (read < 0) {
LOG.debug().$("done [fd=").$(fd).$(']').$();
// peer disconnect
dispatcher.disconnect(this, DisconnectReason.PEER);
dispatcher.disconnect(this);
return;
}
......@@ -265,7 +268,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
while (true) {
final int n = nf.recv(fd, buf, bufRemaining);
if (n < 0) {
dispatcher.disconnect(this, DisconnectReason.PEER);
dispatcher.disconnect(this);
break;
}
......@@ -326,7 +329,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
read = nf.recv(fd, recvBuffer, 1);
if (read != 0) {
LOG.debug().$("disconnect after request [fd=").$(fd).$(']').$();
dispatcher.disconnect(this, DisconnectReason.PEER);
dispatcher.disconnect(this);
} else {
processor.onHeadersReady(this);
LOG.debug().$("good [fd=").$(fd).$(']').$();
......@@ -334,7 +337,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
processor.onRequestComplete(this, dispatcher);
resumeProcessor = null;
} catch (PeerDisconnectedException ignore) {
dispatcher.disconnect(this, DisconnectReason.PEER);
dispatcher.disconnect(this);
} catch (PeerIsSlowToReadException ignore) {
LOG.debug().$("peer is slow reader [two]").$();
dispatcher.registerChannel(this, IOOperation.WRITE);
......
......@@ -32,7 +32,6 @@ import com.questdb.cutlass.text.Atomicity;
import com.questdb.cutlass.text.TextLoader;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.network.DisconnectReason;
import com.questdb.network.IODispatcher;
import com.questdb.network.IOOperation;
import com.questdb.std.*;
......@@ -292,7 +291,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
if (name == null) {
transientContext.simpleResponse().sendStatus(400, "no name given");
// we have to disconnect to interrupt potentially large upload
transientDispatcher.disconnect(transientContext, DisconnectReason.SILLY);
transientDispatcher.disconnect(transientContext);
return;
}
......
......@@ -34,5 +34,5 @@ public interface IODispatcher<C extends IOContext> extends Closeable, Job {
boolean processIOQueue(IORequestProcessor<C> processor);
void disconnect(C context, int disconnectReason);
void disconnect(C context);
}
......@@ -52,6 +52,9 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
private final NetworkFacade nf;
private final int initialBias;
private final AtomicInteger connectionCount = new AtomicInteger();
private final RingQueue<IOEvent<C>> disconnectQueue;
private final MPSequence disconnectPubSeq;
private final SCSequence disconnectSubSeq;
private long fdid = 1;
public IODispatcherLinux(
......@@ -67,6 +70,12 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
this.interestPubSeq = new MPSequence(interestQueue.getCapacity());
this.interestSubSeq = new SCSequence();
this.interestPubSeq.then(this.interestSubSeq).then(this.interestPubSeq);
this.disconnectQueue = new RingQueue<>(IOEvent::new, configuration.getIOQueueCapacity());
this.disconnectPubSeq = new MPSequence(disconnectQueue.getCapacity());
this.disconnectSubSeq = new SCSequence();
this.disconnectPubSeq.then(this.disconnectSubSeq).then(disconnectPubSeq);
this.clock = configuration.getClock();
this.activeConnectionLimit = configuration.getActiveConnectionLimit();
this.idleConnectionTimeout = configuration.getIdleConnectionTimeout();
......@@ -88,12 +97,13 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
@Override
public void close() {
processDisconnects();
this.epoll.close();
nf.close(serverFd, LOG);
int n = pending.size();
for (int i = 0; i < n; i++) {
disconnect(pending.get(i), DisconnectReason.SILLY);
doDisconnect(pending.get(i));
}
drainQueueAndDisconnect();
......@@ -102,7 +112,7 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
do {
cursor = ioEventSubSeq.next();
if (cursor > -1) {
disconnect(ioEventQueue.get(cursor).context, DisconnectReason.SILLY);
doDisconnect(ioEventQueue.get(cursor).context);
ioEventSubSeq.done(cursor);
}
} while (cursor != -1);
......@@ -144,16 +154,11 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
}
@Override
public void disconnect(C context, int disconnectReason) {
final long fd = context.getFd();
LOG.info()
.$("disconnected [ip=").$ip(nf.getPeerIP(fd))
.$(", fd=").$(fd)
.$(", reason=").$(DisconnectReason.nameOf(disconnectReason))
.$(']').$();
nf.close(fd, LOG);
ioContextFactory.done(context);
connectionCount.decrementAndGet();
public void disconnect(C context) {
final long cursor = disconnectPubSeq.nextBully();
assert cursor > -1;
disconnectQueue.get(cursor).context = context;
disconnectPubSeq.done(cursor);
}
private void accept() {
......@@ -199,6 +204,17 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
pending.set(r, ioContextFactory.newInstance(fd));
}
private void doDisconnect(C context) {
final long fd = context.getFd();
LOG.info()
.$("disconnected [ip=").$ip(nf.getPeerIP(fd))
.$(", fd=").$(fd)
.$(']').$();
nf.close(fd, LOG);
ioContextFactory.done(context);
connectionCount.decrementAndGet();
}
private void drainQueueAndDisconnect() {
long cursor;
do {
......@@ -206,7 +222,7 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
if (cursor > -1) {
final long available = interestSubSeq.available();
while (cursor < available) {
disconnect(interestQueue.get(cursor++).context, DisconnectReason.SILLY);
doDisconnect(interestQueue.get(cursor++).context);
}
interestSubSeq.done(available - 1);
}
......@@ -228,10 +244,25 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
}
}
private void processDisconnects() {
while (true) {
long cursor = disconnectSubSeq.next();
if (cursor < 0) {
break;
}
final long available = disconnectSubSeq.available();
while (cursor < available) {
doDisconnect(disconnectQueue.get(cursor++).context);
}
disconnectSubSeq.done(available - 1);
}
}
private void processIdleConnections(long deadline) {
int count = 0;
for (int i = 0, n = pending.size(); i < n && pending.get(i, M_TIMESTAMP) < deadline; i++, count++) {
disconnect(pending.get(i), DisconnectReason.IDLE);
doDisconnect(pending.get(i));
}
pending.zapTop(count);
}
......@@ -280,7 +311,15 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
@Override
protected boolean runSerially() {
// todo: introduce fairness factor
// current worker impl will still proceed to execute another job even if this one was useful
// we should see if we can stay inside of this method until we have a completely idle iteration
// at the same time we should hog this thread in case we are always 'useful', we can probably
// introduce a loop count after which we always exit
boolean useful = false;
processDisconnects();
final int n = epoll.poll();
int watermark = pending.size();
int offset = 0;
......
......@@ -27,7 +27,6 @@ import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.mp.*;
import com.questdb.net.ChannelStatus;
import com.questdb.net.DisconnectReason;
import com.questdb.std.LongMatrix;
import com.questdb.std.Os;
import com.questdb.std.time.MillisecondClock;
......@@ -105,7 +104,7 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
int n = pending.size();
for (int i = 0; i < n; i++) {
disconnect(pending.get(i), DisconnectReason.SILLY);
disconnect(pending.get(i));
}
drainQueueAndDisconnect();
......@@ -114,7 +113,7 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
do {
cursor = ioEventSubSeq.next();
if (cursor > -1) {
disconnect(ioEventQueue.get(cursor).context, DisconnectReason.SILLY);
disconnect(ioEventQueue.get(cursor).context);
ioEventSubSeq.done(cursor);
}
} while (cursor != -1);
......@@ -156,12 +155,11 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
}
@Override
public void disconnect(C context, int disconnectReason) {
public void disconnect(C context) {
final long fd = context.getFd();
LOG.info()
.$("disconnected [ip=").$ip(nf.getPeerIP(fd))
.$(", fd=").$(fd)
.$(", reason=").$(DisconnectReason.nameOf(disconnectReason))
.$(']').$();
nf.close(fd, LOG);
ioContextFactory.done(context);
......@@ -218,7 +216,7 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
if (cursor > -1) {
final long available = interestSubSeq.available();
while (cursor < available) {
disconnect(interestQueue.get(cursor++).context, DisconnectReason.SILLY);
disconnect(interestQueue.get(cursor++).context);
}
interestSubSeq.done(available - 1);
}
......@@ -264,7 +262,7 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
private void processIdleConnections(long deadline) {
int count = 0;
for (int i = 0, n = pending.size(); i < n && pending.get(i, M_TIMESTAMP) < deadline; i++, count++) {
disconnect(pending.get(i), DisconnectReason.IDLE);
disconnect(pending.get(i));
}
pending.zapTop(count);
}
......
......@@ -113,7 +113,7 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
do {
cursor = ioEventSubSeq.next();
if (cursor > -1) {
disconnect(ioEventQueue.get(cursor).context, DisconnectReason.SILLY);
disconnect(ioEventQueue.get(cursor).context);
ioEventSubSeq.done(cursor);
}
} while (cursor != -1);
......@@ -155,12 +155,11 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
}
@Override
public void disconnect(C context, int disconnectReason) {
public void disconnect(C context) {
final long fd = context.getFd();
LOG.info()
.$("disconnected [ip=").$ip(nf.getPeerIP(fd))
.$(", fd=").$(fd)
.$(", reason=").$(DisconnectReason.nameOf(disconnectReason))
.$(']').$();
nf.close(fd, LOG);
ioContextFactory.done(context);
......@@ -216,7 +215,7 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
if (cursor > -1) {
final long available = interestSubSeq.available();
while (cursor < available) {
disconnect(interestQueue.get(cursor++).context, DisconnectReason.SILLY);
disconnect(interestQueue.get(cursor++).context);
}
interestSubSeq.done(available - 1);
}
......@@ -311,7 +310,7 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
// check if expired
if (ts < deadline && fd != serverFd) {
disconnect(pending.get(i), DisconnectReason.IDLE);
disconnect(pending.get(i));
pending.deleteRow(i);
n--;
useful = true;
......
......@@ -53,7 +53,6 @@ import java.util.concurrent.locks.LockSupport;
import static com.questdb.cutlass.http.HttpConnectionContext.dump;
public class IODispatcherTest {
private static Log LOG = LogFactory.getLog(IODispatcherTest.class);
......@@ -89,7 +88,7 @@ public class IODispatcherTest {
(operation, context, disp) -> {
if (operation == IOOperation.WRITE) {
Assert.assertEquals(1024, Net.send(context.getFd(), context.buffer, 1024));
disp.disconnect(context, DisconnectReason.SILLY);
disp.disconnect(context);
}
}
);
......@@ -795,6 +794,7 @@ public class IODispatcherTest {
);
} while (serverRunning.get());
serverHaltLatch.countDown();
System.out.println("exit");
}).start();
......@@ -814,6 +814,7 @@ public class IODispatcherTest {
Assert.assertFalse(configuration.getActiveConnectionLimit() < dispatcher.getConnectionCount());
serverRunning.set(false);
serverHaltLatch.await();
System.out.println("closing");
}
});
}
......@@ -1730,7 +1731,7 @@ public class IODispatcherTest {
// there is interesting situation here, its possible that header is fully
// read and there are either more bytes or disconnect lingering
dispatcher.disconnect(connectionContext, DisconnectReason.SILLY);
dispatcher.disconnect(connectionContext);
}
};
......