提交 e7aaf7a1 编写于 作者: V Vlad Ilyushchenko

CUTLASS: bugfix, csv download junk at end of file is removed

GRIFFIN: SQL optimiser bug that could crash server thread.
上级 9020f2a4
......@@ -28,6 +28,7 @@ import com.questdb.cairo.sql.RecordCursor;
import com.questdb.cairo.sql.RecordCursorFactory;
import com.questdb.cairo.sql.RecordMetadata;
import com.questdb.ql.ChannelCheckCancellationHandler;
import com.questdb.std.AssociativeCache;
import com.questdb.std.Misc;
import com.questdb.std.Mutable;
......@@ -35,6 +36,17 @@ import java.io.Closeable;
public abstract class AbstractQueryContext implements Mutable, Closeable {
static final int QUERY_DATA_SUFFIX = 7;
static final int QUERY_RECORD_SUFFIX = 6;
static final int QUERY_RECORD_COLUMNS = 5;
static final int QUERY_RECORD_START = 4;
static final int QUERY_META_SUFFIX = 3;
static final int QUERY_METADATA = 2;
// Factory cache is thread local due to possibility of factory being
// closed by another thread. Peer disconnect is a typical example of this.
// Being asynchronous we may need to be able to return factory to the cache
// by the same thread that executes the dispatcher.
static final ThreadLocal<AssociativeCache<RecordCursorFactory>> FACTORY_CACHE = ThreadLocal.withInitial(() -> new AssociativeCache<>(8, 8));
final ChannelCheckCancellationHandler cancellationHandler;
final long fd;
RecordCursorFactory recordCursorFactory;
......@@ -45,7 +57,8 @@ public abstract class AbstractQueryContext implements Mutable, Closeable {
long skip;
long stop;
Record record;
int queryState = JsonQueryProcessor.QUERY_PREFIX;
static final int QUERY_PREFIX = 1;
int queryState = QUERY_PREFIX;
int columnIndex;
public AbstractQueryContext(long fd, int connectionCheckFrequency) {
......@@ -60,11 +73,11 @@ public abstract class AbstractQueryContext implements Mutable, Closeable {
record = null;
if (recordCursorFactory != null) {
// todo: avoid toString()
JsonQueryProcessor.FACTORY_CACHE.get().put(query.toString(), recordCursorFactory);
FACTORY_CACHE.get().put(query.toString(), recordCursorFactory);
recordCursorFactory = null;
}
query = null;
queryState = JsonQueryProcessor.QUERY_PREFIX;
queryState = QUERY_PREFIX;
columnIndex = 0;
}
......
......@@ -28,7 +28,6 @@ import com.questdb.cairo.CairoError;
import com.questdb.cairo.CairoException;
import com.questdb.cairo.ColumnType;
import com.questdb.cairo.sql.Record;
import com.questdb.cairo.sql.RecordCursorFactory;
import com.questdb.cutlass.http.HttpChunkedResponseSocket;
import com.questdb.cutlass.http.HttpConnectionContext;
import com.questdb.cutlass.http.HttpRequestHeader;
......@@ -43,22 +42,9 @@ import com.questdb.std.*;
import com.questdb.std.str.CharSink;
import java.io.Closeable;
import java.lang.ThreadLocal;
import java.util.concurrent.atomic.AtomicLong;
public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
static final int QUERY_DATA_SUFFIX = 7;
static final int QUERY_RECORD_SUFFIX = 6;
static final int QUERY_RECORD_COLUMNS = 5;
static final int QUERY_RECORD_START = 4;
static final int QUERY_META_SUFFIX = 3;
static final int QUERY_METADATA = 2;
static final int QUERY_PREFIX = 1;
// Factory cache is thread local due to possibility of factory being
// closed by another thread. Peer disconnect is a typical example of this.
// Being asynchronous we may need to be able to return factory to the cache
// by the same thread that executes the dispatcher.
static final ThreadLocal<AssociativeCache<RecordCursorFactory>> FACTORY_CACHE = ThreadLocal.withInitial(() -> new AssociativeCache<>(8, 8));
private static final LocalValue<JsonQueryProcessorState> LV = new LocalValue<>();
private static final Log LOG = LogFactory.getLog(JsonQueryProcessor.class);
private final AtomicLong cacheHits = new AtomicLong();
......@@ -178,56 +164,6 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
}
}
public boolean parseUrl(
HttpChunkedResponseSocket socket,
HttpRequestHeader request,
JsonQueryProcessorState state
) throws PeerDisconnectedException, PeerIsSlowToReadException {
// Query text.
final CharSequence query = request.getUrlParam("query");
if (query == null || query.length() == 0) {
info(state).$("Empty query request received. Sending empty reply.").$();
sendException(socket, 0, "No query text", 400, state.query);
return false;
}
// Url Params.
long skip = 0;
long stop = Long.MAX_VALUE;
CharSequence limit = request.getUrlParam("limit");
if (limit != null) {
int sepPos = Chars.indexOf(limit, ',');
try {
if (sepPos > 0) {
skip = Numbers.parseLong(limit, 0, sepPos);
if (sepPos + 1 < limit.length()) {
stop = Numbers.parseLong(limit, sepPos + 1, limit.length());
}
} else {
stop = Numbers.parseLong(limit);
}
} catch (NumericException ex) {
// Skip or stop will have default value.
}
}
if (stop < 0) {
stop = 0;
}
if (skip < 0) {
skip = 0;
}
state.query = query;
state.skip = skip;
state.count = 0L;
state.stop = stop;
state.noMeta = Chars.equalsNc("true", request.getUrlParam("nm"));
state.fetchAll = Chars.equalsNc("true", request.getUrlParam("count"));
return true;
}
public void execute(
HttpConnectionContext context,
IODispatcher<HttpConnectionContext> dispatcher,
......@@ -235,7 +171,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
HttpChunkedResponseSocket socket
) throws PeerDisconnectedException, PeerIsSlowToReadException {
try {
state.recordCursorFactory = FACTORY_CACHE.get().poll(state.query);
state.recordCursorFactory = AbstractQueryContext.FACTORY_CACHE.get().poll(state.query);
int retryCount = 0;
do {
if (state.recordCursorFactory == null) {
......@@ -266,7 +202,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
// we could be having severe hardware issues and continue trying
if (retryCount == 0) {
// todo: we want to clear cache, no need to create string to achieve this
FACTORY_CACHE.get().put(state.query.toString(), null);
AbstractQueryContext.FACTORY_CACHE.get().put(state.query.toString(), null);
state.recordCursorFactory = null;
LOG.error().$("RecordSource execution failed. ").$(e.getMessage()).$(". Retrying ...").$();
retryCount++;
......@@ -291,69 +227,6 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
}
}
private void syntaxError(
HttpChunkedResponseSocket socket,
SqlException sqlException,
JsonQueryProcessorState state
) throws PeerDisconnectedException, PeerIsSlowToReadException {
info(state)
.$("syntax-error [q=`").$(state.query)
.$("`, at=").$(sqlException.getPosition())
.$(", message=`").$(sqlException.getFlyweightMessage()).$('`')
.$(']').$();
sendException(socket, sqlException.getPosition(), sqlException.getFlyweightMessage(), 400, state.query);
}
private void sendConfirmation(HttpChunkedResponseSocket socket) throws PeerDisconnectedException, PeerIsSlowToReadException {
socket.put('{').putQuoted("ddl").put(':').putQuoted("OK").put('}');
socket.sendChunk();
socket.done();
}
private void internalError(
HttpChunkedResponseSocket socket,
Throwable e,
JsonQueryProcessorState state
) throws PeerDisconnectedException, PeerIsSlowToReadException {
error(state).$("Server error executing query ").$(state.query).$(e).$();
sendException(socket, 0, e.getMessage(), 500, state.query);
}
protected void header(
HttpChunkedResponseSocket socket,
int status
) throws PeerDisconnectedException, PeerIsSlowToReadException {
socket.status(status, "application/json; charset=utf-8");
socket.headers().setKeepAlive(configuration.getKeepAliveHeader());
socket.sendHeader();
}
private void sendException(
HttpChunkedResponseSocket socket,
int position,
CharSequence message,
int status,
CharSequence query
) throws PeerDisconnectedException, PeerIsSlowToReadException {
header(socket, status);
socket.put('{').
putQuoted("query").put(':').encodeUtf8AndQuote(query == null ? "" : query).put(',').
putQuoted("error").put(':').encodeUtf8AndQuote(message).put(',').
putQuoted("position").put(':').put(position);
socket.put('}');
socket.sendChunk();
socket.done();
}
private LogRecord error(JsonQueryProcessorState state) {
return LOG.error().$('[').$(state.fd).$("] ");
}
private LogRecord info(JsonQueryProcessorState state) {
return LOG.info().$('[').$(state.fd).$("] ");
}
@Override
public void resumeSend(
HttpConnectionContext context,
......@@ -374,19 +247,19 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
try {
SWITCH:
switch (state.queryState) {
case QUERY_PREFIX:
case AbstractQueryContext.QUERY_PREFIX:
if (state.noMeta) {
socket.put('{').putQuoted("dataset").put(":[");
state.queryState = QUERY_RECORD_START;
state.queryState = AbstractQueryContext.QUERY_RECORD_START;
break;
}
socket.bookmark();
socket.put('{').putQuoted("query").put(':').encodeUtf8AndQuote(state.query);
socket.put(',').putQuoted("columns").put(':').put('[');
state.queryState = QUERY_METADATA;
state.queryState = AbstractQueryContext.QUERY_METADATA;
state.columnIndex = 0;
// fall through
case QUERY_METADATA:
case AbstractQueryContext.QUERY_METADATA:
for (; state.columnIndex < columnCount; state.columnIndex++) {
socket.bookmark();
......@@ -400,14 +273,14 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
putQuoted("type").put(':').putQuoted(ColumnType.nameOf(state.metadata.getColumnType(state.columnIndex)));
socket.put('}');
}
state.queryState = QUERY_META_SUFFIX;
state.queryState = AbstractQueryContext.QUERY_META_SUFFIX;
// fall through
case QUERY_META_SUFFIX:
case AbstractQueryContext.QUERY_META_SUFFIX:
socket.bookmark();
socket.put("],\"dataset\":[");
state.queryState = QUERY_RECORD_START;
state.queryState = AbstractQueryContext.QUERY_RECORD_START;
// fall through
case QUERY_RECORD_START:
case AbstractQueryContext.QUERY_RECORD_START:
if (state.record == null) {
// check if cursor has any records
......@@ -425,14 +298,14 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
break;
}
} else {
state.queryState = QUERY_DATA_SUFFIX;
state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX;
break SWITCH;
}
}
}
if (state.count > state.stop) {
state.queryState = QUERY_DATA_SUFFIX;
state.queryState = AbstractQueryContext.QUERY_DATA_SUFFIX;
break;
}
......@@ -442,10 +315,10 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
}
socket.put('[');
state.queryState = QUERY_RECORD_COLUMNS;
state.queryState = AbstractQueryContext.QUERY_RECORD_COLUMNS;
state.columnIndex = 0;
// fall through
case QUERY_RECORD_COLUMNS:
case AbstractQueryContext.QUERY_RECORD_COLUMNS:
for (; state.columnIndex < columnCount; state.columnIndex++) {
socket.bookmark();
......@@ -455,16 +328,16 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
putValue(socket, state.metadata.getColumnType(state.columnIndex), state.record, state.columnIndex);
}
state.queryState = QUERY_RECORD_SUFFIX;
state.queryState = AbstractQueryContext.QUERY_RECORD_SUFFIX;
// fall through
case QUERY_RECORD_SUFFIX:
case AbstractQueryContext.QUERY_RECORD_SUFFIX:
socket.bookmark();
socket.put(']');
state.record = null;
state.queryState = QUERY_RECORD_START;
state.queryState = AbstractQueryContext.QUERY_RECORD_START;
break;
case QUERY_DATA_SUFFIX:
case AbstractQueryContext.QUERY_DATA_SUFFIX:
sendDone(socket, state);
break OUT;
default:
......@@ -487,6 +360,119 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
readyForNextRequest(context, dispatcher);
}
private void syntaxError(
HttpChunkedResponseSocket socket,
SqlException sqlException,
JsonQueryProcessorState state
) throws PeerDisconnectedException, PeerIsSlowToReadException {
info(state)
.$("syntax-error [q=`").$(state.query)
.$("`, at=").$(sqlException.getPosition())
.$(", message=`").$(sqlException.getFlyweightMessage()).$('`')
.$(']').$();
sendException(socket, sqlException.getPosition(), sqlException.getFlyweightMessage(), 400, state.query);
}
private void sendConfirmation(HttpChunkedResponseSocket socket) throws PeerDisconnectedException, PeerIsSlowToReadException {
socket.put('{').putQuoted("ddl").put(':').putQuoted("OK").put('}');
socket.sendChunk();
socket.done();
}
private void internalError(
HttpChunkedResponseSocket socket,
Throwable e,
JsonQueryProcessorState state
) throws PeerDisconnectedException, PeerIsSlowToReadException {
error(state).$("Server error executing query ").$(state.query).$(e).$();
sendException(socket, 0, e.getMessage(), 500, state.query);
}
protected void header(
HttpChunkedResponseSocket socket,
int status
) throws PeerDisconnectedException, PeerIsSlowToReadException {
socket.status(status, "application/json; charset=utf-8");
socket.headers().setKeepAlive(configuration.getKeepAliveHeader());
socket.sendHeader();
}
private void sendException(
HttpChunkedResponseSocket socket,
int position,
CharSequence message,
int status,
CharSequence query
) throws PeerDisconnectedException, PeerIsSlowToReadException {
header(socket, status);
socket.put('{').
putQuoted("query").put(':').encodeUtf8AndQuote(query == null ? "" : query).put(',').
putQuoted("error").put(':').encodeUtf8AndQuote(message).put(',').
putQuoted("position").put(':').put(position);
socket.put('}');
socket.sendChunk();
socket.done();
}
private LogRecord error(JsonQueryProcessorState state) {
return LOG.error().$('[').$(state.fd).$("] ");
}
private LogRecord info(JsonQueryProcessorState state) {
return LOG.info().$('[').$(state.fd).$("] ");
}
private boolean parseUrl(
HttpChunkedResponseSocket socket,
HttpRequestHeader request,
JsonQueryProcessorState state
) throws PeerDisconnectedException, PeerIsSlowToReadException {
// Query text.
final CharSequence query = request.getUrlParam("query");
if (query == null || query.length() == 0) {
info(state).$("Empty query request received. Sending empty reply.").$();
sendException(socket, 0, "No query text", 400, state.query);
return false;
}
// Url Params.
long skip = 0;
long stop = Long.MAX_VALUE;
CharSequence limit = request.getUrlParam("limit");
if (limit != null) {
int sepPos = Chars.indexOf(limit, ',');
try {
if (sepPos > 0) {
skip = Numbers.parseLong(limit, 0, sepPos);
if (sepPos + 1 < limit.length()) {
stop = Numbers.parseLong(limit, sepPos + 1, limit.length());
}
} else {
stop = Numbers.parseLong(limit);
}
} catch (NumericException ex) {
// Skip or stop will have default value.
}
}
if (stop < 0) {
stop = 0;
}
if (skip < 0) {
skip = 0;
}
state.query = query;
state.skip = skip;
state.count = 0L;
state.stop = stop;
state.noMeta = Chars.equalsNc("true", request.getUrlParam("nm"));
state.fetchAll = Chars.equalsNc("true", request.getUrlParam("count"));
return true;
}
private void readyForNextRequest(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
LOG.debug().$("all sent [fd=").$(context.getFd()).$(']').$();
context.clear();
......
......@@ -34,7 +34,7 @@ public class JsonQueryProcessorState extends AbstractQueryContext {
@Override
public void clear() {
super.clear();
queryState = JsonQueryProcessor.QUERY_PREFIX;
queryState = AbstractQueryContext.QUERY_PREFIX;
fetchAll = false;
}
}
......@@ -28,7 +28,6 @@ import com.questdb.cairo.CairoError;
import com.questdb.cairo.CairoException;
import com.questdb.cairo.ColumnType;
import com.questdb.cairo.sql.Record;
import com.questdb.cairo.sql.RecordCursorFactory;
import com.questdb.cutlass.http.HttpChunkedResponseSocket;
import com.questdb.cutlass.http.HttpConnectionContext;
import com.questdb.cutlass.http.HttpRequestHeader;
......@@ -43,21 +42,15 @@ import com.questdb.std.*;
import com.questdb.std.str.CharSink;
import java.io.Closeable;
import java.lang.ThreadLocal;
import java.util.concurrent.atomic.AtomicLong;
import static com.questdb.cutlass.http.processors.AbstractQueryContext.*;
public class TextQueryProcessor implements HttpRequestProcessor, Closeable {
static final int QUERY_DATA_SUFFIX = 7;
static final int QUERY_RECORD_SUFFIX = 6;
static final int QUERY_RECORD_COLUMNS = 5;
static final int QUERY_RECORD_START = 4;
static final int QUERY_METADATA = 2;
static final int QUERY_PREFIX = 1;
// Factory cache is thread local due to possibility of factory being
// closed by another thread. Peer disconnect is a typical example of this.
// Being asynchronous we may need to be able to return factory to the cache
// by the same thread that executes the dispatcher.
static final ThreadLocal<AssociativeCache<RecordCursorFactory>> FACTORY_CACHE = ThreadLocal.withInitial(() -> new AssociativeCache<>(8, 8));
private static final LocalValue<JsonQueryProcessorState> LV = new LocalValue<>();
private static final Log LOG = LogFactory.getLog(TextQueryProcessor.class);
private final AtomicLong cacheHits = new AtomicLong();
......@@ -171,7 +164,7 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable {
}
}
public boolean parseUrl(
private boolean parseUrl(
HttpChunkedResponseSocket socket,
HttpRequestHeader request,
JsonQueryProcessorState state
......@@ -476,10 +469,6 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable {
JsonQueryProcessorState state
) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (state.count > -1) {
socket.bookmark();
socket.put(']');
socket.put(',').putQuoted("count").put(':').put(state.count);
socket.put('}');
state.count = -1;
socket.sendChunk();
}
......
......@@ -362,7 +362,6 @@ public class WireParser implements Closeable {
}
break;
case 'S': // sync?
// todo: wtf is this?
break;
case 'D': // describe?
if (currentFactory != null) {
......
......@@ -567,6 +567,7 @@ class SqlOptimiser {
groupByModel.addColumn(translatedColumn);
analyticModel.addColumn(translatedColumn);
outerModel.addColumn(translatedColumn);
innerModel.addColumn(translatedColumn);
} else {
final CharSequence alias = createColumnAlias(columnName, translatingModel);
addColumnToTranslatingModel(
......
......@@ -42,6 +42,15 @@ public class SqlParserTest extends AbstractGriffinTest {
modelOf("x").col("x", ColumnType.INT));
}
@Test
public void testDuplicateColumnsVirtualAndGroupBySelect() throws SqlException {
assertQuery(
"select-group-by sum(b + a) sum, column, k1, k1 k from (select-virtual a, b, a + b column, k1, k1 k from (select-choose a, b, k k1 from (x timestamp (timestamp)))) sample by 1m",
"select sum(b+a), a+b, k k1, k from x sample by 1m",
modelOf("x").col("a", ColumnType.DOUBLE).col("b", ColumnType.SYMBOL).col("k", ColumnType.TIMESTAMP).timestamp()
);
}
@Test
public void testAliasWithSpaceX() {
assertSyntaxError("from x 'a b' where x > 1", 7, "unexpected");
......@@ -1576,20 +1585,21 @@ public class SqlParserTest extends AbstractGriffinTest {
}
@Test
public void testDuplicateColumnsVirtualAndGroupBySelect() throws SqlException {
public void testDuplicateColumnsVirtualSelect() throws SqlException {
assertQuery(
"select-group-by sum(b + a) sum, column, k1, k1 k from (select-virtual a, b, a + b column, k1 from (select-choose a, b, k k1 from (x timestamp (timestamp)))) sample by 1m",
"select sum(b+a), a+b, k k1, k from x sample by 1m",
"select-virtual b + a column, k1, k1 k from (select-choose a, b, k k1 from (x timestamp (timestamp)))",
"select b+a, k k1, k from x",
modelOf("x").col("a", ColumnType.DOUBLE).col("b", ColumnType.SYMBOL).col("k", ColumnType.TIMESTAMP).timestamp()
);
}
@Test
public void testDuplicateColumnsVirtualSelect() throws SqlException {
assertQuery(
"select-choose column, k1, k1 k from (select-virtual b + a column, k1 from (select-choose a, b, k k1 from (x timestamp (timestamp))))",
"select b+a, k k1, k from x",
modelOf("x").col("a", ColumnType.DOUBLE).col("b", ColumnType.SYMBOL).col("k", ColumnType.TIMESTAMP).timestamp()
public void testFunctionWithoutAlias() throws SqlException {
assertQuery("select-virtual f(x) f, x from (x where x > 1)",
"select f(x), x from x where x > 1",
modelOf("x")
.col("x", ColumnType.INT)
.col("y", ColumnType.INT)
);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册