diff --git a/core/src/main/java/io/questdb/cutlass/http/processors/AbstractQueryContext.java b/core/src/main/java/io/questdb/cutlass/http/processors/AbstractQueryContext.java index 7e14de1f3ef35c145609fe5908834be676a0fe94..e8725d02a5e6177e1ff682c1cd1480e3dd0a3da7 100644 --- a/core/src/main/java/io/questdb/cutlass/http/processors/AbstractQueryContext.java +++ b/core/src/main/java/io/questdb/cutlass/http/processors/AbstractQueryContext.java @@ -35,13 +35,13 @@ import java.io.Closeable; public abstract class AbstractQueryContext implements Mutable, Closeable { - static final int QUERY_NEXT_RECORD = 9; + static final int QUERY_RECORD_PREFIX = 9; static final int QUERY_SETUP_FIRST_RECORD = 8; - static final int QUERY_DATA_SUFFIX = 7; + static final int QUERY_SUFFIX = 7; static final int QUERY_RECORD_SUFFIX = 6; - static final int QUERY_RECORD_COLUMNS = 5; + static final int QUERY_RECORD = 5; static final int QUERY_RECORD_START = 4; - static final int QUERY_META_SUFFIX = 3; + static final int QUERY_METADATA_SUFFIX = 3; static final int QUERY_METADATA = 2; static final int QUERY_PREFIX = 1; // Factory cache is thread local due to possibility of factory being diff --git a/core/src/main/java/io/questdb/cutlass/http/processors/JsonQueryProcessor.java b/core/src/main/java/io/questdb/cutlass/http/processors/JsonQueryProcessor.java index 66103b19c0ef502caf740080e70daba953a935f9..5b0fee6f358ebda591dea2c27ae8c30fd39954b0 100644 --- a/core/src/main/java/io/questdb/cutlass/http/processors/JsonQueryProcessor.java +++ b/core/src/main/java/io/questdb/cutlass/http/processors/JsonQueryProcessor.java @@ -57,6 +57,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { private final int doubleScale; private final SqlExecutionContextImpl sqlExecutionContext = new SqlExecutionContextImpl(); private final ObjList valueWriters = new ObjList<>(); + private final ObjList resumeActions = new ObjList<>(); public JsonQueryProcessor(JsonQueryProcessorConfiguration configuration, CairoEngine engine) { // todo: add scheduler @@ -79,6 +80,15 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { this.valueWriters.extendAndSet(ColumnType.SYMBOL, this::putSymValue); this.valueWriters.extendAndSet(ColumnType.BINARY, this::putBinValue); this.valueWriters.extendAndSet(ColumnType.LONG256, this::putLong256Value); + + resumeActions.extendAndSet(AbstractQueryContext.QUERY_PREFIX, this::onQueryPrefix); + resumeActions.extendAndSet(AbstractQueryContext.QUERY_METADATA, this::onQueryMetadata); + resumeActions.extendAndSet(AbstractQueryContext.QUERY_METADATA_SUFFIX, this::onQueryMetadataSuffix); + resumeActions.extendAndSet(AbstractQueryContext.QUERY_SETUP_FIRST_RECORD, this::doFirstRecordLoop); + resumeActions.extendAndSet(AbstractQueryContext.QUERY_RECORD_PREFIX, this::onQueryRecordPrefix); + resumeActions.extendAndSet(AbstractQueryContext.QUERY_RECORD, this::onQueryRecord); + resumeActions.extendAndSet(AbstractQueryContext.QUERY_RECORD_SUFFIX, this::onQueryRecordSuffix); + resumeActions.extendAndSet(AbstractQueryContext.QUERY_SUFFIX, this::doQuerySuffix); } private static void putStringOrNull(CharSink r, CharSequence str) { @@ -194,39 +204,10 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket(); final int columnCount = state.metadata.getColumnCount(); - OUT: while (true) { try { - switch (state.queryState) { - case AbstractQueryContext.QUERY_PREFIX: - if (onQueryPrefix(state, socket)) { - break; - } - // fall through - case AbstractQueryContext.QUERY_METADATA: - onQeeryMetadata(state, socket, columnCount); - // fall through - case AbstractQueryContext.QUERY_META_SUFFIX: - onMetadataSuffix(state, socket); - // fall through - case AbstractQueryContext.QUERY_SETUP_FIRST_RECORD: - onSetupFirstRecord(state, socket); - break; - case AbstractQueryContext.QUERY_NEXT_RECORD: - onNextRecord(state, socket); - break; - case AbstractQueryContext.QUERY_RECORD_COLUMNS: - onRecordColumn(state, socket, columnCount); - // fall through - case AbstractQueryContext.QUERY_RECORD_SUFFIX: - onRecordSuffix(state, socket); - break; - case AbstractQueryContext.QUERY_DATA_SUFFIX: - onDataSuffix(socket, state); - break OUT; - default: - break OUT; - } + resumeActions.getQuick(state.queryState).onResume(state, socket, columnCount); + break; } catch (NoSpaceLeftInResponseBufferException ignored) { if (socket.resetToBookmark()) { socket.sendChunk(); @@ -244,51 +225,105 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { readyForNextRequest(context, dispatcher); } - private void onSetupFirstRecord(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { + private void onQueryRecordSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + doQueryRecordSuffix(state, socket); + doNextRecordLoop(state, socket, columnCount); + } + + private void onQueryRecord(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + doQueryRecord(state, socket, columnCount); + onQueryRecordSuffix(state, socket, columnCount); + } + + private void onQueryRecordPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + doQueryRecordPrefix(state, socket); + onQueryRecord(state, socket, columnCount); + } + + private void onQueryMetadataSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + doQueryMetadataSuffix(state, socket); + doFirstRecordLoop(state, socket, columnCount); + } + + private void onQueryMetadata(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + doQueryMetadata(state, socket, columnCount); + onQueryMetadataSuffix(state, socket, columnCount); + } + + private void onQueryPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + if (doQueryPrefix(state, socket)) { + doQueryMetadata(state, socket, columnCount); + doQueryMetadataSuffix(state, socket); + } + doFirstRecordLoop(state, socket, columnCount); + } + + private void doNextRecordLoop(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + if (doQueryNextRecord(state)) { + doRecordFetchLoop(state, socket, columnCount); + } else { + doQuerySuffix(state, socket, columnCount); + } + } + + private void doRecordFetchLoop(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + do { + doQueryRecordPrefix(state, socket); + doQueryRecord(state, socket, columnCount); + doQueryRecordSuffix(state, socket); + } while (doQueryNextRecord(state)); + doQuerySuffix(state, socket, columnCount); + } + + private void doFirstRecordLoop(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + if (onQuerySetupFirstRecord(state)) { + doRecordFetchLoop(state, socket, columnCount); + } else { + doQuerySuffix(state, socket, columnCount); + } + } + + private boolean onQuerySetupFirstRecord(JsonQueryProcessorState state) { if (state.skip > 0) { final RecordCursor cursor = state.cursor; - long target = state.skip; + long target = state.skip + 1; while (target > 0 && cursor.hasNext()) { target--; } if (target > 0) { - state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX; - return; + return false; } state.count = state.skip; } else { - if (state.cursor.hasNext()) { - state.count++; - } else { - state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX; - return; + if (!state.cursor.hasNext()) { + return false; } } + state.columnIndex = 0; - state.queryState = AbstractQueryContext.QUERY_RECORD_COLUMNS; state.record = state.cursor.getRecord(); - socket.bookmark(); - socket.put('['); + return true; } - private void onNextRecord(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { + private boolean doQueryNextRecord(JsonQueryProcessorState state) { if (state.cursor.hasNext()) { - state.count++; - if (state.count > state.stop) { - onNoMoreData(state); + if (state.count < state.stop) { + return true; } else { - socket.bookmark(); - if (state.count > state.skip) { - socket.put(','); - } - socket.put('['); - - state.queryState = AbstractQueryContext.QUERY_RECORD_COLUMNS; - state.columnIndex = 0; + onNoMoreData(state); } - } else { - state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX; } + return false; + } + + private void doQueryRecordPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { + state.queryState = AbstractQueryContext.QUERY_RECORD_PREFIX; + socket.bookmark(); + if (state.count > state.skip) { + socket.put(','); + } + socket.put('['); + state.columnIndex = 0; } private void onNoMoreData(JsonQueryProcessorState state) { @@ -298,7 +333,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { final RecordCursor cursor = state.cursor; final long size = cursor.size(); if (size < 0) { - long count = 0; + long count = 1; while (cursor.hasNext()) { count++; } @@ -306,19 +341,18 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { } else { state.count = size; } - } else { - state.count--; } - state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX; } - private void onRecordSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { + private void doQueryRecordSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { + state.queryState = AbstractQueryContext.QUERY_RECORD_SUFFIX; + state.count++; socket.bookmark(); socket.put(']'); - state.queryState = AbstractQueryContext.QUERY_NEXT_RECORD; } - private void onRecordColumn(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) { + private void doQueryRecord(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) { + state.queryState = AbstractQueryContext.QUERY_RECORD; for (; state.columnIndex < columnCount; state.columnIndex++) { socket.bookmark(); if (state.columnIndex > 0) { @@ -329,21 +363,18 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { vw.write(socket, state.record, state.columnIndex); } } - - state.queryState = AbstractQueryContext.QUERY_RECORD_SUFFIX; } - private void onMetadataSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { + private void doQueryMetadataSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { + state.queryState = AbstractQueryContext.QUERY_METADATA_SUFFIX; socket.bookmark(); socket.put("],\"dataset\":["); - state.queryState = AbstractQueryContext.QUERY_SETUP_FIRST_RECORD; } - private void onQeeryMetadata(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) { + private void doQueryMetadata(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) { + state.queryState = AbstractQueryContext.QUERY_METADATA; for (; state.columnIndex < columnCount; state.columnIndex++) { - socket.bookmark(); - if (state.columnIndex > 0) { socket.put(','); } @@ -353,21 +384,19 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { putQuoted("type").put(':').putQuoted(ColumnType.nameOf(state.metadata.getColumnType(state.columnIndex))); socket.put('}'); } - state.queryState = AbstractQueryContext.QUERY_META_SUFFIX; } - private boolean onQueryPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { + private boolean doQueryPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { if (state.noMeta) { + socket.bookmark(); socket.put('{').putQuoted("dataset").put(":["); - state.queryState = AbstractQueryContext.QUERY_SETUP_FIRST_RECORD; - return true; + return false; } socket.bookmark(); socket.put('{').putQuoted("query").put(':').encodeUtf8AndQuote(state.query); socket.put(',').putQuoted("columns").put(':').put('['); - state.queryState = AbstractQueryContext.QUERY_METADATA; state.columnIndex = 0; - return false; + return true; } private LogRecord error(JsonQueryProcessorState state) { @@ -426,7 +455,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { int sepPos = Chars.indexOf(limit, ','); try { if (sepPos > 0) { - skip = Numbers.parseLong(limit, 0, sepPos); + skip = Numbers.parseLong(limit, 0, sepPos) - 1; if (sepPos + 1 < limit.length()) { stop = Numbers.parseLong(limit, sepPos + 1, limit.length()); } @@ -550,10 +579,12 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { socket.done(); } - private void onDataSuffix( + private void doQuerySuffix( + JsonQueryProcessorState state, HttpChunkedResponseSocket socket, - JsonQueryProcessorState state + int columnCount ) throws PeerDisconnectedException, PeerIsSlowToReadException { + state.queryState = AbstractQueryContext.QUERY_SUFFIX; if (state.count > -1) { socket.bookmark(); socket.put(']'); @@ -595,6 +626,11 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { sendException(socket, sqlException.getPosition(), sqlException.getFlyweightMessage(), 400, state.query); } + @FunctionalInterface + private interface StateResumeAction { + void onResume(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException; + } + @FunctionalInterface private interface ValueWriter { void write(HttpChunkedResponseSocket socket, Record rec, int col); diff --git a/core/src/main/java/io/questdb/cutlass/http/processors/TextQueryProcessor.java b/core/src/main/java/io/questdb/cutlass/http/processors/TextQueryProcessor.java index 4c8279e7281f9aa6e4b12b0a1f7cca4c0ceac316..fe720b5b02d73c47280bc7c149a0d18933d8da3b 100644 --- a/core/src/main/java/io/questdb/cutlass/http/processors/TextQueryProcessor.java +++ b/core/src/main/java/io/questdb/cutlass/http/processors/TextQueryProcessor.java @@ -224,21 +224,21 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable { break; } } else { - state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX; + state.queryState = AbstractQueryContext.QUERY_SUFFIX; break SWITCH; } } } if (state.count > state.stop) { - state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX; + state.queryState = AbstractQueryContext.QUERY_SUFFIX; break; } - state.queryState = AbstractQueryContext.QUERY_RECORD_COLUMNS; + state.queryState = AbstractQueryContext.QUERY_RECORD; state.columnIndex = 0; // fall through - case AbstractQueryContext.QUERY_RECORD_COLUMNS: + case AbstractQueryContext.QUERY_RECORD: for (; state.columnIndex < columnCount; state.columnIndex++) { socket.bookmark(); @@ -257,7 +257,7 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable { state.record = null; state.queryState = AbstractQueryContext.QUERY_RECORD_START; break; - case AbstractQueryContext.QUERY_DATA_SUFFIX: + case AbstractQueryContext.QUERY_SUFFIX: sendDone(socket, state); break OUT; default: diff --git a/core/src/main/java/io/questdb/griffin/engine/join/CrossJoinRecordCursorFactory.java b/core/src/main/java/io/questdb/griffin/engine/join/CrossJoinRecordCursorFactory.java index 25c22238764fc3b7e308ebaa5bb4fb5ed6b0ac10..042a70adae0f1672cac1d32d8ed31666795e589b 100644 --- a/core/src/main/java/io/questdb/griffin/engine/join/CrossJoinRecordCursorFactory.java +++ b/core/src/main/java/io/questdb/griffin/engine/join/CrossJoinRecordCursorFactory.java @@ -32,7 +32,7 @@ import io.questdb.std.Misc; public class CrossJoinRecordCursorFactory extends AbstractRecordCursorFactory { private final RecordCursorFactory masterFactory; private final RecordCursorFactory slaveFactory; - private final HashJoinRecordCursor cursor; + private final CrossJoinRecordCursor cursor; public CrossJoinRecordCursorFactory( RecordMetadata metadata, @@ -44,7 +44,7 @@ public class CrossJoinRecordCursorFactory extends AbstractRecordCursorFactory { super(metadata); this.masterFactory = masterFactory; this.slaveFactory = slaveFactory; - this.cursor = new HashJoinRecordCursor(columnSplit); + this.cursor = new CrossJoinRecordCursor(columnSplit); } @Override @@ -71,13 +71,13 @@ public class CrossJoinRecordCursorFactory extends AbstractRecordCursorFactory { return false; } - private static class HashJoinRecordCursor implements NoRandomAccessRecordCursor { + private static class CrossJoinRecordCursor implements NoRandomAccessRecordCursor { private final JoinRecord record; private final int columnSplit; private RecordCursor masterCursor; private RecordCursor slaveCursor; - public HashJoinRecordCursor(int columnSplit) { + public CrossJoinRecordCursor(int columnSplit) { this.record = new JoinRecord(columnSplit); this.columnSplit = columnSplit; } diff --git a/core/src/test/java/io/questdb/cutlass/http/IODispatcherTest.java b/core/src/test/java/io/questdb/cutlass/http/IODispatcherTest.java index 4a795c3a501897eb5dc9814f37f1456f81009849..0bf67b4a07b3bfce55bac41c793e6233bfc058b8 100644 --- a/core/src/test/java/io/questdb/cutlass/http/IODispatcherTest.java +++ b/core/src/test/java/io/questdb/cutlass/http/IODispatcherTest.java @@ -2847,6 +2847,10 @@ public class IODispatcherTest { // receive response final int expectedToReceive = expectedResponse.length; int received = 0; + if (print) { + System.out.println("expected"); + System.out.println(new String(expectedResponse)); + } while (received < expectedToReceive) { int n = nf.recv(fd, ptr + received, len - received); if (n > 0) {