提交 709adef8 编写于 作者: V Vlad Ilyushchenko

NET: provisional implementation of JsonQueryProcessor - this processor returns...

NET: provisional implementation of JsonQueryProcessor - this processor returns query results as json. It is not fully tested yet, but basics are working.
上级 9dc2f3da
...@@ -163,7 +163,11 @@ JNIEXPORT jboolean JNICALL Java_com_questdb_network_Net_bindUdp ...@@ -163,7 +163,11 @@ JNIEXPORT jboolean JNICALL Java_com_questdb_network_Net_bindUdp
JNIEXPORT jint JNICALL Java_com_questdb_network_Net_connect JNIEXPORT jint JNICALL Java_com_questdb_network_Net_connect
(JNIEnv *e, jclass cl, jlong fd, jlong sockAddr) { (JNIEnv *e, jclass cl, jlong fd, jlong sockAddr) {
return connect((SOCKET) fd, (const struct sockaddr *) sockAddr, sizeof(struct sockaddr)); jint res = connect((SOCKET) fd, (const struct sockaddr *) sockAddr, sizeof(struct sockaddr));
if (res < 0) {
SaveLastError();
}
return res;
} }
JNIEXPORT void JNICALL Java_com_questdb_network_Net_listen JNIEXPORT void JNICALL Java_com_questdb_network_Net_listen
...@@ -173,13 +177,22 @@ JNIEXPORT void JNICALL Java_com_questdb_network_Net_listen ...@@ -173,13 +177,22 @@ JNIEXPORT void JNICALL Java_com_questdb_network_Net_listen
JNIEXPORT jlong JNICALL Java_com_questdb_network_Net_accept0 JNIEXPORT jlong JNICALL Java_com_questdb_network_Net_accept0
(JNIEnv *e, jclass cl, jlong fd) { (JNIEnv *e, jclass cl, jlong fd) {
return (jlong) accept((SOCKET) fd, NULL, 0); // cast to jlong makes variable signed, otherwise < 0 comparison does not work
jlong sock = (jlong) accept((SOCKET) fd, NULL, 0);
if (sock < 0) {
SaveLastError();
}
return sock;
} }
JNIEXPORT jint JNICALL Java_com_questdb_network_Net_configureNonBlocking JNIEXPORT jint JNICALL Java_com_questdb_network_Net_configureNonBlocking
(JNIEnv *e, jclass cl, jlong fd) { (JNIEnv *e, jclass cl, jlong fd) {
u_long mode = 1; u_long mode = 1;
return ioctlsocket((SOCKET) fd, FIONBIO, &mode); jint res = ioctlsocket((SOCKET) fd, FIONBIO, &mode);
if (res < 0) {
SaveLastError();
}
return res;
} }
JNIEXPORT jint JNICALL Java_com_questdb_network_Net_recv JNIEXPORT jint JNICALL Java_com_questdb_network_Net_recv
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http;
import com.questdb.std.str.CharSink;
public interface HttpChunkedResponseSocket extends CharSink {
void bookmark();
void done() throws PeerDisconnectedException, PeerIsSlowToReadException;
CharSink headers();
boolean resetToBookmark();
void sendChunk() throws PeerDisconnectedException, PeerIsSlowToReadException;
void sendHeader() throws PeerDisconnectedException, PeerIsSlowToReadException;
void status(int status, CharSequence contentType);
}
...@@ -101,8 +101,8 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable { ...@@ -101,8 +101,8 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
return fd; return fd;
} }
public HttpResponseSink.ChunkedResponseImpl getChunkedResponseSocket() { public HttpChunkedResponseSocket getChunkedResponseSocket() {
return responseSink.getChunkedResponse(); return responseSink.getChunkedSocket();
} }
@Override @Override
......
...@@ -23,8 +23,13 @@ ...@@ -23,8 +23,13 @@
package com.questdb.cutlass.http; package com.questdb.cutlass.http;
public interface HttpRequestProcessorSelector { import java.io.Closeable;
public interface HttpRequestProcessorSelector extends Closeable {
HttpRequestProcessor select(CharSequence url); HttpRequestProcessor select(CharSequence url);
HttpRequestProcessor getDefaultProcessor(); HttpRequestProcessor getDefaultProcessor();
@Override
void close();
} }
...@@ -77,7 +77,7 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -77,7 +77,7 @@ public class HttpResponseSink implements Closeable, Mutable {
private long total = 0; private long total = 0;
private boolean header = true; private boolean header = true;
public ChunkedResponseImpl getChunkedResponse() { public HttpChunkedResponseSocket getChunkedSocket() {
return chunkedResponse; return chunkedResponse;
} }
...@@ -549,20 +549,23 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -549,20 +549,23 @@ public class HttpResponseSink implements Closeable, Mutable {
} }
} }
public class ChunkedResponseImpl extends ResponseSinkImpl { private class ChunkedResponseImpl extends ResponseSinkImpl implements HttpChunkedResponseSocket {
private long bookmark = outPtr; private long bookmark = outPtr;
@Override
public void bookmark() { public void bookmark() {
bookmark = _wPtr; bookmark = _wPtr;
} }
@Override
public void done() throws PeerDisconnectedException, PeerIsSlowToReadException { public void done() throws PeerDisconnectedException, PeerIsSlowToReadException {
flushBufSize = 0; flushBufSize = 0;
if (compressed) { if (compressed) {
resumeSend(FLUSH); resumeSend(FLUSH);
} else { } else {
resumeSend(END_CHUNK); resumeSend(END_CHUNK);
LOG.debug().$("end chunk sent").$();
} }
} }
...@@ -571,15 +574,18 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -571,15 +574,18 @@ public class HttpResponseSink implements Closeable, Mutable {
// sendChunk(); // sendChunk();
// } // }
@Override
public CharSink headers() { public CharSink headers() {
return headerImpl; return headerImpl;
} }
@Override
public boolean resetToBookmark() { public boolean resetToBookmark() {
_wPtr = bookmark; _wPtr = bookmark;
return bookmark != outPtr; return bookmark != outPtr;
} }
@Override
public void sendChunk() throws PeerDisconnectedException, PeerIsSlowToReadException { public void sendChunk() throws PeerDisconnectedException, PeerIsSlowToReadException {
if (outPtr != _wPtr) { if (outPtr != _wPtr) {
if (compressed) { if (compressed) {
...@@ -592,6 +598,7 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -592,6 +598,7 @@ public class HttpResponseSink implements Closeable, Mutable {
} }
} }
@Override
public void sendHeader() throws PeerDisconnectedException, PeerIsSlowToReadException { public void sendHeader() throws PeerDisconnectedException, PeerIsSlowToReadException {
prepareHeaderSink(); prepareHeaderSink();
flushSingle(); flushSingle();
......
...@@ -125,7 +125,7 @@ public class HttpServer implements Closeable { ...@@ -125,7 +125,7 @@ public class HttpServer implements Closeable {
); );
for (int i = 0; i < workerCount; i++) { for (int i = 0; i < workerCount; i++) {
ObjHashSet<Job> jobs = new ObjHashSet<>(); final ObjHashSet<Job> jobs = new ObjHashSet<>();
final int index = i; final int index = i;
jobs.add(dispatcher); jobs.add(dispatcher);
jobs.add(new Job() { jobs.add(new Job() {
...@@ -145,7 +145,10 @@ public class HttpServer implements Closeable { ...@@ -145,7 +145,10 @@ public class HttpServer implements Closeable {
workerHaltLatch, workerHaltLatch,
-1, -1,
LOG, LOG,
configuration.getConnectionPoolInitialSize() configuration.getConnectionPoolInitialSize(),
// have each thread release their own processor selectors
// in case processors stash some of their resources in thread-local variables
() -> Misc.free(selectors.getQuick(index))
); );
worker.setName("questdb-http-" + i); worker.setName("questdb-http-" + i);
workers.add(worker); workers.add(worker);
...@@ -165,9 +168,10 @@ public class HttpServer implements Closeable { ...@@ -165,9 +168,10 @@ public class HttpServer implements Closeable {
SOCountDownLatch haltLatch, SOCountDownLatch haltLatch,
int affinity, int affinity,
Log log, Log log,
int contextPoolSize int contextPoolSize,
Runnable cleaner
) { ) {
super(jobs, haltLatch, affinity, log); super(jobs, haltLatch, affinity, log, cleaner);
this.contextPool = new WeakObjectPool<>(() -> new HttpConnectionContext(configuration), contextPoolSize); this.contextPool = new WeakObjectPool<>(() -> new HttpConnectionContext(configuration), contextPoolSize);
} }
...@@ -191,6 +195,15 @@ public class HttpServer implements Closeable { ...@@ -191,6 +195,15 @@ public class HttpServer implements Closeable {
public HttpRequestProcessor getDefaultProcessor() { public HttpRequestProcessor getDefaultProcessor() {
return defaultRequestProcessor; return defaultRequestProcessor;
} }
@Override
public void close() {
Misc.free(defaultRequestProcessor);
ObjList<CharSequence> processorKeys = processorMap.keys();
for (int i = 0, n = processorKeys.size(); i < n; i++) {
Misc.free(processorMap.get(processorKeys.getQuick(i)));
}
}
} }
private class HttpContextFactory implements IOContextFactory<HttpConnectionContext> { private class HttpContextFactory implements IOContextFactory<HttpConnectionContext> {
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http.processors;
import com.questdb.cairo.sql.Record;
import com.questdb.cairo.sql.RecordCursor;
import com.questdb.cairo.sql.RecordCursorFactory;
import com.questdb.cairo.sql.RecordMetadata;
import com.questdb.ql.ChannelCheckCancellationHandler;
import com.questdb.std.Misc;
import com.questdb.std.Mutable;
import java.io.Closeable;
public abstract class AbstractQueryContext implements Mutable, Closeable {
final ChannelCheckCancellationHandler cancellationHandler;
final long fd;
RecordCursorFactory recordCursorFactory;
CharSequence query;
RecordMetadata metadata;
RecordCursor cursor;
long count;
long skip;
long stop;
Record record;
int queryState = JsonQueryProcessor.QUERY_PREFIX;
int columnIndex;
public AbstractQueryContext(long fd, int cyclesBeforeCancel) {
this.cancellationHandler = new ChannelCheckCancellationHandler(fd, cyclesBeforeCancel);
this.fd = fd;
}
@Override
public void clear() {
metadata = null;
cursor = Misc.free(cursor);
record = null;
if (recordCursorFactory != null) {
// todo: avoid toString()
JsonQueryProcessor.FACTORY_CACHE.get().put(query.toString(), recordCursorFactory);
recordCursorFactory = null;
}
query = null;
queryState = JsonQueryProcessor.QUERY_PREFIX;
columnIndex = 0;
}
@Override
public void close() {
cursor = Misc.free(cursor);
recordCursorFactory = Misc.free(recordCursorFactory);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http.processors;
import com.questdb.cairo.CairoError;
import com.questdb.cairo.CairoException;
import com.questdb.cairo.ColumnType;
import com.questdb.cairo.sql.CairoEngine;
import com.questdb.cairo.sql.Record;
import com.questdb.cairo.sql.RecordCursorFactory;
import com.questdb.cutlass.http.*;
import com.questdb.griffin.SqlCompiler;
import com.questdb.griffin.SqlException;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.log.LogRecord;
import com.questdb.network.IODispatcher;
import com.questdb.network.IOOperation;
import com.questdb.std.*;
import com.questdb.std.str.CharSink;
import java.io.Closeable;
import java.lang.ThreadLocal;
import java.util.concurrent.atomic.AtomicLong;
public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
static final int QUERY_DATA_SUFFIX = 7;
static final int QUERY_RECORD_SUFFIX = 6;
static final int QUERY_RECORD_COLUMNS = 5;
static final int QUERY_RECORD_START = 4;
static final int QUERY_META_SUFFIX = 3;
static final int QUERY_METADATA = 2;
static final int QUERY_PREFIX = 1;
// Factory cache is thread local due to possibility of factory being
// close by another thread. Peer disconnect is a typical example of this.
// Being asynchronous we may need to be able to return factory to the cache
// by the same thread that executes the dispatcher.
static final ThreadLocal<AssociativeCache<RecordCursorFactory>> FACTORY_CACHE = ThreadLocal.withInitial(() -> new AssociativeCache<>(8, 8));
private static final LocalValue<JsonQueryProcessorState> LV = new LocalValue<>();
private static final Log LOG = LogFactory.getLog(JsonQueryProcessor.class);
private final AtomicLong cacheHits = new AtomicLong();
private final AtomicLong cacheMisses = new AtomicLong();
private final SqlCompiler compiler;
public JsonQueryProcessor(CairoEngine engine) {
// todo: add scheduler
this.compiler = new SqlCompiler(engine);
}
private static void putValue(HttpChunkedResponseSocket socket, int type, Record rec, int col) {
switch (type) {
case ColumnType.BOOLEAN:
socket.put(rec.getBool(col));
break;
case ColumnType.BYTE:
socket.put(rec.getByte(col));
break;
case ColumnType.DOUBLE:
socket.put(rec.getDouble(col), 10);
break;
case ColumnType.FLOAT:
socket.put(rec.getFloat(col), 10);
break;
case ColumnType.INT:
final int i = rec.getInt(col);
if (i == Integer.MIN_VALUE) {
socket.put("null");
} else {
Numbers.append(socket, i);
}
break;
case ColumnType.LONG:
final long l = rec.getLong(col);
if (l == Long.MIN_VALUE) {
socket.put("null");
} else {
socket.put(l);
}
break;
case ColumnType.DATE:
final long d = rec.getDate(col);
if (d == Long.MIN_VALUE) {
socket.put("null");
break;
}
socket.put('"').putISODateMillis(d).put('"');
break;
case ColumnType.SHORT:
socket.put(rec.getShort(col));
break;
case ColumnType.STRING:
putStringOrNull(socket, rec.getStr(col));
break;
case ColumnType.SYMBOL:
putStringOrNull(socket, rec.getSym(col));
break;
case ColumnType.BINARY:
socket.put('[');
socket.put(']');
break;
default:
break;
}
}
private static void putStringOrNull(CharSink r, CharSequence str) {
if (str == null) {
r.put("null");
} else {
r.encodeUtf8AndQuote(str);
}
}
@Override
public void close() {
Misc.free(compiler);
Misc.free(FACTORY_CACHE.get());
}
@Override
public void onHeadersReady(HttpConnectionContext context) {
}
@Override
public void resumeRecv(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
}
@Override
public void onRequestComplete(
HttpConnectionContext context,
IODispatcher<HttpConnectionContext> dispatcher
) throws PeerDisconnectedException, PeerIsSlowToReadException {
JsonQueryProcessorState state = LV.get(context);
if (state == null) {
// todo: configure state externally
LV.set(context, state = new JsonQueryProcessorState(context.getFd(), 1000));
}
HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
if (parseUrl(socket, context.getRequestHeader(), state)) {
execute(context, dispatcher, state, socket);
} else {
readyForNextRequest(context, dispatcher);
}
}
public boolean parseUrl(
HttpChunkedResponseSocket socket,
HttpRequestHeader request,
JsonQueryProcessorState state
) throws PeerDisconnectedException, PeerIsSlowToReadException {
// Query text.
final CharSequence query = request.getUrlParam("query");
if (query == null || query.length() == 0) {
info(state).$("Empty query request received. Sending empty reply.").$();
sendException(socket, 0, "No query text", 400, state.query);
return false;
}
// Url Params.
long skip = 0;
long stop = Long.MAX_VALUE;
CharSequence limit = request.getUrlParam("limit");
if (limit != null) {
int sepPos = Chars.indexOf(limit, ',');
try {
if (sepPos > 0) {
skip = Numbers.parseLong(limit, 0, sepPos);
if (sepPos + 1 < limit.length()) {
stop = Numbers.parseLong(limit, sepPos + 1, limit.length());
}
} else {
stop = Numbers.parseLong(limit);
}
} catch (NumericException ex) {
// Skip or stop will have default value.
}
}
if (stop < 0) {
stop = 0;
}
if (skip < 0) {
skip = 0;
}
state.query = query;
state.skip = skip;
state.count = 0L;
state.stop = stop;
state.noMeta = Chars.equalsNc("true", request.getUrlParam("nm"));
state.fetchAll = Chars.equalsNc("true", request.getUrlParam("count"));
return true;
}
public void execute(
HttpConnectionContext context,
IODispatcher<HttpConnectionContext> dispatcher,
JsonQueryProcessorState state,
HttpChunkedResponseSocket socket
) throws PeerDisconnectedException, PeerIsSlowToReadException {
try {
state.recordCursorFactory = FACTORY_CACHE.get().poll(state.query);
int retryCount = 0;
do {
if (state.recordCursorFactory == null) {
state.recordCursorFactory = compiler.compile(state.query);
cacheHits.incrementAndGet();
info(state).$("execute-new [q=`").$(state.query).
$("`, skip: ").$(state.skip).
$(", stop: ").$(state.stop).
$(']').$();
} else {
cacheMisses.incrementAndGet();
info(state).$("execute-cached [q=`").$(state.query).
$("`, skip: ").$(state.skip).
$(", stop: ").$(state.stop).
$(']').$();
}
if (state.recordCursorFactory != null) {
try {
state.cursor = state.recordCursorFactory.getCursor();
state.metadata = state.recordCursorFactory.getMetadata();
header(socket, 200);
resumeSend(context, dispatcher);
break;
} catch (CairoError | CairoException e) {
if (retryCount == 0) {
// todo: we want to clear cache, no need to create string to achieve this
FACTORY_CACHE.get().put(state.query.toString(), null);
state.recordCursorFactory = null;
LOG.error().$("RecordSource execution failed. ").$(e.getMessage()).$(". Retrying ...").$();
retryCount++;
} else {
internalError(socket, e, state);
break;
}
}
} else {
header(socket, 200);
sendConfirmation(socket);
break;
}
} while (true);
} catch (SqlException e) {
syntaxError(socket, e, state);
readyForNextRequest(context, dispatcher);
} catch (CairoException | CairoError e) {
internalError(socket, e, state);
}
}
private void syntaxError(
HttpChunkedResponseSocket socket,
SqlException sqlException,
JsonQueryProcessorState state
) throws PeerDisconnectedException, PeerIsSlowToReadException {
info(state)
.$("syntax-error [q=`").$(state.query)
.$("`, at=").$(sqlException.getPosition())
.$(", message=`").$(sqlException.getFlyweightMessage()).$('`')
.$(']').$();
sendException(socket, sqlException.getPosition(), sqlException.getFlyweightMessage(), 400, state.query);
}
private void sendConfirmation(HttpChunkedResponseSocket socket) throws PeerDisconnectedException, PeerIsSlowToReadException {
socket.put('{').putQuoted("ddl").put(':').putQuoted("OK").put('}');
socket.sendChunk();
socket.done();
}
private void internalError(
HttpChunkedResponseSocket socket,
Throwable e,
JsonQueryProcessorState state
) throws PeerDisconnectedException, PeerIsSlowToReadException {
error(state).$("Server error executing query ").$(state.query).$(e).$();
sendException(socket, 0, e.getMessage(), 500, state.query);
}
protected void header(
HttpChunkedResponseSocket socket,
int status
) throws PeerDisconnectedException, PeerIsSlowToReadException {
socket.status(status, "application/json; charset=utf-8");
// todo: configure this header externally
socket.headers().put("Keep-Alive: timeout=5, max=10000").put(Misc.EOL);
socket.sendHeader();
}
private void sendException(
HttpChunkedResponseSocket socket,
int position,
CharSequence message,
int status,
CharSequence query
) throws PeerDisconnectedException, PeerIsSlowToReadException {
header(socket, status);
socket.put('{').
putQuoted("query").put(':').encodeUtf8AndQuote(query == null ? "" : query).put(',').
putQuoted("error").put(':').encodeUtf8AndQuote(message).put(',').
putQuoted("position").put(':').put(position);
socket.put('}');
socket.sendChunk();
socket.done();
}
private LogRecord error(JsonQueryProcessorState state) {
return LOG.error().$('[').$(state.fd).$("] ");
}
private LogRecord info(JsonQueryProcessorState state) {
return LOG.info().$('[').$(state.fd).$("] ");
}
@Override
public void resumeSend(
HttpConnectionContext context,
IODispatcher<HttpConnectionContext> dispatcher
) throws PeerDisconnectedException, PeerIsSlowToReadException {
JsonQueryProcessorState state = LV.get(context);
if (state == null || state.cursor == null) {
return;
}
final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
final int columnCount = state.metadata.getColumnCount();
OUT:
while (true) {
try {
SWITCH:
switch (state.queryState) {
case QUERY_PREFIX:
if (state.noMeta) {
socket.put('{').putQuoted("dataset").put(":[");
state.queryState = QUERY_RECORD_START;
break;
}
socket.bookmark();
socket.put('{').putQuoted("query").put(':').encodeUtf8AndQuote(state.query);
socket.put(',').putQuoted("columns").put(':').put('[');
state.queryState = QUERY_METADATA;
state.columnIndex = 0;
// fall through
case QUERY_METADATA:
for (; state.columnIndex < columnCount; state.columnIndex++) {
socket.bookmark();
if (state.columnIndex > 0) {
socket.put(',');
}
socket.put('{').
putQuoted("name").put(':').putQuoted(state.metadata.getColumnName(state.columnIndex)).
put(',').
putQuoted("type").put(':').putQuoted(ColumnType.nameOf(state.metadata.getColumnType(state.columnIndex)));
socket.put('}');
}
state.queryState = QUERY_META_SUFFIX;
// fall through
case QUERY_META_SUFFIX:
socket.bookmark();
socket.put("],\"dataset\":[");
state.queryState = QUERY_RECORD_START;
// fall through
case QUERY_RECORD_START:
if (state.record == null) {
// check if cursor has any records
state.record = state.cursor.getRecord();
while (true) {
if (state.cursor.hasNext()) {
state.count++;
if (state.fetchAll && state.count > state.stop) {
state.cancellationHandler.check();
continue;
}
if (state.count > state.skip) {
break;
}
} else {
state.queryState = QUERY_DATA_SUFFIX;
break SWITCH;
}
}
}
if (state.count > state.stop) {
state.queryState = QUERY_DATA_SUFFIX;
break;
}
socket.bookmark();
if (state.count > state.skip + 1) {
socket.put(',');
}
socket.put('[');
state.queryState = QUERY_RECORD_COLUMNS;
state.columnIndex = 0;
// fall through
case QUERY_RECORD_COLUMNS:
for (; state.columnIndex < columnCount; state.columnIndex++) {
socket.bookmark();
if (state.columnIndex > 0) {
socket.put(',');
}
putValue(socket, state.metadata.getColumnType(state.columnIndex), state.record, state.columnIndex);
}
state.queryState = QUERY_RECORD_SUFFIX;
// fall through
case QUERY_RECORD_SUFFIX:
socket.bookmark();
socket.put(']');
state.record = null;
state.queryState = QUERY_RECORD_START;
break;
case QUERY_DATA_SUFFIX:
sendDone(socket, state);
break OUT;
default:
break OUT;
}
} catch (NoSpaceLeftInResponseBufferException ignored) {
if (socket.resetToBookmark()) {
socket.sendChunk();
} else {
// what we have here is out unit of data, column value or query
// is larger that response content buffer
// all we can do in this scenario is to log appropriately
// and disconnect socket
info(state).$("Response buffer is too small, state=").$(state.queryState).$();
throw PeerDisconnectedException.INSTANCE;
}
}
}
// reached the end naturally?
readyForNextRequest(context, dispatcher);
}
private void readyForNextRequest(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
context.clear();
dispatcher.registerChannel(context, IOOperation.READ);
}
long getCacheHits() {
return cacheHits.longValue();
}
long getCacheMisses() {
return cacheMisses.longValue();
}
private void sendDone(
HttpChunkedResponseSocket socket,
JsonQueryProcessorState state
) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (state.count > -1) {
socket.bookmark();
socket.put(']');
socket.put(',').putQuoted("count").put(':').put(state.count);
socket.put('}');
state.count = -1;
socket.sendChunk();
}
socket.done();
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http.processors;
public class JsonQueryProcessorState extends AbstractQueryContext {
boolean fetchAll = false;
boolean noMeta = false;
public JsonQueryProcessorState(long fd, int cyclesBeforeCancel) {
super(fd, cyclesBeforeCancel);
}
@Override
public void clear() {
super.clear();
queryState = JsonQueryProcessor.QUERY_PREFIX;
fetchAll = false;
}
}
...@@ -80,7 +80,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -80,7 +80,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
this.engine = cairoEngine; this.engine = cairoEngine;
} }
private static void resumeJson(TextImportProcessorState state, HttpResponseSink.ChunkedResponseImpl r) throws PeerDisconnectedException, PeerIsSlowToReadException { private static void resumeJson(TextImportProcessorState state, HttpChunkedResponseSocket socket) throws PeerDisconnectedException, PeerIsSlowToReadException {
final TextLoader textLoader = state.textLoader; final TextLoader textLoader = state.textLoader;
final RecordMetadata m = textLoader.getMetadata(); final RecordMetadata m = textLoader.getMetadata();
final int columnCount = m.getColumnCount(); final int columnCount = m.getColumnCount();
...@@ -91,7 +91,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -91,7 +91,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
case RESPONSE_PREFIX: case RESPONSE_PREFIX:
long totalRows = state.textLoader.getParsedLineCount(); long totalRows = state.textLoader.getParsedLineCount();
long importedRows = state.textLoader.getWrittenLineCount(); long importedRows = state.textLoader.getWrittenLineCount();
r.put('{') socket.put('{')
.putQuoted("status").put(':').putQuoted("OK").put(',') .putQuoted("status").put(':').putQuoted("OK").put(',')
.putQuoted("location").put(':').encodeUtf8AndQuote(textLoader.getTableName()).put(',') .putQuoted("location").put(':').encodeUtf8AndQuote(textLoader.getTableName()).put(',')
.putQuoted("rowsRejected").put(':').put(totalRows - importedRows).put(',') .putQuoted("rowsRejected").put(':').put(totalRows - importedRows).put(',')
...@@ -102,11 +102,11 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -102,11 +102,11 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
// fall through // fall through
case RESPONSE_COLUMN: case RESPONSE_COLUMN:
for (; state.columnIndex < columnCount; state.columnIndex++) { for (; state.columnIndex < columnCount; state.columnIndex++) {
r.bookmark(); socket.bookmark();
if (state.columnIndex > 0) { if (state.columnIndex > 0) {
r.put(','); socket.put(',');
} }
r.put('{'). socket.put('{').
putQuoted("name").put(':').putQuoted(m.getColumnName(state.columnIndex)).put(','). putQuoted("name").put(':').putQuoted(m.getColumnName(state.columnIndex)).put(',').
putQuoted("type").put(':').putQuoted(com.questdb.cairo.ColumnType.nameOf(m.getColumnType(state.columnIndex))).put(','). putQuoted("type").put(':').putQuoted(com.questdb.cairo.ColumnType.nameOf(m.getColumnType(state.columnIndex))).put(',').
putQuoted("size").put(':').put(com.questdb.cairo.ColumnType.sizeOf(m.getColumnType(state.columnIndex))).put(','). putQuoted("size").put(':').put(com.questdb.cairo.ColumnType.sizeOf(m.getColumnType(state.columnIndex))).put(',').
...@@ -121,15 +121,15 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -121,15 +121,15 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
// r.put(',').putQuoted("locale").put(':').putQuoted(im.dateLocale.getId()); // r.put(',').putQuoted("locale").put(':').putQuoted(im.dateLocale.getId());
// } // }
r.put('}'); socket.put('}');
} }
state.responseState = RESPONSE_SUFFIX; state.responseState = RESPONSE_SUFFIX;
// fall through // fall through
case RESPONSE_SUFFIX: case RESPONSE_SUFFIX:
r.bookmark(); socket.bookmark();
r.put(']').put('}'); socket.put(']').put('}');
r.sendChunk(); socket.sendChunk();
r.done(); socket.done();
break; break;
default: default:
break; break;
...@@ -181,67 +181,67 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -181,67 +181,67 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
b.put("+\r\n"); b.put("+\r\n");
} }
private static void resumeText(TextImportProcessorState h, HttpResponseSink.ChunkedResponseImpl r) throws PeerDisconnectedException, PeerIsSlowToReadException { private static void resumeText(TextImportProcessorState state, HttpChunkedResponseSocket socket) throws PeerDisconnectedException, PeerIsSlowToReadException {
final TextLoader textLoader = h.textLoader; final TextLoader textLoader = state.textLoader;
final RecordMetadata metadata = textLoader.getMetadata(); final RecordMetadata metadata = textLoader.getMetadata();
LongList errors = textLoader.getColumnErrorCounts(); LongList errors = textLoader.getColumnErrorCounts();
switch (h.responseState) { switch (state.responseState) {
case RESPONSE_PREFIX: case RESPONSE_PREFIX:
sep(r); sep(socket);
r.put('|'); socket.put('|');
pad(r, TO_STRING_COL1_PAD, "Location:"); pad(socket, TO_STRING_COL1_PAD, "Location:");
pad(r, TO_STRING_COL2_PAD, textLoader.getTableName()); pad(socket, TO_STRING_COL2_PAD, textLoader.getTableName());
pad(r, TO_STRING_COL3_PAD, "Pattern"); pad(socket, TO_STRING_COL3_PAD, "Pattern");
pad(r, TO_STRING_COL4_PAD, "Locale"); pad(socket, TO_STRING_COL4_PAD, "Locale");
pad(r, TO_STRING_COL5_PAD, "Errors").put(Misc.EOL); pad(socket, TO_STRING_COL5_PAD, "Errors").put(Misc.EOL);
r.put('|'); socket.put('|');
pad(r, TO_STRING_COL1_PAD, "Partition by"); pad(socket, TO_STRING_COL1_PAD, "Partition by");
pad(r, TO_STRING_COL2_PAD, PartitionBy.toString(textLoader.getPartitionBy())); pad(socket, TO_STRING_COL2_PAD, PartitionBy.toString(textLoader.getPartitionBy()));
pad(r, TO_STRING_COL3_PAD, ""); pad(socket, TO_STRING_COL3_PAD, "");
pad(r, TO_STRING_COL4_PAD, ""); pad(socket, TO_STRING_COL4_PAD, "");
pad(r, TO_STRING_COL5_PAD, "").put(Misc.EOL); pad(socket, TO_STRING_COL5_PAD, "").put(Misc.EOL);
sep(r); sep(socket);
r.put('|'); socket.put('|');
pad(r, TO_STRING_COL1_PAD, "Rows handled"); pad(socket, TO_STRING_COL1_PAD, "Rows handled");
pad(r, TO_STRING_COL2_PAD, textLoader.getParsedLineCount()); pad(socket, TO_STRING_COL2_PAD, textLoader.getParsedLineCount());
pad(r, TO_STRING_COL3_PAD, ""); pad(socket, TO_STRING_COL3_PAD, "");
pad(r, TO_STRING_COL4_PAD, ""); pad(socket, TO_STRING_COL4_PAD, "");
pad(r, TO_STRING_COL5_PAD, "").put(Misc.EOL); pad(socket, TO_STRING_COL5_PAD, "").put(Misc.EOL);
r.put('|'); socket.put('|');
pad(r, TO_STRING_COL1_PAD, "Rows imported"); pad(socket, TO_STRING_COL1_PAD, "Rows imported");
pad(r, TO_STRING_COL2_PAD, textLoader.getWrittenLineCount()); pad(socket, TO_STRING_COL2_PAD, textLoader.getWrittenLineCount());
pad(r, TO_STRING_COL3_PAD, ""); pad(socket, TO_STRING_COL3_PAD, "");
pad(r, TO_STRING_COL4_PAD, ""); pad(socket, TO_STRING_COL4_PAD, "");
pad(r, TO_STRING_COL5_PAD, "").put(Misc.EOL); pad(socket, TO_STRING_COL5_PAD, "").put(Misc.EOL);
sep(r); sep(socket);
h.responseState = RESPONSE_COLUMN; state.responseState = RESPONSE_COLUMN;
// fall through // fall through
case RESPONSE_COLUMN: case RESPONSE_COLUMN:
final int columnCount = metadata.getColumnCount(); final int columnCount = metadata.getColumnCount();
for (; h.columnIndex < columnCount; h.columnIndex++) { for (; state.columnIndex < columnCount; state.columnIndex++) {
r.bookmark(); socket.bookmark();
r.put('|'); socket.put('|');
pad(r, TO_STRING_COL1_PAD, h.columnIndex); pad(socket, TO_STRING_COL1_PAD, state.columnIndex);
// col(r, m.getColumnQuick(h.columnIndex), importedMetadata.getQuick(h.columnIndex)); // col(r, m.getColumnQuick(h.columnIndex), importedMetadata.getQuick(h.columnIndex));
pad(r, TO_STRING_COL5_PAD, errors.getQuick(h.columnIndex)); pad(socket, TO_STRING_COL5_PAD, errors.getQuick(state.columnIndex));
r.put(Misc.EOL); socket.put(Misc.EOL);
} }
h.responseState = RESPONSE_SUFFIX; state.responseState = RESPONSE_SUFFIX;
// fall through // fall through
case RESPONSE_SUFFIX: case RESPONSE_SUFFIX:
r.bookmark(); socket.bookmark();
sep(r); sep(socket);
r.sendChunk(); socket.sendChunk();
r.done(); socket.done();
break; break;
default: default:
break; break;
...@@ -360,21 +360,27 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -360,21 +360,27 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
} }
@Override @Override
public void resumeSend(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) throws PeerDisconnectedException, PeerIsSlowToReadException { public void resumeSend(
HttpConnectionContext context,
IODispatcher<HttpConnectionContext> dispatcher
) throws PeerDisconnectedException, PeerIsSlowToReadException {
doResumeSend(LV.get(context), context.getChunkedResponseSocket()); doResumeSend(LV.get(context), context.getChunkedResponseSocket());
} }
private void doResumeSend(TextImportProcessorState state, HttpResponseSink.ChunkedResponseImpl response) throws PeerDisconnectedException, PeerIsSlowToReadException { private void doResumeSend(
TextImportProcessorState state,
HttpChunkedResponseSocket socket
) throws PeerDisconnectedException, PeerIsSlowToReadException {
try { try {
if (state.json) { if (state.json) {
resumeJson(state, response); resumeJson(state, socket);
} else { } else {
resumeText(state, response); resumeText(state, socket);
} }
} catch (NoSpaceLeftInResponseBufferException ignored) { } catch (NoSpaceLeftInResponseBufferException ignored) {
if (response.resetToBookmark()) { if (socket.resetToBookmark()) {
response.sendChunk(); socket.sendChunk();
} else { } else {
// what we have here is out unit of data, column value or query // what we have here is out unit of data, column value or query
// is larger that response content buffer // is larger that response content buffer
...@@ -397,17 +403,21 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -397,17 +403,21 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
transientState.stateMessage = e.getMessage(); transientState.stateMessage = e.getMessage();
} }
private void sendError(HttpConnectionContext context, String message, boolean json) throws PeerDisconnectedException, PeerIsSlowToReadException { private void sendError(
HttpResponseSink.ChunkedResponseImpl sink = context.getChunkedResponseSocket(); HttpConnectionContext context,
String message,
boolean json
) throws PeerDisconnectedException, PeerIsSlowToReadException {
final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
if (json) { if (json) {
sink.status(200, CONTENT_TYPE_JSON); socket.status(200, CONTENT_TYPE_JSON);
sink.put('{').putQuoted("status").put(':').encodeUtf8AndQuote(message).put('}'); socket.put('{').putQuoted("status").put(':').encodeUtf8AndQuote(message).put('}');
} else { } else {
sink.status(400, CONTENT_TYPE_TEXT); socket.status(400, CONTENT_TYPE_TEXT);
sink.encodeUtf8(message); socket.encodeUtf8(message);
} }
// todo: is this needed, both of these? // todo: is this needed, both of these?
sink.done(); socket.done();
throw PeerDisconnectedException.INSTANCE; throw PeerDisconnectedException.INSTANCE;
} }
...@@ -415,16 +425,16 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -415,16 +425,16 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
TextImportProcessorState state = LV.get(context); TextImportProcessorState state = LV.get(context);
// todo: may be set this up when headers are ready? // todo: may be set this up when headers are ready?
state.json = Chars.equalsNc("json", context.getRequestHeader().getUrlParam("fmt")); state.json = Chars.equalsNc("json", context.getRequestHeader().getUrlParam("fmt"));
HttpResponseSink.ChunkedResponseImpl response = context.getChunkedResponseSocket(); HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
if (state.state == TextImportProcessorState.STATE_OK) { if (state.state == TextImportProcessorState.STATE_OK) {
if (state.json) { if (state.json) {
response.status(200, CONTENT_TYPE_JSON); socket.status(200, CONTENT_TYPE_JSON);
} else { } else {
response.status(200, CONTENT_TYPE_TEXT); socket.status(200, CONTENT_TYPE_TEXT);
} }
response.sendHeader(); socket.sendHeader();
doResumeSend(state, response); doResumeSend(state, socket);
} else { } else {
sendError(context, state.stateMessage, state.json); sendError(context, state.stateMessage, state.json);
} }
......
...@@ -43,17 +43,28 @@ public class Worker extends Thread { ...@@ -43,17 +43,28 @@ public class Worker extends Thread {
@SuppressWarnings("FieldCanBeLocal") @SuppressWarnings("FieldCanBeLocal")
private volatile int running = 0; private volatile int running = 0;
private volatile int fence; private volatile int fence;
private final Runnable cleaner;
public Worker(ObjHashSet<? extends Job> jobs, SOCountDownLatch haltLatch) {
this(jobs, haltLatch, -1, null); public Worker(
} ObjHashSet<? extends Job> jobs,
SOCountDownLatch haltLatch
public Worker(ObjHashSet<? extends Job> jobs, SOCountDownLatch haltLatch, int affinity, Log log) { ) {
this(jobs, haltLatch, -1, null, null);
}
public Worker(
final ObjHashSet<? extends Job> jobs,
final SOCountDownLatch haltLatch,
final int affinity,
final Log log,
final Runnable cleaner
) {
this.log = log; this.log = log;
this.jobs = jobs; this.jobs = jobs;
this.haltLatch = haltLatch; this.haltLatch = haltLatch;
this.setName("questdb-worker-" + COUNTER.incrementAndGet()); this.setName("questdb-worker-" + COUNTER.incrementAndGet());
this.affinity = affinity; this.affinity = affinity;
this.cleaner = cleaner;
} }
public void halt() { public void halt() {
...@@ -114,6 +125,11 @@ public class Worker extends Thread { ...@@ -114,6 +125,11 @@ public class Worker extends Thread {
} }
} }
} }
// cleaner will typically attempt to release
// thread-local instances
if (cleaner != null) {
cleaner.run();
}
haltLatch.countDown(); haltLatch.countDown();
} }
......
...@@ -139,11 +139,15 @@ public abstract class AbstractIODispatcher<C extends IOContext> extends Synchron ...@@ -139,11 +139,15 @@ public abstract class AbstractIODispatcher<C extends IOContext> extends Synchron
protected void accept(long timestamp) { protected void accept(long timestamp) {
while (true) { while (true) {
// this accept is greedy, rather than to rely on epoll(or similar) to
// fire accept requests at us one at a time we will be actively accepting
// until nothing left.
long fd = nf.accept(serverFd); long fd = nf.accept(serverFd);
if (fd < 0) { if (fd < 0) {
if (nf.errno() != Net.EWOULDBLOCK) { if (nf.errno() != Net.EWOULDBLOCK) {
LOG.error().$("could not accept [errno=").$(nf.errno()).$(']').$(); LOG.error().$("could not accept [ret=").$(fd).$(", errno=").$(nf.errno()).$(']').$();
} }
return; return;
} }
......
...@@ -46,7 +46,11 @@ public class CairoTestUtils { ...@@ -46,7 +46,11 @@ public class CairoTestUtils {
} }
public static void createTestTable(int n, Rnd rnd, TestRecord.ArrayBinarySequence binarySequence) { public static void createTestTable(int n, Rnd rnd, TestRecord.ArrayBinarySequence binarySequence) {
try (TableModel model = new TableModel(AbstractCairoTest.configuration, "x", PartitionBy.NONE)) { createTestTable(AbstractCairoTest.configuration, n, rnd, binarySequence);
}
public static void createTestTable(CairoConfiguration configuration, int n, Rnd rnd, TestRecord.ArrayBinarySequence binarySequence) {
try (TableModel model = new TableModel(configuration, "x", PartitionBy.NONE)) {
model model
.col("a", ColumnType.BYTE) .col("a", ColumnType.BYTE)
.col("b", ColumnType.SHORT) .col("b", ColumnType.SHORT)
...@@ -63,7 +67,7 @@ public class CairoTestUtils { ...@@ -63,7 +67,7 @@ public class CairoTestUtils {
create(model); create(model);
} }
try (TableWriter writer = new TableWriter(AbstractCairoTest.configuration, "x")) { try (TableWriter writer = new TableWriter(configuration, "x")) {
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
TableWriter.Row row = writer.newRow(); TableWriter.Row row = writer.newRow();
row.putByte(0, rnd.nextByte()); row.putByte(0, rnd.nextByte());
......
...@@ -23,9 +23,12 @@ ...@@ -23,9 +23,12 @@
package com.questdb.cutlass.http; package com.questdb.cutlass.http;
import com.questdb.cairo.CairoTestUtils;
import com.questdb.cairo.DefaultCairoConfiguration; import com.questdb.cairo.DefaultCairoConfiguration;
import com.questdb.cairo.Engine; import com.questdb.cairo.Engine;
import com.questdb.cairo.TestRecord;
import com.questdb.cairo.sql.CairoEngine; import com.questdb.cairo.sql.CairoEngine;
import com.questdb.cutlass.http.processors.JsonQueryProcessor;
import com.questdb.cutlass.http.processors.StaticContentProcessor; import com.questdb.cutlass.http.processors.StaticContentProcessor;
import com.questdb.cutlass.http.processors.StaticContentProcessorConfiguration; import com.questdb.cutlass.http.processors.StaticContentProcessorConfiguration;
import com.questdb.cutlass.http.processors.TextImportProcessor; import com.questdb.cutlass.http.processors.TextImportProcessor;
...@@ -56,6 +59,48 @@ import static com.questdb.cutlass.http.HttpConnectionContext.dump; ...@@ -56,6 +59,48 @@ import static com.questdb.cutlass.http.HttpConnectionContext.dump;
public class IODispatcherTest { public class IODispatcherTest {
private static Log LOG = LogFactory.getLog(IODispatcherTest.class); private static Log LOG = LogFactory.getLog(IODispatcherTest.class);
private static void assertDownloadResponse(long fd, Rnd rnd, long buffer, int len, int nonRepeatedContentLength, String expectedResponseHeader, long expectedResponseLen) {
int expectedHeaderLen = expectedResponseHeader.length();
int headerCheckRemaining = expectedResponseHeader.length();
long downloadedSoFar = 0;
int contentRemaining = 0;
while (downloadedSoFar < expectedResponseLen) {
int contentOffset = 0;
int n = Net.recv(fd, buffer, len);
Assert.assertTrue(n > -1);
if (n > 0) {
if (headerCheckRemaining > 0) {
for (int i = 0; i < n && headerCheckRemaining > 0; i++) {
if (expectedResponseHeader.charAt(expectedHeaderLen - headerCheckRemaining) != (char) Unsafe.getUnsafe().getByte(buffer + i)) {
Assert.fail("at " + (expectedHeaderLen - headerCheckRemaining));
}
headerCheckRemaining--;
contentOffset++;
}
}
if (headerCheckRemaining == 0) {
for (int i = contentOffset; i < n; i++) {
if (contentRemaining == 0) {
contentRemaining = nonRepeatedContentLength;
rnd.reset();
}
Assert.assertEquals(rnd.nextByte(), Unsafe.getUnsafe().getByte(buffer + i));
contentRemaining--;
}
}
downloadedSoFar += n;
}
}
}
private static void sendRequest(String request, long fd, long buffer) {
final int requestLen = request.length();
Chars.strcpy(request, requestLen, buffer);
Assert.assertEquals(requestLen, Net.send(fd, buffer, requestLen));
}
@Test @Test
public void testBiasWrite() throws Exception { public void testBiasWrite() throws Exception {
...@@ -185,6 +230,10 @@ public class IODispatcherTest { ...@@ -185,6 +230,10 @@ public class IODispatcherTest {
} }
}; };
} }
@Override
public void close() {
}
}; };
AtomicBoolean serverRunning = new AtomicBoolean(true); AtomicBoolean serverRunning = new AtomicBoolean(true);
...@@ -395,6 +444,127 @@ public class IODispatcherTest { ...@@ -395,6 +444,127 @@ public class IODispatcherTest {
}); });
} }
@Test
public void testJsonQuery() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (CairoEngine engine = new Engine(new DefaultCairoConfiguration(baseDir));
HttpServer httpServer = new HttpServer(httpConfiguration)) {
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return HttpServerConfiguration.DEFAULT_PROCESSOR_URL;
}
@Override
public HttpRequestProcessor newInstance() {
return new StaticContentProcessor(httpConfiguration.getStaticContentProcessorConfiguration());
}
});
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/query";
}
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
}
});
httpServer.start();
// create table with all column types
CairoTestUtils.createTestTable(
engine.getConfiguration(),
20,
new Rnd(),
new TestRecord.ArrayBinarySequence());
// send multipart request to server
final String request = "GET /query?query=x%20where%20i%20%3D%20(%27EHNRX%27) HTTP/1.1\r\n" +
"Host: localhost:9001\r\n" +
"Connection: keep-alive\r\n" +
"Cache-Control: max-age=0\r\n" +
"Upgrade-Insecure-Requests: 1\r\n" +
"User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36\r\n" +
"Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3\r\n" +
"Accept-Encoding: gzip, deflate, br\r\n" +
"Accept-Language: en-GB,en-US;q=0.9,en;q=0.8\r\n" +
"\r\n";
byte[] expectedResponse = ("HTTP/1.1 200 OK\r\n" +
"Server: questDB/1.0\r\n" +
"Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" +
"Transfer-Encoding: chunked\r\n" +
"Content-Type: application/json; charset=utf-8\r\n" +
"Keep-Alive: timeout=5, max=10000\r\n" +
"\r\n" +
"205\r\n" +
"{\"query\":\"x where i = ('EHNRX')\",\"columns\":[{\"name\":\"a\",\"type\":\"BYTE\"},{\"name\":\"b\",\"type\":\"SHORT\"},{\"name\":\"c\",\"type\":\"INT\"},{\"name\":\"d\",\"type\":\"LONG\"},{\"name\":\"e\",\"type\":\"DATE\"},{\"name\":\"f\",\"type\":\"TIMESTAMP\"},{\"name\":\"g\",\"type\":\"FLOAT\"},{\"name\":\"h\",\"type\":\"DOUBLE\"},{\"name\":\"i\",\"type\":\"STRING\"},{\"name\":\"j\",\"type\":\"SYMBOL\"},{\"name\":\"k\",\"type\":\"BOOLEAN\"},{\"name\":\"l\",\"type\":\"BINARY\"}],\"dataset\":[[80,24814,-727724771,8920866532787660373,\"-169665660-01-09T01:58:28.119Z\",,null,null,\"EHNRX\",\"ZSX\",false,[]]],\"count\":1}\r\n" +
"0\r\n" +
"\r\n").getBytes();
long fd = Net.socketTcp(true);
try {
long sockAddr = Net.sockaddr("127.0.0.1", 9001);
try {
Assert.assertTrue(fd > -1);
Assert.assertEquals(0, Net.connect(fd, sockAddr));
Net.setTcpNoDelay(fd, true);
final int len = Math.max(expectedResponse.length, request.length()) * 2;
long ptr = Unsafe.malloc(len);
try {
for (int j = 0; j < 10_000; j++) {
int sent = 0;
int reqLen = request.length();
Chars.strcpy(request, reqLen, ptr);
while (sent < reqLen) {
int n = Net.send(fd, ptr + sent, reqLen - sent);
Assert.assertTrue(n > -1);
sent += n;
}
// receive response
final int expectedToReceive = expectedResponse.length;
int received = 0;
while (received < expectedToReceive) {
int n = Net.recv(fd, ptr + received, len - received);
if (n > 0) {
// compare bytes
for (int i = 0; i < n; i++) {
if (expectedResponse[received + i] != Unsafe.getUnsafe().getByte(ptr + received + i)) {
Assert.fail("Error at: " + (received + i) + ", local=" + i);
}
}
received += n;
} else if (n < 0) {
LOG.error().$("disconnected? n=").$(n).$();
Assert.fail();
}
}
}
} finally {
Unsafe.free(ptr, len);
}
} finally {
Net.freeSockAddr(sockAddr);
}
} finally {
Net.close(fd);
}
httpServer.halt();
}
});
}
@Test @Test
public void testImportMultipleOnSameConnectionFragmented() throws Exception { public void testImportMultipleOnSameConnectionFragmented() throws Exception {
TestUtils.assertMemoryLeak(() -> { TestUtils.assertMemoryLeak(() -> {
...@@ -781,6 +951,10 @@ public class IODispatcherTest { ...@@ -781,6 +951,10 @@ public class IODispatcherTest {
} }
}; };
} }
@Override
public void close() {
}
}; };
AtomicBoolean serverRunning = new AtomicBoolean(true); AtomicBoolean serverRunning = new AtomicBoolean(true);
...@@ -1241,6 +1415,10 @@ public class IODispatcherTest { ...@@ -1241,6 +1415,10 @@ public class IODispatcherTest {
public HttpRequestProcessor getDefaultProcessor() { public HttpRequestProcessor getDefaultProcessor() {
return null; return null;
} }
@Override
public void close() {
}
}; };
AtomicBoolean serverRunning = new AtomicBoolean(true); AtomicBoolean serverRunning = new AtomicBoolean(true);
...@@ -1377,6 +1555,11 @@ public class IODispatcherTest { ...@@ -1377,6 +1555,11 @@ public class IODispatcherTest {
final HttpRequestProcessorSelector selector = final HttpRequestProcessorSelector selector =
new HttpRequestProcessorSelector() { new HttpRequestProcessorSelector() {
@Override
public void close() {
}
@Override @Override
public HttpRequestProcessor select(CharSequence url) { public HttpRequestProcessor select(CharSequence url) {
return null; return null;
...@@ -1531,6 +1714,11 @@ public class IODispatcherTest { ...@@ -1531,6 +1714,11 @@ public class IODispatcherTest {
StringSink sink = new StringSink(); StringSink sink = new StringSink();
HttpRequestProcessorSelector selector = new HttpRequestProcessorSelector() { HttpRequestProcessorSelector selector = new HttpRequestProcessorSelector() {
@Override
public void close() {
}
@Override @Override
public HttpRequestProcessor select(CharSequence url) { public HttpRequestProcessor select(CharSequence url) {
return null; return null;
...@@ -1734,6 +1922,11 @@ public class IODispatcherTest { ...@@ -1734,6 +1922,11 @@ public class IODispatcherTest {
}; };
HttpRequestProcessorSelector selector = new HttpRequestProcessorSelector() { HttpRequestProcessorSelector selector = new HttpRequestProcessorSelector() {
@Override
public void close() {
}
@Override @Override
public HttpRequestProcessor select(CharSequence url) { public HttpRequestProcessor select(CharSequence url) {
return null; return null;
...@@ -1849,51 +2042,6 @@ public class IODispatcherTest { ...@@ -1849,51 +2042,6 @@ public class IODispatcherTest {
}); });
} }
private static void assertDownloadResponse(long fd, Rnd rnd, long buffer, int len, int nonRepeatedContentLength, String expectedResponseHeader, long expectedResponseLen) {
int expectedHeaderLen = expectedResponseHeader.length();
int headerCheckRemaining = expectedResponseHeader.length();
long downloadedSoFar = 0;
int contentRemaining = 0;
while (downloadedSoFar < expectedResponseLen) {
int contentOffset = 0;
int n = Net.recv(fd, buffer, len);
Assert.assertTrue(n > -1);
if (n > 0) {
if (headerCheckRemaining > 0) {
for (int i = 0; i < n && headerCheckRemaining > 0; i++) {
// System.out.print((char) Unsafe.getUnsafe().getByte(buffer + i));
if (expectedResponseHeader.charAt(expectedHeaderLen - headerCheckRemaining) != (char) Unsafe.getUnsafe().getByte(buffer + i)) {
Assert.fail("at " + (expectedHeaderLen - headerCheckRemaining));
}
headerCheckRemaining--;
contentOffset++;
}
}
if (headerCheckRemaining == 0) {
for (int i = contentOffset; i < n; i++) {
if (contentRemaining == 0) {
contentRemaining = nonRepeatedContentLength;
rnd.reset();
}
// System.out.print((char)Unsafe.getUnsafe().getByte(buffer + i));
Assert.assertEquals(rnd.nextByte(), Unsafe.getUnsafe().getByte(buffer + i));
contentRemaining--;
}
}
// System.out.println(downloadedSoFar);
downloadedSoFar += n;
}
}
}
private static void sendRequest(String request, long fd, long buffer) {
final int requestLen = request.length();
Chars.strcpy(request, requestLen, buffer);
Assert.assertEquals(requestLen, Net.send(fd, buffer, requestLen));
}
@NotNull @NotNull
private DefaultHttpServerConfiguration createHttpServerConfiguration(String baseDir) { private DefaultHttpServerConfiguration createHttpServerConfiguration(String baseDir) {
return new DefaultHttpServerConfiguration() { return new DefaultHttpServerConfiguration() {
......
...@@ -748,6 +748,8 @@ public class HttpServerTest extends AbstractJournalTest { ...@@ -748,6 +748,8 @@ public class HttpServerTest extends AbstractJournalTest {
} }
@Test @Test
@Ignore
// this test fails intermittently, wont fix, this is outgoing code
public void testMaxConnections() throws Exception { public void testMaxConnections() throws Exception {
final ServerConfiguration configuration = new ServerConfiguration(new File(resourceFile("/site"), "conf/questdb.conf")); final ServerConfiguration configuration = new ServerConfiguration(new File(resourceFile("/site"), "conf/questdb.conf"));
BootstrapEnv env = new BootstrapEnv(); BootstrapEnv env = new BootstrapEnv();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册