提交 9c73201b 编写于 作者: V Vlad Ilyushchenko

NET: async disconnect, disconnects must be done by the same thread that...

NET: async disconnect, disconnects must be done by the same thread that accepts new connections. Linux dispatcher implementation
上级 3130e761
......@@ -204,7 +204,6 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
if (read == 0) {
// client is not sending anything
LOG.info().$("ok, laters").$();
dispatcher.registerChannel(this, IOOperation.READ);
return;
}
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.mp;
@FunctionalInterface
public interface QueueConsumer<T> {
void consume(T slot);
}
......@@ -75,4 +75,18 @@ public class SCSequence extends AbstractSSequence {
cache = barrier.availableIndex(next);
return next > cache ? -1 : next;
}
public <T> void consumeAll(RingQueue<T> queue, QueueConsumer<T> consumer) {
long cursor;
do {
cursor = next();
if (cursor > -1) {
final long available = available();
while (cursor < available) {
consumer.consume(queue.get(cursor++));
}
done(available - 1);
}
} while (cursor != -1);
}
}
......@@ -60,6 +60,10 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
private final NetworkFacade nf;
private final int initialBias;
private final SelectFacade sf;
private final RingQueue<IOEvent<C>> disconnectQueue;
private final MPSequence disconnectPubSeq;
private final SCSequence disconnectSubSeq;
private final QueueConsumer<IOEvent<C>> disconnectContextRef = this::disconnectContext;
public IODispatcherWindows(
IODispatcherConfiguration configuration,
......@@ -76,6 +80,12 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
this.interestPubSeq = new MPSequence(interestQueue.getCapacity());
this.interestSubSeq = new SCSequence();
this.interestPubSeq.then(this.interestSubSeq).then(this.interestPubSeq);
this.disconnectQueue = new RingQueue<>(IOEvent::new, configuration.getIOQueueCapacity());
this.disconnectPubSeq = new MPSequence(disconnectQueue.getCapacity());
this.disconnectSubSeq = new SCSequence();
this.disconnectPubSeq.then(this.disconnectSubSeq).then(disconnectPubSeq);
this.clock = configuration.getClock();
this.activeConnectionLimit = configuration.getActiveConnectionLimit();
this.idleConnectionTimeout = configuration.getIdleConnectionTimeout();
......@@ -97,8 +107,30 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
}
}
private void doDisconnect(C context) {
final long fd = context.getFd();
LOG.info()
.$("disconnected [ip=").$ip(nf.getPeerIP(fd))
.$(", fd=").$(fd)
.$(']').$();
nf.close(fd, LOG);
ioContextFactory.done(context);
connectionCount.decrementAndGet();
}
private void processDisconnects() {
disconnectSubSeq.consumeAll(disconnectQueue, this.disconnectContextRef);
}
private void disconnectContext(IOEvent<C> event) {
doDisconnect(event.context);
}
@Override
public void close() {
processDisconnects();
readFdSet.close();
writeFdSet.close();
......@@ -113,7 +145,7 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
do {
cursor = ioEventSubSeq.next();
if (cursor > -1) {
disconnect(ioEventQueue.get(cursor).context);
doDisconnect(ioEventQueue.get(cursor).context);
ioEventSubSeq.done(cursor);
}
} while (cursor != -1);
......@@ -156,14 +188,10 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
@Override
public void disconnect(C context) {
final long fd = context.getFd();
LOG.info()
.$("disconnected [ip=").$ip(nf.getPeerIP(fd))
.$(", fd=").$(fd)
.$(']').$();
nf.close(fd, LOG);
ioContextFactory.done(context);
connectionCount.decrementAndGet();
final long cursor = disconnectPubSeq.nextBully();
assert cursor > -1;
disconnectQueue.get(cursor).context = context;
disconnectPubSeq.done(cursor);
}
private void accept(long timestamp) {
......@@ -209,17 +237,7 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
}
private void drainQueueAndDisconnect() {
long cursor;
do {
cursor = interestSubSeq.next();
if (cursor > -1) {
final long available = interestSubSeq.available();
while (cursor < available) {
disconnect(interestQueue.get(cursor++).context);
}
interestSubSeq.done(available - 1);
}
} while (cursor != -1);
interestSubSeq.consumeAll(interestQueue, this.disconnectContextRef);
}
private boolean processRegistrations(long timestamp) {
......@@ -276,6 +294,9 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
@Override
protected boolean runSerially() {
processDisconnects();
int count = sf.select(readFdSet.address, writeFdSet.address, 0);
if (count < 0) {
LOG.error().$("Error in select(): ").$(nf.errno()).$();
......@@ -310,7 +331,7 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
// check if expired
if (ts < deadline && fd != serverFd) {
disconnect(pending.get(i));
doDisconnect(pending.get(i));
pending.deleteRow(i);
n--;
useful = true;
......
......@@ -794,7 +794,6 @@ public class IODispatcherTest {
);
} while (serverRunning.get());
serverHaltLatch.countDown();
System.out.println("exit");
}).start();
......@@ -814,7 +813,6 @@ public class IODispatcherTest {
Assert.assertFalse(configuration.getActiveConnectionLimit() < dispatcher.getConnectionCount());
serverRunning.set(false);
serverHaltLatch.await();
System.out.println("closing");
}
});
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册