提交 0d4c0e5c 编写于 作者: V Vlad Ilyushchenko

CUTLASS: refactored switch statement out of json query processor to improve performance

上级 265199fb
......@@ -35,13 +35,13 @@ import java.io.Closeable;
public abstract class AbstractQueryContext implements Mutable, Closeable {
static final int QUERY_NEXT_RECORD = 9;
static final int QUERY_RECORD_PREFIX = 9;
static final int QUERY_SETUP_FIRST_RECORD = 8;
static final int QUERY_DATA_SUFFIX = 7;
static final int QUERY_SUFFIX = 7;
static final int QUERY_RECORD_SUFFIX = 6;
static final int QUERY_RECORD_COLUMNS = 5;
static final int QUERY_RECORD = 5;
static final int QUERY_RECORD_START = 4;
static final int QUERY_META_SUFFIX = 3;
static final int QUERY_METADATA_SUFFIX = 3;
static final int QUERY_METADATA = 2;
static final int QUERY_PREFIX = 1;
// Factory cache is thread local due to possibility of factory being
......
......@@ -57,6 +57,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
private final int doubleScale;
private final SqlExecutionContextImpl sqlExecutionContext = new SqlExecutionContextImpl();
private final ObjList<ValueWriter> valueWriters = new ObjList<>();
private final ObjList<StateResumeAction> resumeActions = new ObjList<>();
public JsonQueryProcessor(JsonQueryProcessorConfiguration configuration, CairoEngine engine) {
// todo: add scheduler
......@@ -79,6 +80,15 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
this.valueWriters.extendAndSet(ColumnType.SYMBOL, this::putSymValue);
this.valueWriters.extendAndSet(ColumnType.BINARY, this::putBinValue);
this.valueWriters.extendAndSet(ColumnType.LONG256, this::putLong256Value);
resumeActions.extendAndSet(AbstractQueryContext.QUERY_PREFIX, this::onQueryPrefix);
resumeActions.extendAndSet(AbstractQueryContext.QUERY_METADATA, this::onQueryMetadata);
resumeActions.extendAndSet(AbstractQueryContext.QUERY_METADATA_SUFFIX, this::onQueryMetadataSuffix);
resumeActions.extendAndSet(AbstractQueryContext.QUERY_SETUP_FIRST_RECORD, this::doFirstRecordLoop);
resumeActions.extendAndSet(AbstractQueryContext.QUERY_RECORD_PREFIX, this::onQueryRecordPrefix);
resumeActions.extendAndSet(AbstractQueryContext.QUERY_RECORD, this::onQueryRecord);
resumeActions.extendAndSet(AbstractQueryContext.QUERY_RECORD_SUFFIX, this::onQueryRecordSuffix);
resumeActions.extendAndSet(AbstractQueryContext.QUERY_SUFFIX, this::doQuerySuffix);
}
private static void putStringOrNull(CharSink r, CharSequence str) {
......@@ -194,39 +204,10 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
final int columnCount = state.metadata.getColumnCount();
OUT:
while (true) {
try {
switch (state.queryState) {
case AbstractQueryContext.QUERY_PREFIX:
if (onQueryPrefix(state, socket)) {
break;
}
// fall through
case AbstractQueryContext.QUERY_METADATA:
onQeeryMetadata(state, socket, columnCount);
// fall through
case AbstractQueryContext.QUERY_META_SUFFIX:
onMetadataSuffix(state, socket);
// fall through
case AbstractQueryContext.QUERY_SETUP_FIRST_RECORD:
onSetupFirstRecord(state, socket);
break;
case AbstractQueryContext.QUERY_NEXT_RECORD:
onNextRecord(state, socket);
break;
case AbstractQueryContext.QUERY_RECORD_COLUMNS:
onRecordColumn(state, socket, columnCount);
// fall through
case AbstractQueryContext.QUERY_RECORD_SUFFIX:
onRecordSuffix(state, socket);
break;
case AbstractQueryContext.QUERY_DATA_SUFFIX:
onDataSuffix(socket, state);
break OUT;
default:
break OUT;
}
resumeActions.getQuick(state.queryState).onResume(state, socket, columnCount);
break;
} catch (NoSpaceLeftInResponseBufferException ignored) {
if (socket.resetToBookmark()) {
socket.sendChunk();
......@@ -244,51 +225,105 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
readyForNextRequest(context, dispatcher);
}
private void onSetupFirstRecord(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) {
private void onQueryRecordSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException {
doQueryRecordSuffix(state, socket);
doNextRecordLoop(state, socket, columnCount);
}
private void onQueryRecord(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException {
doQueryRecord(state, socket, columnCount);
onQueryRecordSuffix(state, socket, columnCount);
}
private void onQueryRecordPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException {
doQueryRecordPrefix(state, socket);
onQueryRecord(state, socket, columnCount);
}
private void onQueryMetadataSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException {
doQueryMetadataSuffix(state, socket);
doFirstRecordLoop(state, socket, columnCount);
}
private void onQueryMetadata(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException {
doQueryMetadata(state, socket, columnCount);
onQueryMetadataSuffix(state, socket, columnCount);
}
private void onQueryPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (doQueryPrefix(state, socket)) {
doQueryMetadata(state, socket, columnCount);
doQueryMetadataSuffix(state, socket);
}
doFirstRecordLoop(state, socket, columnCount);
}
private void doNextRecordLoop(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (doQueryNextRecord(state)) {
doRecordFetchLoop(state, socket, columnCount);
} else {
doQuerySuffix(state, socket, columnCount);
}
}
private void doRecordFetchLoop(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException {
do {
doQueryRecordPrefix(state, socket);
doQueryRecord(state, socket, columnCount);
doQueryRecordSuffix(state, socket);
} while (doQueryNextRecord(state));
doQuerySuffix(state, socket, columnCount);
}
private void doFirstRecordLoop(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (onQuerySetupFirstRecord(state)) {
doRecordFetchLoop(state, socket, columnCount);
} else {
doQuerySuffix(state, socket, columnCount);
}
}
private boolean onQuerySetupFirstRecord(JsonQueryProcessorState state) {
if (state.skip > 0) {
final RecordCursor cursor = state.cursor;
long target = state.skip;
long target = state.skip + 1;
while (target > 0 && cursor.hasNext()) {
target--;
}
if (target > 0) {
state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX;
return;
return false;
}
state.count = state.skip;
} else {
if (state.cursor.hasNext()) {
state.count++;
} else {
state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX;
return;
if (!state.cursor.hasNext()) {
return false;
}
}
state.columnIndex = 0;
state.queryState = AbstractQueryContext.QUERY_RECORD_COLUMNS;
state.record = state.cursor.getRecord();
socket.bookmark();
socket.put('[');
return true;
}
private void onNextRecord(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) {
private boolean doQueryNextRecord(JsonQueryProcessorState state) {
if (state.cursor.hasNext()) {
state.count++;
if (state.count > state.stop) {
onNoMoreData(state);
if (state.count < state.stop) {
return true;
} else {
socket.bookmark();
if (state.count > state.skip) {
socket.put(',');
}
socket.put('[');
state.queryState = AbstractQueryContext.QUERY_RECORD_COLUMNS;
state.columnIndex = 0;
onNoMoreData(state);
}
} else {
state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX;
}
return false;
}
private void doQueryRecordPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) {
state.queryState = AbstractQueryContext.QUERY_RECORD_PREFIX;
socket.bookmark();
if (state.count > state.skip) {
socket.put(',');
}
socket.put('[');
state.columnIndex = 0;
}
private void onNoMoreData(JsonQueryProcessorState state) {
......@@ -298,7 +333,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
final RecordCursor cursor = state.cursor;
final long size = cursor.size();
if (size < 0) {
long count = 0;
long count = 1;
while (cursor.hasNext()) {
count++;
}
......@@ -306,19 +341,18 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
} else {
state.count = size;
}
} else {
state.count--;
}
state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX;
}
private void onRecordSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) {
private void doQueryRecordSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) {
state.queryState = AbstractQueryContext.QUERY_RECORD_SUFFIX;
state.count++;
socket.bookmark();
socket.put(']');
state.queryState = AbstractQueryContext.QUERY_NEXT_RECORD;
}
private void onRecordColumn(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) {
private void doQueryRecord(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) {
state.queryState = AbstractQueryContext.QUERY_RECORD;
for (; state.columnIndex < columnCount; state.columnIndex++) {
socket.bookmark();
if (state.columnIndex > 0) {
......@@ -329,21 +363,18 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
vw.write(socket, state.record, state.columnIndex);
}
}
state.queryState = AbstractQueryContext.QUERY_RECORD_SUFFIX;
}
private void onMetadataSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) {
private void doQueryMetadataSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) {
state.queryState = AbstractQueryContext.QUERY_METADATA_SUFFIX;
socket.bookmark();
socket.put("],\"dataset\":[");
state.queryState = AbstractQueryContext.QUERY_SETUP_FIRST_RECORD;
}
private void onQeeryMetadata(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) {
private void doQueryMetadata(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) {
state.queryState = AbstractQueryContext.QUERY_METADATA;
for (; state.columnIndex < columnCount; state.columnIndex++) {
socket.bookmark();
if (state.columnIndex > 0) {
socket.put(',');
}
......@@ -353,21 +384,19 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
putQuoted("type").put(':').putQuoted(ColumnType.nameOf(state.metadata.getColumnType(state.columnIndex)));
socket.put('}');
}
state.queryState = AbstractQueryContext.QUERY_META_SUFFIX;
}
private boolean onQueryPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) {
private boolean doQueryPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) {
if (state.noMeta) {
socket.bookmark();
socket.put('{').putQuoted("dataset").put(":[");
state.queryState = AbstractQueryContext.QUERY_SETUP_FIRST_RECORD;
return true;
return false;
}
socket.bookmark();
socket.put('{').putQuoted("query").put(':').encodeUtf8AndQuote(state.query);
socket.put(',').putQuoted("columns").put(':').put('[');
state.queryState = AbstractQueryContext.QUERY_METADATA;
state.columnIndex = 0;
return false;
return true;
}
private LogRecord error(JsonQueryProcessorState state) {
......@@ -426,7 +455,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
int sepPos = Chars.indexOf(limit, ',');
try {
if (sepPos > 0) {
skip = Numbers.parseLong(limit, 0, sepPos);
skip = Numbers.parseLong(limit, 0, sepPos) - 1;
if (sepPos + 1 < limit.length()) {
stop = Numbers.parseLong(limit, sepPos + 1, limit.length());
}
......@@ -550,10 +579,12 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
socket.done();
}
private void onDataSuffix(
private void doQuerySuffix(
JsonQueryProcessorState state,
HttpChunkedResponseSocket socket,
JsonQueryProcessorState state
int columnCount
) throws PeerDisconnectedException, PeerIsSlowToReadException {
state.queryState = AbstractQueryContext.QUERY_SUFFIX;
if (state.count > -1) {
socket.bookmark();
socket.put(']');
......@@ -595,6 +626,11 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
sendException(socket, sqlException.getPosition(), sqlException.getFlyweightMessage(), 400, state.query);
}
@FunctionalInterface
private interface StateResumeAction {
void onResume(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException;
}
@FunctionalInterface
private interface ValueWriter {
void write(HttpChunkedResponseSocket socket, Record rec, int col);
......
......@@ -224,21 +224,21 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable {
break;
}
} else {
state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX;
state.queryState = AbstractQueryContext.QUERY_SUFFIX;
break SWITCH;
}
}
}
if (state.count > state.stop) {
state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX;
state.queryState = AbstractQueryContext.QUERY_SUFFIX;
break;
}
state.queryState = AbstractQueryContext.QUERY_RECORD_COLUMNS;
state.queryState = AbstractQueryContext.QUERY_RECORD;
state.columnIndex = 0;
// fall through
case AbstractQueryContext.QUERY_RECORD_COLUMNS:
case AbstractQueryContext.QUERY_RECORD:
for (; state.columnIndex < columnCount; state.columnIndex++) {
socket.bookmark();
......@@ -257,7 +257,7 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable {
state.record = null;
state.queryState = AbstractQueryContext.QUERY_RECORD_START;
break;
case AbstractQueryContext.QUERY_DATA_SUFFIX:
case AbstractQueryContext.QUERY_SUFFIX:
sendDone(socket, state);
break OUT;
default:
......
......@@ -32,7 +32,7 @@ import io.questdb.std.Misc;
public class CrossJoinRecordCursorFactory extends AbstractRecordCursorFactory {
private final RecordCursorFactory masterFactory;
private final RecordCursorFactory slaveFactory;
private final HashJoinRecordCursor cursor;
private final CrossJoinRecordCursor cursor;
public CrossJoinRecordCursorFactory(
RecordMetadata metadata,
......@@ -44,7 +44,7 @@ public class CrossJoinRecordCursorFactory extends AbstractRecordCursorFactory {
super(metadata);
this.masterFactory = masterFactory;
this.slaveFactory = slaveFactory;
this.cursor = new HashJoinRecordCursor(columnSplit);
this.cursor = new CrossJoinRecordCursor(columnSplit);
}
@Override
......@@ -71,13 +71,13 @@ public class CrossJoinRecordCursorFactory extends AbstractRecordCursorFactory {
return false;
}
private static class HashJoinRecordCursor implements NoRandomAccessRecordCursor {
private static class CrossJoinRecordCursor implements NoRandomAccessRecordCursor {
private final JoinRecord record;
private final int columnSplit;
private RecordCursor masterCursor;
private RecordCursor slaveCursor;
public HashJoinRecordCursor(int columnSplit) {
public CrossJoinRecordCursor(int columnSplit) {
this.record = new JoinRecord(columnSplit);
this.columnSplit = columnSplit;
}
......
......@@ -2847,6 +2847,10 @@ public class IODispatcherTest {
// receive response
final int expectedToReceive = expectedResponse.length;
int received = 0;
if (print) {
System.out.println("expected");
System.out.println(new String(expectedResponse));
}
while (received < expectedToReceive) {
int n = nf.recv(fd, ptr + received, len - received);
if (n > 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册