提交 0e9c961a 编写于 作者: V Vlad Ilyushchenko

NET: async disconnect, disconnects must be done by the same thread that...

NET: async disconnect, disconnects must be done by the same thread that accepts new connections. Windows dispatcher implementation. Refactored common dispatcher code into super class
上级 9c73201b
......@@ -33,6 +33,18 @@ public class MCSequence extends AbstractMSequence {
super(cycle, waitStrategy);
}
public <T> void consumeAll(RingQueue<T> queue, QueueConsumer<T> consumer) {
long cursor;
do {
cursor = next();
if (cursor > -1) {
consumer.consume(queue.get(cursor));
done(cursor);
}
} while (cursor != -1);
}
@Override
public long next() {
long cached = cache;
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.network;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.mp.*;
import com.questdb.std.time.MillisecondClock;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class AbstractIODispatcher<C extends IOContext> extends SynchronizedJob implements IODispatcher<C> {
protected static final Log LOG = LogFactory.getLog(IODispatcherOsx.class);
protected final RingQueue<IOEvent<C>> interestQueue;
protected final MPSequence interestPubSeq;
protected final SCSequence interestSubSeq;
protected final long serverFd;
protected final RingQueue<IOEvent<C>> ioEventQueue;
protected final SPSequence ioEventPubSeq;
protected final MCSequence ioEventSubSeq;
protected final MillisecondClock clock;
protected final int activeConnectionLimit;
protected final IOContextFactory<C> ioContextFactory;
protected final NetworkFacade nf;
protected final int initialBias;
protected final AtomicInteger connectionCount = new AtomicInteger();
protected final RingQueue<IOEvent<C>> disconnectQueue;
protected final MPSequence disconnectPubSeq;
protected final SCSequence disconnectSubSeq;
protected final QueueConsumer<IOEvent<C>> disconnectContextRef = this::disconnectContext;
protected final long idleConnectionTimeout;
public AbstractIODispatcher(
IODispatcherConfiguration configuration,
IOContextFactory<C> ioContextFactory
) {
this.nf = configuration.getNetworkFacade();
this.serverFd = nf.socketTcp(false);
this.interestQueue = new RingQueue<>(IOEvent::new, configuration.getInterestQueueCapacity());
this.interestPubSeq = new MPSequence(interestQueue.getCapacity());
this.interestSubSeq = new SCSequence();
this.interestPubSeq.then(this.interestSubSeq).then(this.interestPubSeq);
this.ioEventQueue = new RingQueue<>(IOEvent::new, configuration.getIOQueueCapacity());
this.ioEventPubSeq = new SPSequence(configuration.getIOQueueCapacity());
this.ioEventSubSeq = new MCSequence(configuration.getIOQueueCapacity());
this.ioEventPubSeq.then(this.ioEventSubSeq).then(this.ioEventPubSeq);
this.disconnectQueue = new RingQueue<>(IOEvent::new, configuration.getIOQueueCapacity());
this.disconnectPubSeq = new MPSequence(disconnectQueue.getCapacity());
this.disconnectSubSeq = new SCSequence();
this.disconnectPubSeq.then(this.disconnectSubSeq).then(this.disconnectPubSeq);
this.clock = configuration.getClock();
this.activeConnectionLimit = configuration.getActiveConnectionLimit();
this.ioContextFactory = ioContextFactory;
this.initialBias = configuration.getInitialBias();
this.idleConnectionTimeout = configuration.getIdleConnectionTimeout();
}
@Override
public int getConnectionCount() {
return connectionCount.get();
}
@Override
public void registerChannel(C context, int operation) {
long cursor = interestPubSeq.nextBully();
IOEvent<C> evt = interestQueue.get(cursor);
evt.context = context;
evt.operation = operation;
LOG.debug().$("queuing [fd=").$(context.getFd()).$(", op=").$(operation).$(']').$();
interestPubSeq.done(cursor);
}
@Override
public boolean processIOQueue(IORequestProcessor<C> processor) {
long cursor = ioEventSubSeq.next();
while (cursor == -2) {
cursor = ioEventSubSeq.next();
}
if (cursor > -1) {
IOEvent<C> event = ioEventQueue.get(cursor);
C connectionContext = event.context;
final int operation = event.operation;
ioEventSubSeq.done(cursor);
processor.onRequest(operation, connectionContext, this);
return true;
}
return false;
}
@Override
public void disconnect(C context) {
final long cursor = disconnectPubSeq.nextBully();
assert cursor > -1;
disconnectQueue.get(cursor).context = context;
disconnectPubSeq.done(cursor);
}
private void disconnectContext(IOEvent<C> event) {
doDisconnect(event.context);
}
protected void doDisconnect(C context) {
final long fd = context.getFd();
LOG.info()
.$("disconnected [ip=").$ip(nf.getPeerIP(fd))
.$(", fd=").$(fd)
.$(']').$();
nf.close(fd, LOG);
ioContextFactory.done(context);
connectionCount.decrementAndGet();
}
protected void processDisconnects() {
disconnectSubSeq.consumeAll(disconnectQueue, this.disconnectContextRef);
}
protected void publishOperation(int operation, C context) {
long cursor = ioEventPubSeq.nextBully();
IOEvent<C> evt = ioEventQueue.get(cursor);
evt.context = context;
evt.operation = operation;
ioEventPubSeq.done(cursor);
LOG.debug().$("fired [fd=").$(context.getFd()).$(", op=").$(evt.operation).$(", pos=").$(cursor).$(']').$();
}
}
......@@ -23,66 +23,22 @@
package com.questdb.network;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.mp.*;
import com.questdb.std.LongMatrix;
import com.questdb.std.time.MillisecondClock;
import java.util.concurrent.atomic.AtomicInteger;
public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob implements IODispatcher<C> {
public class IODispatcherLinux<C extends IOContext> extends AbstractIODispatcher<C> {
private static final int M_TIMESTAMP = 1;
private static final int M_FD = 2;
private static final int M_ID = 0;
private static final Log LOG = LogFactory.getLog(IODispatcherLinux.class);
private final long serverFd;
private final RingQueue<IOEvent<C>> ioEventQueue;
private final Sequence ioEventPubSeq;
private final Sequence ioEventSubSeq;
private final RingQueue<IOEvent<C>> interestQueue;
private final MPSequence interestPubSeq;
private final SCSequence interestSubSeq;
private final MillisecondClock clock;
private final Epoll epoll;
private final long idleConnectionTimeout;
private final LongMatrix<C> pending = new LongMatrix<>(4);
private final int activeConnectionLimit;
private final IOContextFactory<C> ioContextFactory;
private final NetworkFacade nf;
private final int initialBias;
private final AtomicInteger connectionCount = new AtomicInteger();
private final RingQueue<IOEvent<C>> disconnectQueue;
private final MPSequence disconnectPubSeq;
private final SCSequence disconnectSubSeq;
private long fdid = 1;
public IODispatcherLinux(
IODispatcherConfiguration configuration,
IOContextFactory<C> ioContextFactory
) {
this.nf = configuration.getNetworkFacade();
this.ioEventQueue = new RingQueue<>(IOEvent::new, configuration.getIOQueueCapacity());
this.ioEventPubSeq = new SPSequence(configuration.getIOQueueCapacity());
this.ioEventSubSeq = new MCSequence(configuration.getIOQueueCapacity());
this.ioEventPubSeq.then(this.ioEventSubSeq).then(this.ioEventPubSeq);
this.interestQueue = new RingQueue<>(IOEvent::new, configuration.getInterestQueueCapacity());
this.interestPubSeq = new MPSequence(interestQueue.getCapacity());
this.interestSubSeq = new SCSequence();
this.interestPubSeq.then(this.interestSubSeq).then(this.interestPubSeq);
this.disconnectQueue = new RingQueue<>(IOEvent::new, configuration.getIOQueueCapacity());
this.disconnectPubSeq = new MPSequence(disconnectQueue.getCapacity());
this.disconnectSubSeq = new SCSequence();
this.disconnectPubSeq.then(this.disconnectSubSeq).then(disconnectPubSeq);
this.clock = configuration.getClock();
this.activeConnectionLimit = configuration.getActiveConnectionLimit();
this.idleConnectionTimeout = configuration.getIdleConnectionTimeout();
this.ioContextFactory = ioContextFactory;
this.initialBias = configuration.getInitialBias();
super(configuration, ioContextFactory);
this.epoll = new Epoll(configuration.getEpollFacade(), configuration.getEventCapacity());
this.serverFd = nf.socketTcp(false);
if (nf.bindTcp(this.serverFd, configuration.getBindIPv4Address(), configuration.getBindPort())) {
nf.listen(this.serverFd, configuration.getListenBacklog());
this.epoll.listen(serverFd);
......@@ -119,48 +75,6 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
LOG.info().$("closed").$();
}
@Override
public int getConnectionCount() {
return connectionCount.get();
}
@Override
public void registerChannel(C context, int operation) {
long cursor = interestPubSeq.nextBully();
IOEvent<C> evt = interestQueue.get(cursor);
evt.context = context;
evt.operation = operation;
LOG.debug().$("queuing [fd=").$(context.getFd()).$(", op=").$(operation).$(']').$();
interestPubSeq.done(cursor);
}
@Override
public boolean processIOQueue(IORequestProcessor<C> processor) {
long cursor = ioEventSubSeq.next();
while (cursor == -2) {
cursor = ioEventSubSeq.next();
}
if (cursor > -1) {
IOEvent<C> event = ioEventQueue.get(cursor);
C connectionContext = event.context;
final int operation = event.operation;
ioEventSubSeq.done(cursor);
processor.onRequest(operation, connectionContext, this);
return true;
}
return false;
}
@Override
public void disconnect(C context) {
final long cursor = disconnectPubSeq.nextBully();
assert cursor > -1;
disconnectQueue.get(cursor).context = context;
disconnectPubSeq.done(cursor);
}
private void accept() {
while (true) {
long fd = nf.accept(serverFd);
......@@ -204,17 +118,6 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
pending.set(r, ioContextFactory.newInstance(fd));
}
private void doDisconnect(C context) {
final long fd = context.getFd();
LOG.info()
.$("disconnected [ip=").$ip(nf.getPeerIP(fd))
.$(", fd=").$(fd)
.$(']').$();
nf.close(fd, LOG);
ioContextFactory.done(context);
connectionCount.decrementAndGet();
}
private void drainQueueAndDisconnect() {
long cursor;
do {
......@@ -244,21 +147,6 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
}
}
private void processDisconnects() {
while (true) {
long cursor = disconnectSubSeq.next();
if (cursor < 0) {
break;
}
final long available = disconnectSubSeq.available();
while (cursor < available) {
doDisconnect(disconnectQueue.get(cursor++).context);
}
disconnectSubSeq.done(available - 1);
}
}
private void processIdleConnections(long deadline) {
int count = 0;
for (int i = 0, n = pending.size(); i < n && pending.get(i, M_TIMESTAMP) < deadline; i++, count++) {
......@@ -300,15 +188,6 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
return offset > 0;
}
private void publishOperation(int operation, C context) {
long cursor = ioEventPubSeq.nextBully();
IOEvent<C> evt = ioEventQueue.get(cursor);
evt.context = context;
evt.operation = operation;
ioEventPubSeq.done(cursor);
LOG.debug().$("fired [fd=").$(context.getFd()).$(", op=").$(evt.operation).$(", pos=").$(cursor).$(']').$();
}
@Override
protected boolean runSerially() {
// todo: introduce fairness factor
......
......@@ -23,64 +23,28 @@
package com.questdb.network;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.mp.*;
import com.questdb.net.ChannelStatus;
import com.questdb.std.LongMatrix;
import com.questdb.std.Os;
import com.questdb.std.time.MillisecondClock;
import java.util.concurrent.atomic.AtomicInteger;
public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implements IODispatcher<C> {
private static final Log LOG = LogFactory.getLog(IODispatcherOsx.class);
public class IODispatcherOsx<C extends IOContext> extends AbstractIODispatcher<C> {
private static final int M_TIMESTAMP = 0;
private static final int M_FD = 1;
private final long serverFd;
private final RingQueue<IOEvent<C>> ioEventQueue;
private final Sequence ioEventPubSeq;
private final Sequence ioEventSubSeq;
private final RingQueue<IOEvent<C>> interestQueue;
private final MPSequence interestPubSeq;
private final SCSequence interestSubSeq;
private final MillisecondClock clock;
private final Kqueue kqueue;
private final long idleConnectionTimeout;
private final LongMatrix<C> pending = new LongMatrix<>(2);
private final int activeConnectionLimit;
private final int capacity;
private final IOContextFactory<C> ioContextFactory;
private final NetworkFacade nf;
private final int initialBias;
private final AtomicInteger connectionCount = new AtomicInteger();
public IODispatcherOsx(
IODispatcherConfiguration configuration,
IOContextFactory<C> ioContextFactory
) {
this.nf = configuration.getNetworkFacade();
this.ioEventQueue = new RingQueue<>(IOEvent::new, configuration.getIOQueueCapacity());
this.ioEventPubSeq = new SPSequence(configuration.getIOQueueCapacity());
this.ioEventSubSeq = new MCSequence(configuration.getIOQueueCapacity());
this.ioEventPubSeq.then(this.ioEventSubSeq).then(this.ioEventPubSeq);
this.interestQueue = new RingQueue<>(IOEvent::new, configuration.getInterestQueueCapacity());
this.interestPubSeq = new MPSequence(interestQueue.getCapacity());
this.interestSubSeq = new SCSequence();
this.interestPubSeq.then(this.interestSubSeq).then(this.interestPubSeq);
this.clock = configuration.getClock();
this.activeConnectionLimit = configuration.getActiveConnectionLimit();
this.idleConnectionTimeout = configuration.getIdleConnectionTimeout();
super(configuration, ioContextFactory);
this.capacity = configuration.getEventCapacity();
this.ioContextFactory = ioContextFactory;
this.initialBias = configuration.getInitialBias();
// bind socket
this.kqueue = new Kqueue(capacity);
this.serverFd = nf.socketTcp(false);
if (nf.bindTcp(this.serverFd, configuration.getBindIPv4Address(), configuration.getBindPort())) {
nf.listen(this.serverFd, configuration.getListenBacklog());
......@@ -99,73 +63,19 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
@Override
public void close() {
processDisconnects();
this.kqueue.close();
nf.close(serverFd, LOG);
int n = pending.size();
for (int i = 0; i < n; i++) {
disconnect(pending.get(i));
for (int i = 0, n = pending.size(); i < n; i++) {
doDisconnect(pending.get(i));
}
drainQueueAndDisconnect();
long cursor;
do {
cursor = ioEventSubSeq.next();
if (cursor > -1) {
disconnect(ioEventQueue.get(cursor).context);
ioEventSubSeq.done(cursor);
}
} while (cursor != -1);
interestSubSeq.consumeAll(interestQueue, this.disconnectContextRef);
ioEventSubSeq.consumeAll(ioEventQueue, this.disconnectContextRef);
LOG.info().$("closed").$();
}
@Override
public int getConnectionCount() {
return connectionCount.get();
}
@Override
public void registerChannel(C context, int operation) {
long cursor = interestPubSeq.nextBully();
IOEvent<C> evt = interestQueue.get(cursor);
evt.context = context;
evt.operation = operation;
LOG.debug().$("queuing [fd=").$(context.getFd()).$(", op=").$(operation).$(']').$();
interestPubSeq.done(cursor);
}
@Override
public boolean processIOQueue(IORequestProcessor<C> processor) {
long cursor = ioEventSubSeq.next();
while (cursor == -2) {
cursor = ioEventSubSeq.next();
}
if (cursor > -1) {
IOEvent<C> event = ioEventQueue.get(cursor);
C connectionContext = event.context;
final int operation = event.operation;
ioEventSubSeq.done(cursor);
processor.onRequest(operation, connectionContext, this);
return true;
}
return false;
}
@Override
public void disconnect(C context) {
final long fd = context.getFd();
LOG.info()
.$("disconnected [ip=").$ip(nf.getPeerIP(fd))
.$(", fd=").$(fd)
.$(']').$();
nf.close(fd, LOG);
ioContextFactory.done(context);
connectionCount.decrementAndGet();
}
private void accept() {
while (true) {
long fd = nf.accept(serverFd);
......@@ -209,20 +119,6 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
pending.set(r, ioContextFactory.newInstance(fd));
}
private void drainQueueAndDisconnect() {
long cursor;
do {
cursor = interestSubSeq.next();
if (cursor > -1) {
final long available = interestSubSeq.available();
while (cursor < available) {
disconnect(interestQueue.get(cursor++).context);
}
interestSubSeq.done(available - 1);
}
} while (cursor != -1);
}
private void enqueuePending(int watermark) {
int index = 0;
for (int i = watermark, sz = pending.size(), offset = 0; i < sz; i++, offset += KqueueAccessor.SIZEOF_KEVENT) {
......@@ -262,7 +158,7 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
private void processIdleConnections(long deadline) {
int count = 0;
for (int i = 0, n = pending.size(); i < n && pending.get(i, M_TIMESTAMP) < deadline; i++, count++) {
disconnect(pending.get(i));
doDisconnect(pending.get(i));
}
pending.zapTop(count);
}
......@@ -311,15 +207,6 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
return useful;
}
private void publishOperation(int operation, C context) {
long cursor = ioEventPubSeq.nextBully();
IOEvent<C> evt = ioEventQueue.get(cursor);
evt.context = context;
evt.operation = operation;
ioEventPubSeq.done(cursor);
LOG.debug().$("fired [fd=").$(context.getFd()).$(", op=").$(evt.operation).$(", pos=").$(cursor).$(']').$();
}
private void registerWithKQueue(int changeCount) {
if (kqueue.register(changeCount) != 0) {
throw NetworkError.instance(Os.errno()).put("could not register [changeCount=").put(changeCount).put(']');
......@@ -329,6 +216,7 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
@Override
protected boolean runSerially() {
processDisconnects();
boolean useful = false;
final int n = kqueue.poll();
int watermark = pending.size();
......
......@@ -25,16 +25,12 @@ package com.questdb.network;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.mp.*;
import com.questdb.std.LongIntHashMap;
import com.questdb.std.LongMatrix;
import com.questdb.std.Misc;
import com.questdb.std.Unsafe;
import com.questdb.std.time.MillisecondClock;
import java.util.concurrent.atomic.AtomicInteger;
public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob implements IODispatcher<C> {
public class IODispatcherWindows<C extends IOContext> extends AbstractIODispatcher<C> {
private static final int M_TIMESTAMP = 0;
private static final int M_FD = 1;
......@@ -43,55 +39,17 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
private final FDSet readFdSet;
private final FDSet writeFdSet;
private final long serverFd;
private final RingQueue<IOEvent<C>> interestQueue;
private final MPSequence interestPubSeq;
private final SCSequence interestSubSeq;
private final MillisecondClock clock;
private final long idleConnectionTimeout;
private final LongMatrix<C> pending = new LongMatrix<>(4);
private final int activeConnectionLimit;
private final LongIntHashMap fds = new LongIntHashMap();
private final IOContextFactory<C> ioContextFactory;
private final RingQueue<IOEvent<C>> ioEventQueue;
private final SPSequence ioEventPubSeq;
private final MCSequence ioEventSubSeq;
private final AtomicInteger connectionCount = new AtomicInteger();
private final NetworkFacade nf;
private final int initialBias;
private final SelectFacade sf;
private final RingQueue<IOEvent<C>> disconnectQueue;
private final MPSequence disconnectPubSeq;
private final SCSequence disconnectSubSeq;
private final QueueConsumer<IOEvent<C>> disconnectContextRef = this::disconnectContext;
public IODispatcherWindows(
IODispatcherConfiguration configuration,
IOContextFactory<C> ioContextFactory
) {
this.nf = configuration.getNetworkFacade();
super(configuration, ioContextFactory);
this.readFdSet = new FDSet(configuration.getEventCapacity());
this.writeFdSet = new FDSet(configuration.getEventCapacity());
this.ioEventQueue = new RingQueue<>(IOEvent::new, configuration.getIOQueueCapacity());
this.ioEventPubSeq = new SPSequence(configuration.getIOQueueCapacity());
this.ioEventSubSeq = new MCSequence(configuration.getIOQueueCapacity());
this.ioEventPubSeq.then(this.ioEventSubSeq).then(this.ioEventPubSeq);
this.interestQueue = new RingQueue<>(IOEvent::new, configuration.getInterestQueueCapacity());
this.interestPubSeq = new MPSequence(interestQueue.getCapacity());
this.interestSubSeq = new SCSequence();
this.interestPubSeq.then(this.interestSubSeq).then(this.interestPubSeq);
this.disconnectQueue = new RingQueue<>(IOEvent::new, configuration.getIOQueueCapacity());
this.disconnectPubSeq = new MPSequence(disconnectQueue.getCapacity());
this.disconnectSubSeq = new SCSequence();
this.disconnectPubSeq.then(this.disconnectSubSeq).then(disconnectPubSeq);
this.clock = configuration.getClock();
this.activeConnectionLimit = configuration.getActiveConnectionLimit();
this.idleConnectionTimeout = configuration.getIdleConnectionTimeout();
this.ioContextFactory = ioContextFactory;
this.initialBias = configuration.getInitialBias();
this.serverFd = nf.socketTcp(false);
this.sf = configuration.getSelectFacade();
if (nf.bindTcp(this.serverFd, configuration.getBindIPv4Address(), configuration.getBindPort())) {
nf.listen(this.serverFd, configuration.getListenBacklog());
......@@ -107,25 +65,6 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
}
}
private void doDisconnect(C context) {
final long fd = context.getFd();
LOG.info()
.$("disconnected [ip=").$ip(nf.getPeerIP(fd))
.$(", fd=").$(fd)
.$(']').$();
nf.close(fd, LOG);
ioContextFactory.done(context);
connectionCount.decrementAndGet();
}
private void processDisconnects() {
disconnectSubSeq.consumeAll(disconnectQueue, this.disconnectContextRef);
}
private void disconnectContext(IOEvent<C> event) {
doDisconnect(event.context);
}
@Override
public void close() {
......@@ -139,61 +78,11 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
Misc.free(pending.get(i));
}
drainQueueAndDisconnect();
long cursor;
do {
cursor = ioEventSubSeq.next();
if (cursor > -1) {
doDisconnect(ioEventQueue.get(cursor).context);
ioEventSubSeq.done(cursor);
}
} while (cursor != -1);
interestSubSeq.consumeAll(interestQueue, this.disconnectContextRef);
ioEventSubSeq.consumeAll(ioEventQueue, this.disconnectContextRef);
LOG.info().$("closed").$();
}
@Override
public int getConnectionCount() {
return connectionCount.get();
}
@Override
public void registerChannel(C context, int operation) {
long cursor = interestPubSeq.nextBully();
IOEvent<C> evt = interestQueue.get(cursor);
evt.context = context;
evt.operation = operation;
LOG.debug().$("queuing [fd=").$(context.getFd()).$(", op=").$(operation).$(']').$();
interestPubSeq.done(cursor);
}
@Override
public boolean processIOQueue(IORequestProcessor<C> processor) {
long cursor = ioEventSubSeq.next();
while (cursor == -2) {
cursor = ioEventSubSeq.next();
}
if (cursor > -1) {
IOEvent<C> event = ioEventQueue.get(cursor);
C connectionContext = event.context;
final int operation = event.operation;
ioEventSubSeq.done(cursor);
processor.onRequest(operation, connectionContext, this);
return true;
}
return false;
}
@Override
public void disconnect(C context) {
final long cursor = disconnectPubSeq.nextBully();
assert cursor > -1;
disconnectQueue.get(cursor).context = context;
disconnectPubSeq.done(cursor);
}
private void accept(long timestamp) {
while (true) {
long fd = nf.accept(serverFd);
......@@ -236,10 +125,6 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
pending.set(r, ioContextFactory.newInstance(fd));
}
private void drainQueueAndDisconnect() {
interestSubSeq.consumeAll(interestQueue, this.disconnectContextRef);
}
private boolean processRegistrations(long timestamp) {
long cursor;
boolean useful = false;
......@@ -283,15 +168,6 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
}
}
private void publishOperation(int operation, C context) {
long cursor = ioEventPubSeq.nextBully();
IOEvent<C> evt = ioEventQueue.get(cursor);
evt.context = context;
evt.operation = operation;
ioEventPubSeq.done(cursor);
LOG.debug().$("fired [fd=").$(context.getFd()).$(", op=").$(evt.operation).$(", pos=").$(cursor).$(']').$();
}
@Override
protected boolean runSerially() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册