提交 34f5fea3 编写于 作者: V Vlad Ilyushchenko

NET: OSX dispatcher implementation, not fully tested yet

上级 73b30584
......@@ -26,7 +26,6 @@ if (APPLE)
set(OUTPUT ${CMAKE_CURRENT_SOURCE_DIR}/src/main/resources/binaries/osx)
set(
SOURCE_FILES ${SOURCE_FILES}
src/main/c/osx/kqueue.h
src/main/c/osx/kqueue.c
src/main/c/share/net.c
src/main/c/osx/affinity.c
......
......@@ -25,7 +25,7 @@
#include <unistd.h>
#include "../share/net.h"
JNIEXPORT jint JNICALL Java_com_questdb_std_Net_abortAccept
JNIEXPORT jint JNICALL Java_com_questdb_network_Net_abortAccept
(JNIEnv *e, jclass cl, jlong fd) {
return close((int) fd);
}
......@@ -24,61 +24,60 @@
#include <sys/event.h>
#include <sys/time.h>
#include <stddef.h>
#include <sys/errno.h>
#include "kqueue.h"
#include "jni.h"
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getEvfiltRead
JNIEXPORT jshort JNICALL Java_com_questdb_network_KqueueAccessor_getEvfiltRead
(JNIEnv *e, jclass cl) {
return EVFILT_READ;
}
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getEvfiltWrite
JNIEXPORT jshort JNICALL Java_com_questdb_network_KqueueAccessor_getEvfiltWrite
(JNIEnv *e, jclass cl) {
return EVFILT_WRITE;
}
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getSizeofKevent
JNIEXPORT jshort JNICALL Java_com_questdb_network_KqueueAccessor_getSizeofKevent
(JNIEnv *e, jclass cl) {
return (short) sizeof(struct kevent);
}
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getFdOffset
JNIEXPORT jshort JNICALL Java_com_questdb_network_KqueueAccessor_getFdOffset
(JNIEnv *e, jclass cl) {
return (short) offsetof(struct kevent, ident);
}
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getFilterOffset
JNIEXPORT jshort JNICALL Java_com_questdb_network_KqueueAccessor_getFilterOffset
(JNIEnv *e, jclass cl) {
return (short) offsetof(struct kevent, filter);
}
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getDataOffset
JNIEXPORT jshort JNICALL Java_com_questdb_network_KqueueAccessor_getDataOffset
(JNIEnv *e, jclass cl) {
return (short) offsetof(struct kevent, udata);
}
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getFlagsOffset
JNIEXPORT jshort JNICALL Java_com_questdb_network_KqueueAccessor_getFlagsOffset
(JNIEnv *e, jclass cl) {
return (short) offsetof(struct kevent, flags);
}
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getEvAdd
JNIEXPORT jshort JNICALL Java_com_questdb_network_KqueueAccessor_getEvAdd
(JNIEnv *e, jclass cl) {
return EV_ADD;
}
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getEvOneshot
JNIEXPORT jshort JNICALL Java_com_questdb_network_KqueueAccessor_getEvOneshot
(JNIEnv *e, jclass cl) {
return EV_ONESHOT;
}
JNIEXPORT jint JNICALL Java_com_questdb_net_Kqueue_kqueue
JNIEXPORT jint JNICALL Java_com_questdb_network_KqueueAccessor_kqueue
(JNIEnv *e, jclass cl) {
return kqueue();
}
JNIEXPORT jint JNICALL Java_com_questdb_net_Kqueue_kevent
JNIEXPORT jint JNICALL Java_com_questdb_network_KqueueAccessor_kevent
(JNIEnv *e, jclass cl, jint kq, jlong changelist, jint nChanges, jlong eventlist, jint nEvents) {
struct timespec dontBlock = {0, 0};
return (jint) kevent(kq, (const struct kevent *) changelist, nChanges, (struct kevent *) eventlist, nEvents,
......
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class com_questdb_net_Kqueue */
#ifndef _Included_com_questdb_net_Kqueue
#define _Included_com_questdb_net_Kqueue
#ifdef __cplusplus
extern "C" {
#endif
#undef com_questdb_net_Kqueue_NUM_KEVENTS
#define com_questdb_net_Kqueue_NUM_KEVENTS 1024L
/*
* Class: com_questdb_net_Kqueue
* Method: kevent
* Signature: (IJIJI)I
*/
JNIEXPORT jint JNICALL Java_com_questdb_net_Kqueue_kevent
(JNIEnv *, jclass, jint, jlong, jint, jlong, jint);
/*
* Class: com_questdb_net_Kqueue
* Method: kqueue
* Signature: ()I
*/
JNIEXPORT jint JNICALL Java_com_questdb_net_Kqueue_kqueue
(JNIEnv *, jclass);
/*
* Class: com_questdb_net_Kqueue
* Method: getEvfiltRead
* Signature: ()S
*/
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getEvfiltRead
(JNIEnv *, jclass);
/*
* Class: com_questdb_net_Kqueue
* Method: getEvfiltWrite
* Signature: ()S
*/
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getEvfiltWrite
(JNIEnv *, jclass);
/*
* Class: com_questdb_net_Kqueue
* Method: getSizeofKevent
* Signature: ()S
*/
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getSizeofKevent
(JNIEnv *, jclass);
/*
* Class: com_questdb_net_Kqueue
* Method: getFdOffset
* Signature: ()S
*/
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getFdOffset
(JNIEnv *, jclass);
/*
* Class: com_questdb_net_Kqueue
* Method: getFilterOffset
* Signature: ()S
*/
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getFilterOffset
(JNIEnv *, jclass);
/*
* Class: com_questdb_net_Kqueue
* Method: getEvAdd
* Signature: ()S
*/
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getEvAdd
(JNIEnv *, jclass);
/*
* Class: com_questdb_net_Kqueue
* Method: getEvOneshot
* Signature: ()S
*/
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getEvOneshot
(JNIEnv *, jclass);
/*
* Class: com_questdb_net_Kqueue
* Method: getFlagsOffset
* Signature: ()S
*/
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getFlagsOffset
(JNIEnv *, jclass);
/*
* Class: com_questdb_net_Kqueue
* Method: getDataOffset
* Signature: ()S
*/
JNIEXPORT jshort JNICALL Java_com_questdb_net_Kqueue_getDataOffset
(JNIEnv *, jclass);
#ifdef __cplusplus
}
#endif
#endif
......@@ -149,7 +149,7 @@ JNIEXPORT jint JNICALL Java_com_questdb_network_Net_recv
JNIEXPORT jboolean JNICALL Java_com_questdb_network_Net_isDead
(JNIEnv *e, jclass cl, jlong fd) {
int c;
return (jboolean) (recv((int) fd, &c, 1, 0) == 0);
return (jboolean) (recv((int) fd, &c, 1, 0) < 1);
}
JNIEXPORT jint JNICALL Java_com_questdb_network_Net_configureNonBlocking
......@@ -160,7 +160,6 @@ JNIEXPORT jint JNICALL Java_com_questdb_network_Net_configureNonBlocking
return flags;
}
if ((flags = fcntl((int) fd, F_SETFL, flags | O_NONBLOCK)) < 0) {
return flags;
}
......
......@@ -26,6 +26,8 @@ package com.questdb.net;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.mp.*;
import com.questdb.network.Kqueue;
import com.questdb.network.KqueueAccessor;
import com.questdb.network.Net;
import com.questdb.network.NetworkError;
import com.questdb.std.LongMatrix;
......@@ -165,7 +167,7 @@ public class KQueueDispatcher<C extends Context> extends SynchronizedJob impleme
private void enqueuePending(int watermark) {
int index = 0;
for (int i = watermark, sz = pending.size(), offset = 0; i < sz; i++, offset += Kqueue.SIZEOF_KEVENT) {
for (int i = watermark, sz = pending.size(), offset = 0; i < sz; i++, offset += KqueueAccessor.SIZEOF_KEVENT) {
kqueue.setOffset(offset);
kqueue.readFD((int) pending.get(i, 1), pending.get(i, 0));
LOG.debug().$("kqueued ").$(pending.get(i, 1)).$(" as ").$(index - 1).$();
......@@ -216,7 +218,7 @@ public class KQueueDispatcher<C extends Context> extends SynchronizedJob impleme
int fd = (int) context.getFd();
LOG.debug().$("Registering ").$(fd).$(" status ").$(channelStatus).$();
kqueue.setOffset(offset);
offset += Kqueue.SIZEOF_KEVENT;
offset += KqueueAccessor.SIZEOF_KEVENT;
count++;
switch (channelStatus) {
case ChannelStatus.READ:
......@@ -265,7 +267,7 @@ public class KQueueDispatcher<C extends Context> extends SynchronizedJob impleme
// check all activated FDs
for (int i = 0; i < n; i++) {
kqueue.setOffset(offset);
offset += Kqueue.SIZEOF_KEVENT;
offset += KqueueAccessor.SIZEOF_KEVENT;
int fd = kqueue.getFd();
// this is server socket, accept if there aren't too many already
if (fd == socketFd) {
......@@ -284,13 +286,13 @@ public class KQueueDispatcher<C extends Context> extends SynchronizedJob impleme
continue;
}
if (kqueue.getFlags() == Kqueue.EV_EOF) {
if (kqueue.getFlags() == KqueueAccessor.EV_EOF) {
disconnect(pending.get(row), DisconnectReason.PEER);
} else {
long cursor = ioSequence.nextBully();
Event<C> evt = ioQueue.get(cursor);
evt.context = pending.get(row);
evt.channelStatus = kqueue.getFilter() == Kqueue.EVFILT_READ ? ChannelStatus.READ : ChannelStatus.WRITE;
evt.channelStatus = kqueue.getFilter() == KqueueAccessor.EVFILT_READ ? ChannelStatus.READ : ChannelStatus.WRITE;
ioSequence.done(cursor);
LOG.debug().$("Queuing ").$(kqueue.getFilter()).$(" on ").$(fd).$();
}
......
......@@ -88,12 +88,14 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
public void close() {
this.epoll.close();
nf.close(serverFd, LOG);
int n = pending.size();
for (int i = 0; i < n; i++) {
disconnect(pending.get(i), DisconnectReason.SILLY);
}
drainQueueAndDisconnect();
long cursor;
do {
cursor = ioEventSubSeq.next();
......
......@@ -28,9 +28,7 @@ import com.questdb.log.LogFactory;
import com.questdb.mp.*;
import com.questdb.net.ChannelStatus;
import com.questdb.net.DisconnectReason;
import com.questdb.net.Kqueue;
import com.questdb.std.LongMatrix;
import com.questdb.std.Misc;
import com.questdb.std.Os;
import com.questdb.std.time.MillisecondClock;
......@@ -55,6 +53,7 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
private final int capacity;
private final IOContextFactory<C> ioContextFactory;
private final NetworkFacade nf;
private final int initialBias;
private int connectionCount = 0;
public IODispatcherOsx(
......@@ -76,13 +75,18 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
this.timeout = configuration.getIdleConnectionTimeout();
this.capacity = configuration.getEventCapacity();
this.ioContextFactory = ioContextFactory;
this.initialBias = configuration.getInitialBias();
// bind socket
this.kqueue = new Kqueue(capacity);
this.serverFd = nf.socketTcp(false);
if (nf.bindTcp(this.serverFd, configuration.getBindIPv4Address(), configuration.getBindPort())) {
nf.listen(this.serverFd, configuration.getListenBacklog());
this.kqueue.listen(serverFd);
if (this.kqueue.listen(serverFd) != 0) {
throw NetworkError.instance(nf.errno(), "could not kqueue.listen()");
}
LOG.info()
.$("listening on ")
.$(configuration.getBindIPv4Address()).$(':').$(configuration.getBindPort())
......@@ -95,23 +99,24 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
@Override
public void close() {
this.kqueue.close();
if (nf.close(serverFd) != 0) {
LOG.error().$("failed to close socket [fd=").$(serverFd).$(", errno=").$(Os.errno()).$(']').$();
}
nf.close(serverFd, LOG);
int n = pending.size();
for (int i = 0; i < n; i++) {
Misc.free(pending.get(i));
disconnect(pending.get(i), DisconnectReason.SILLY);
}
long cursor = interestSubSeq.next();
if (cursor > -1) {
long available = interestSubSeq.available();
while (cursor < available) {
final IOEvent<C> evt = interestQueue.get(cursor);
disconnect(evt.context, DisconnectReason.SILLY);
cursor++;
drainQueueAndDisconnect();
long cursor;
do {
cursor = ioEventSubSeq.next();
if (cursor > -1) {
disconnect(ioEventQueue.get(cursor).context, DisconnectReason.SILLY);
ioEventSubSeq.done(cursor);
}
}
} while (cursor != -1);
LOG.info().$("closed").$();
}
@Override
......@@ -153,14 +158,17 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
}
if (nf.configureNonBlocking(fd) < 0) {
LOG.error().$("could not configure non-blocking [fd=").$(fd).$(", errno=").$(Os.errno()).$(']').$();
closeFd(fd);
LOG.error().$("could not configure non-blocking [fd=").$(fd).$(", errno=").$(nf.errno()).$(']').$();
nf.close(fd, LOG);
return;
}
if (connectionCount > activeConnectionLimit) {
LOG.info().$("connection limit exceeded [fd=").$(fd).$(']').$();
closeFd(fd);
if (connectionCount == activeConnectionLimit) {
LOG.info().$("connection limit exceeded [fd=").$(fd)
.$(", connectionCount=").$(connectionCount)
.$(", activeConnectionLimit=").$(activeConnectionLimit)
.$(']').$();
nf.close(fd, LOG);
return;
}
......@@ -170,21 +178,14 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
}
}
private void addPending(long _fd, long timestamp) {
private void addPending(long fd, long timestamp) {
// append to pending
// all rows below watermark will be registered with kqueue
int r = pending.addRow();
LOG.debug().$(" Matrix row ").$(r).$(" for ").$(_fd).$();
pending.set(r, 0, timestamp);
pending.set(r, 1, _fd);
pending.set(r, ioContextFactory.newInstance(_fd));
}
private void closeFd(long fd) {
if (nf.close(fd) != 0) {
LOG.error().$("could not close [fd=").$(fd).$(", errno=").$(nf.errno()).$(']').$();
}
LOG.debug().$("pending [row=").$(r).$(", fd=").$(fd).$(']').$();
pending.set(r, M_TIMESTAMP, timestamp);
pending.set(r, M_FD, fd);
pending.set(r, ioContextFactory.newInstance(fd));
}
private void disconnect(C context, int disconnectReason) {
......@@ -194,49 +195,44 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
.$(", fd=").$(fd)
.$(", reason=").$(DisconnectReason.nameOf(disconnectReason))
.$(']').$();
closeFd(fd);
nf.close(fd, LOG);
context.close();
connectionCount--;
}
private void drainQueueAndDisconnect() {
long cursor;
do {
cursor = interestSubSeq.next();
if (cursor > -1) {
final long available = interestSubSeq.available();
while (cursor < available) {
disconnect(interestQueue.get(cursor++).context, DisconnectReason.SILLY);
}
interestSubSeq.done(available - 1);
}
} while (cursor != -1);
}
private void enqueuePending(int watermark) {
int index = 0;
for (int i = watermark, sz = pending.size(), offset = 0; i < sz; i++, offset += Kqueue.SIZEOF_KEVENT) {
for (int i = watermark, sz = pending.size(), offset = 0; i < sz; i++, offset += KqueueAccessor.SIZEOF_KEVENT) {
kqueue.setOffset(offset);
kqueue.readFD((int) pending.get(i, M_FD), pending.get(i, M_TIMESTAMP));
LOG.debug().$("kqueued ").$(pending.get(i, M_FD)).$(" as ").$(index - 1).$();
if (initialBias == IODispatcherConfiguration.BIAS_READ) {
kqueue.readFD((int) pending.get(i, M_FD), pending.get(i, M_TIMESTAMP));
} else {
kqueue.writeFD((int) pending.get(i, M_FD), pending.get(i, M_TIMESTAMP));
}
if (++index > capacity - 1) {
kqueue.register(index);
registerWithKQueue(index);
index = 0;
}
}
if (index > 0) {
kqueue.register(index);
LOG.debug().$("Registered ").$(index).$();
}
}
private int findPending(int fd, long ts) {
int r = pending.binarySearch(ts);
if (r < 0) {
return r;
}
if (pending.get(r, M_FD) == fd) {
return r;
} else {
return scanRow(r + 1, fd, ts);
registerWithKQueue(index);
}
}
private void processIdleConnections(long deadline) {
int count = 0;
for (int i = 0, n = pending.size(); i < n && pending.get(i, M_TIMESTAMP) < deadline; i++, count++) {
disconnect(pending.get(i), DisconnectReason.IDLE);
}
pending.zapTop(count);
}
private boolean processRegistrations(long timestamp) {
long cursor;
boolean useful = false;
......@@ -252,13 +248,15 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
int fd = (int) context.getFd();
LOG.debug().$("Registering ").$(fd).$(" status ").$(operation).$();
kqueue.setOffset(offset);
offset += Kqueue.SIZEOF_KEVENT;
count++;
switch (operation) {
case IOOperation.READ:
offset += KqueueAccessor.SIZEOF_KEVENT;
count++;
kqueue.readFD(fd, timestamp);
break;
case IOOperation.WRITE:
offset += KqueueAccessor.SIZEOF_KEVENT;
count++;
kqueue.writeFD(fd, timestamp);
break;
case IOOperation.DISCONNECT:
......@@ -278,25 +276,52 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
if (count > capacity - 1) {
kqueue.register(count);
registerWithKQueue(count);
count = 0;
}
}
if (count > 0) {
kqueue.register(count);
registerWithKQueue(count);
}
return useful;
}
private int findPending(int fd, long ts) {
int r = pending.binarySearch(ts);
if (r < 0) {
return r;
}
if (pending.get(r, M_FD) == fd) {
return r;
} else {
return scanRow(r + 1, fd, ts);
}
}
private void processIdleConnections(long deadline) {
int count = 0;
for (int i = 0, n = pending.size(); i < n && pending.get(i, M_TIMESTAMP) < deadline; i++, count++) {
disconnect(pending.get(i), DisconnectReason.IDLE);
}
pending.zapTop(count);
}
private void publishOperation(int operation, C context) {
long cursor = ioEventPubSeq.nextBully();
IOEvent<C> evt = ioEventQueue.get(cursor);
evt.context = context;
evt.operation = operation;
ioEventPubSeq.done(cursor);
LOG.debug().$("fired [fd=").$(context.getFd()).$(", op=").$(evt.operation).$(']').$();
LOG.debug().$("fired [fd=").$(context.getFd()).$(", op=").$(evt.operation).$(", pos=").$(cursor).$(']').$();
}
private void registerWithKQueue(int eventCount) {
if (kqueue.register(eventCount) != 0) {
throw NetworkError.instance(Os.errno()).put("could not register [eventCount=").put(eventCount).put(']');
}
}
@Override
......@@ -309,7 +334,7 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
// check all activated FDs
for (int i = 0; i < n; i++) {
kqueue.setOffset(offset);
offset += Kqueue.SIZEOF_KEVENT;
offset += KqueueAccessor.SIZEOF_KEVENT;
int fd = kqueue.getFd();
// this is server socket, accept if there aren't too many already
if (fd == serverFd) {
......@@ -324,12 +349,8 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
continue;
}
if (kqueue.getFlags() == Kqueue.EV_EOF) {
disconnect(pending.get(row), DisconnectReason.PEER);
} else {
publishOperation(kqueue.getFilter() == Kqueue.EVFILT_READ ? ChannelStatus.READ : ChannelStatus.WRITE, pending.get(row));
LOG.debug().$("Queuing ").$(kqueue.getFilter()).$(" on ").$(fd).$();
}
publishOperation(kqueue.getFilter() == KqueueAccessor.EVFILT_READ ? ChannelStatus.READ : ChannelStatus.WRITE, pending.get(row));
LOG.debug().$("Queuing ").$(kqueue.getFilter()).$(" on ").$(fd).$();
pending.deleteRow(row);
watermark--;
}
......
......@@ -21,84 +21,79 @@
*
******************************************************************************/
package com.questdb.net;
package com.questdb.network;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.network.Net;
import com.questdb.std.Files;
import com.questdb.std.Unsafe;
import java.io.Closeable;
public final class Kqueue implements Closeable {
public static final short EVFILT_READ;
public static final short SIZEOF_KEVENT;
public static final int EV_EOF = -32751;
private static final short EVFILT_WRITE;
private static final Log LOG = LogFactory.getLog(Kqueue.class);
private static final short FD_OFFSET;
private static final short FILTER_OFFSET;
private static final short FLAGS_OFFSET;
private static final short DATA_OFFSET;
private static final short EV_ADD;
private static final short EV_ONESHOT;
private final long eventList;
private final int kq;
private final int capacity;
private final KqueueFacade kqf;
private long _rPtr;
public Kqueue(int capacity) {
this.capacity = capacity;
this.eventList = this._rPtr = Unsafe.malloc(SIZEOF_KEVENT * (long) capacity);
this.kq = kqueue();
this(KqueueFacadeImpl.INSTANCE, capacity);
}
public static native int kevent(int kq, long changeList, int nChanges, long eventList, int nEvents);
public Kqueue(KqueueFacade kqf, int capacity) {
this.kqf = kqf;
this.capacity = capacity;
this.eventList = this._rPtr = Unsafe.malloc(KqueueAccessor.SIZEOF_KEVENT * (long) capacity);
this.kq = kqf.kqueue();
if (this.kq != -1) {
Files.bumpFileCount();
}
}
@Override
public void close() {
if (Net.close(this.kq) < 0) {
LOG.error().$("Cannot close kqueue ").$(this.kq).$();
}
Unsafe.free(this.eventList, SIZEOF_KEVENT * (long) capacity);
kqf.getNetworkFacade().close(kq, LOG);
Unsafe.free(this.eventList, KqueueAccessor.SIZEOF_KEVENT * (long) capacity);
}
public long getData() {
return Unsafe.getUnsafe().getLong(_rPtr + DATA_OFFSET);
return Unsafe.getUnsafe().getLong(_rPtr + KqueueAccessor.DATA_OFFSET);
}
public int getFd() {
return (int) Unsafe.getUnsafe().getLong(_rPtr + FD_OFFSET);
return (int) Unsafe.getUnsafe().getLong(_rPtr + KqueueAccessor.FD_OFFSET);
}
public int getFilter() {
return Unsafe.getUnsafe().getShort(_rPtr + FILTER_OFFSET);
return Unsafe.getUnsafe().getShort(_rPtr + KqueueAccessor.FILTER_OFFSET);
}
public int getFlags() {
return Unsafe.getUnsafe().getShort(_rPtr + FLAGS_OFFSET);
return Unsafe.getUnsafe().getShort(_rPtr + KqueueAccessor.FLAGS_OFFSET);
}
public void listen(long sfd) {
public int listen(long sfd) {
_rPtr = eventList;
commonFd(sfd, 0);
Unsafe.getUnsafe().putShort(_rPtr + FILTER_OFFSET, EVFILT_READ);
Unsafe.getUnsafe().putShort(_rPtr + FLAGS_OFFSET, EV_ADD);
register(1);
Unsafe.getUnsafe().putShort(_rPtr + KqueueAccessor.FILTER_OFFSET, KqueueAccessor.EVFILT_READ);
Unsafe.getUnsafe().putShort(_rPtr + KqueueAccessor.FLAGS_OFFSET, KqueueAccessor.EV_ADD);
return register(1);
}
public int poll() {
return kevent(kq, 0, 0, eventList, capacity);
return kqf.kevent(kq, 0, 0, eventList, capacity);
}
public void readFD(int fd, long data) {
commonFd(fd, data);
Unsafe.getUnsafe().putShort(_rPtr + FILTER_OFFSET, EVFILT_READ);
Unsafe.getUnsafe().putShort(_rPtr + FLAGS_OFFSET, (short) (EV_ADD | EV_ONESHOT));
Unsafe.getUnsafe().putShort(_rPtr + KqueueAccessor.FILTER_OFFSET, KqueueAccessor.EVFILT_READ);
Unsafe.getUnsafe().putShort(_rPtr + KqueueAccessor.FLAGS_OFFSET, (short) (KqueueAccessor.EV_ADD | KqueueAccessor.EV_ONESHOT));
}
public void register(int n) {
kevent(kq, eventList, n, 0, 0);
public int register(int n) {
return kqf.kevent(kq, eventList, n, 0, 0);
}
public void setOffset(int offset) {
......@@ -107,45 +102,12 @@ public final class Kqueue implements Closeable {
public void writeFD(int fd, long data) {
commonFd(fd, data);
Unsafe.getUnsafe().putShort(_rPtr + FILTER_OFFSET, EVFILT_WRITE);
Unsafe.getUnsafe().putShort(_rPtr + FLAGS_OFFSET, (short) (EV_ADD | EV_ONESHOT));
Unsafe.getUnsafe().putShort(_rPtr + KqueueAccessor.FILTER_OFFSET, KqueueAccessor.EVFILT_WRITE);
Unsafe.getUnsafe().putShort(_rPtr + KqueueAccessor.FLAGS_OFFSET, (short) (KqueueAccessor.EV_ADD | KqueueAccessor.EV_ONESHOT));
}
private static native int kqueue();
private static native short getEvfiltRead();
private static native short getEvfiltWrite();
private static native short getSizeofKevent();
private static native short getFdOffset();
private static native short getFilterOffset();
private static native short getEvAdd();
private static native short getEvOneshot();
private static native short getFlagsOffset();
private static native short getDataOffset();
private void commonFd(long fd, long data) {
Unsafe.getUnsafe().putLong(_rPtr + FD_OFFSET, fd);
Unsafe.getUnsafe().putLong(_rPtr + DATA_OFFSET, data);
}
static {
EVFILT_READ = getEvfiltRead();
EVFILT_WRITE = getEvfiltWrite();
SIZEOF_KEVENT = getSizeofKevent();
FD_OFFSET = getFdOffset();
FILTER_OFFSET = getFilterOffset();
FLAGS_OFFSET = getFlagsOffset();
DATA_OFFSET = getDataOffset();
EV_ADD = getEvAdd();
EV_ONESHOT = getEvOneshot();
Unsafe.getUnsafe().putLong(_rPtr + KqueueAccessor.FD_OFFSET, fd);
Unsafe.getUnsafe().putLong(_rPtr + KqueueAccessor.DATA_OFFSET, data);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.network;
public class KqueueAccessor {
public static final short EVFILT_READ;
public static final short SIZEOF_KEVENT;
public static final int EV_EOF = -32751;
static final short EV_ONESHOT;
static final short EVFILT_WRITE;
static final short FD_OFFSET;
static final short FILTER_OFFSET;
static final short FLAGS_OFFSET;
static final short DATA_OFFSET;
static final short EV_ADD;
static native int kevent(int kq, long changeList, int nChanges, long eventList, int nEvents);
static native int kqueue();
static native short getEvfiltRead();
static native short getEvfiltWrite();
static native short getSizeofKevent();
static native short getFdOffset();
static native short getFilterOffset();
static native short getEvAdd();
static native short getEvOneshot();
static native short getFlagsOffset();
static native short getDataOffset();
static {
EVFILT_READ = getEvfiltRead();
EVFILT_WRITE = getEvfiltWrite();
SIZEOF_KEVENT = getSizeofKevent();
FD_OFFSET = getFdOffset();
FILTER_OFFSET = getFilterOffset();
FLAGS_OFFSET = getFlagsOffset();
DATA_OFFSET = getDataOffset();
EV_ADD = getEvAdd();
EV_ONESHOT = getEvOneshot();
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.network;
public interface KqueueFacade {
NetworkFacade getNetworkFacade();
int kevent(int kq, long changeList, int nChanges, long eventList, int nEvents);
int kqueue();
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.network;
public class KqueueFacadeImpl implements KqueueFacade {
public static final KqueueFacade INSTANCE = new KqueueFacadeImpl();
@Override
public NetworkFacade getNetworkFacade() {
return NetworkFacadeImpl.INSTANCE;
}
@Override
public int kevent(int kq, long changeList, int nChanges, long eventList, int nEvents) {
return KqueueAccessor.kevent(kq, changeList, nChanges, eventList, nEvents);
}
@Override
public int kqueue() {
return KqueueAccessor.kqueue();
}
}
......@@ -72,6 +72,11 @@ public class NetworkError extends Error implements Sinkable {
return this;
}
public NetworkError put(int value) {
message.put(value);
return this;
}
public NetworkError put(Throwable err) {
message.put(err);
return this;
......@@ -84,6 +89,6 @@ public class NetworkError extends Error implements Sinkable {
@Override
public void toSink(CharSink sink) {
sink.put(message);
sink.put("[errno=").put(errno).put("] ").put(message);
}
}
......@@ -25,6 +25,9 @@ package com.questdb.cutlass.http;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.mp.MPSequence;
import com.questdb.mp.RingQueue;
import com.questdb.mp.SCSequence;
import com.questdb.mp.SOCountDownLatch;
import com.questdb.network.*;
import com.questdb.std.ObjList;
......@@ -32,8 +35,10 @@ import com.questdb.std.Unsafe;
import com.questdb.std.str.StringSink;
import com.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -45,7 +50,6 @@ public class IODispatcherTest {
LOG.info().$("started testBiasWrite").$();
TestUtils.assertMemoryLeak(() -> {
SOCountDownLatch connectLatch = new SOCountDownLatch(1);
......@@ -267,7 +271,6 @@ public class IODispatcherTest {
}
}
// Thread.sleep(3000);
Assert.assertFalse(configuration.getActiveConnectionLimit() < dispatcher.getConnectionCount());
serverRunning.set(false);
serverHaltLatch.await();
......@@ -521,8 +524,6 @@ public class IODispatcherTest {
// do not close client side before server does theirs
Assert.assertTrue(Net.isDead(fd));
Assert.assertEquals(0, Net.close(fd));
LOG.info().$("closed [fd=").$(fd).$(']').$();
TestUtils.assertEquals("", sink);
} finally {
......@@ -530,6 +531,7 @@ public class IODispatcherTest {
}
} finally {
Net.close(fd);
LOG.info().$("closed [fd=").$(fd).$(']').$();
}
Assert.assertEquals(1, closeCount.get());
......@@ -537,6 +539,164 @@ public class IODispatcherTest {
});
}
@Test
@Ignore
// this test is ignore for the time being because it is unstable on OSX and I
// have not figured out the reason yet. I would like to see if this test
// runs any different on Linux, just to narrow the problem down to either
// dispatcher or Http parser.
public void testTwoThreadsSendTwoThreadsRead() throws Exception {
LOG.info().$("started testSendHttpGet").$();
final String request = "GET /status?x=1&a=%26b&c&d=x HTTP/1.1\r\n" +
"Host: localhost:9000\r\n" +
"Connection: keep-alive\r\n" +
"Cache-Control: max-age=0\r\n" +
"Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\r\n" +
"User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.48 Safari/537.36\r\n" +
"Accept-Encoding: gzip,deflate,sdch\r\n" +
"Accept-Language: en-US,en;q=0.8\r\n" +
"Cookie: textwrapon=false; textautoformat=false; wysiwyg=textarea\r\n" +
"\r\n";
// the difference between request and expected is url encoding (and ':' padding, which can easily be fixed)
final String expected = "GET /status?x=1&a=&b&c&d=x HTTP/1.1\r\n" +
"Host:localhost:9000\r\n" +
"Connection:keep-alive\r\n" +
"Cache-Control:max-age=0\r\n" +
"Accept:text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\r\n" +
"User-Agent:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.48 Safari/537.36\r\n" +
"Accept-Encoding:gzip,deflate,sdch\r\n" +
"Accept-Language:en-US,en;q=0.8\r\n" +
"Cookie:textwrapon=false; textautoformat=false; wysiwyg=textarea\r\n" +
"\r\n";
int N = 1000;
TestUtils.assertMemoryLeak(() -> {
HttpServerConfiguration httpServerConfiguration = new DefaultHttpServerConfiguration();
final NetworkFacade nf = NetworkFacadeImpl.INSTANCE;
final AtomicInteger requestsReceived = new AtomicInteger();
try (IODispatcher<HttpConnectionContext> dispatcher = IODispatchers.create(
new DefaultIODispatcherConfiguration(),
fd -> new HttpConnectionContext(httpServerConfiguration, fd)
)) {
// server will publish status of each request to this queue
final RingQueue<Status> queue = new RingQueue<>(Status::new, 1024);
final MPSequence pubSeq = new MPSequence(queue.getCapacity());
SCSequence subSeq = new SCSequence();
pubSeq.then(subSeq).then(pubSeq);
AtomicBoolean serverRunning = new AtomicBoolean(true);
int serverThreadCount = 1;
CountDownLatch serverHaltLatch = new CountDownLatch(serverThreadCount);
for (int j = 0; j < serverThreadCount; j++) {
new Thread(() -> {
final StringSink sink = new StringSink();
final long responseBuf = Unsafe.malloc(32);
Unsafe.getUnsafe().putByte(responseBuf, (byte) 'A');
HttpRequestProcessorSelector selector = url -> (context, dispatcher1) -> {
HttpHeaders headers = context.getHeaders();
sink.clear();
sink.put(headers.getMethodLine());
sink.put("\r\n");
ObjList<CharSequence> headerNames = headers.getHeaderNames();
for (int i = 0, n = headerNames.size(); i < n; i++) {
sink.put(headerNames.getQuick(i)).put(':');
sink.put(headers.getHeader(headerNames.getQuick(i)));
sink.put("\r\n");
}
sink.put("\r\n");
boolean result;
try {
TestUtils.assertEquals(expected, sink);
result = true;
} catch (Exception e) {
result = false;
}
while (true) {
long cursor = pubSeq.next();
if (cursor < 0) {
continue;
}
queue.get(cursor).valid = result;
pubSeq.done(cursor);
break;
}
requestsReceived.incrementAndGet();
nf.send(context.getFd(), responseBuf, 1);
dispatcher1.registerChannel(context, IOOperation.READ);
};
while (serverRunning.get()) {
dispatcher.run();
dispatcher.processIOQueue(
(operation, context, disp) -> context.handleClientOperation(operation, nf, disp, selector)
);
}
Unsafe.free(responseBuf, 32);
serverHaltLatch.countDown();
}).start();
}
new Thread(() -> {
for (int i = 0; i < N; i++) {
long fd = Net.socketTcp(true);
try {
long sockAddr = Net.sockaddr("127.0.0.1", 9001);
try {
Assert.assertTrue(fd > -1);
Assert.assertEquals(0, Net.connect(fd, sockAddr));
int len = request.length();
long buffer = TestUtils.toMemory(request);
try {
Assert.assertEquals(len, Net.send(fd, buffer, len));
Assert.assertEquals("fd=" + fd + ", i=" + i, 1, Net.recv(fd, buffer, 1));
Assert.assertEquals('A', Unsafe.getUnsafe().getByte(buffer));
} finally {
Unsafe.free(buffer, len);
}
} finally {
Net.freeSockAddr(sockAddr);
}
} finally {
Net.close(fd);
}
}
}).start();
int receiveCount = 0;
while (receiveCount < N) {
long cursor = subSeq.next();
if (cursor < 0) {
continue;
}
boolean valid = queue.get(cursor).valid;
subSeq.done(cursor);
Assert.assertTrue(valid);
receiveCount++;
}
serverRunning.set(false);
serverHaltLatch.await();
}
Assert.assertEquals(N, requestsReceived.get());
});
}
private static class HelloContext implements IOContext {
private final long fd;
private final long buffer = Unsafe.malloc(1024);
......@@ -558,4 +718,8 @@ public class IODispatcherTest {
return fd;
}
}
class Status {
boolean valid;
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册