未验证 提交 31fabfe3 编写于 作者: V Vlad Ilyushchenko 提交者: GitHub

chore(http): change http status of SQL errors from 400 to 200 (#684)

上级 5fabe973
......@@ -406,7 +406,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
private boolean rejectRequest(CharSequence userMessage) throws PeerDisconnectedException, PeerIsSlowToReadException {
clear();
LOG.error().$(userMessage).$();
simpleResponse().sendStatus(400, userMessage);
simpleResponse().sendStatus(404, userMessage);
dispatcher.registerChannel(this, IOOperation.READ);
return false;
}
......
......@@ -477,7 +477,7 @@ public class HttpResponseSink implements Closeable, Mutable {
public class SimpleResponseImpl {
public void sendStatus(int code, CharSequence message) throws PeerDisconnectedException, PeerIsSlowToReadException {
final String std = headerImpl.status(httpVersion, code, "text/html; charset=utf-8", -1L);
final String std = headerImpl.status(httpVersion, code, "text/plain; charset=utf-8", -1L);
sink.put(message == null ? std : message).put(Misc.EOL);
prepareHeaderSink();
resumeSend(CHUNK_HEAD);
......
......@@ -24,10 +24,6 @@
package io.questdb.cutlass.http.processors;
import java.io.Closeable;
import org.jetbrains.annotations.Nullable;
import io.questdb.MessageBus;
import io.questdb.Telemetry;
import io.questdb.cairo.CairoEngine;
......@@ -37,40 +33,30 @@ import io.questdb.cairo.sql.InsertMethod;
import io.questdb.cairo.sql.InsertStatement;
import io.questdb.cairo.sql.ReaderOutOfDateException;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cutlass.http.HttpChunkedResponseSocket;
import io.questdb.cutlass.http.HttpConnectionContext;
import io.questdb.cutlass.http.HttpRequestHeader;
import io.questdb.cutlass.http.HttpRequestProcessor;
import io.questdb.cutlass.http.LocalValue;
import io.questdb.cutlass.http.*;
import io.questdb.cutlass.text.Utf8Exception;
import io.questdb.griffin.CompiledQuery;
import io.questdb.griffin.FunctionFactoryCache;
import io.questdb.griffin.SqlCompiler;
import io.questdb.griffin.SqlException;
import io.questdb.griffin.SqlExecutionContextImpl;
import io.questdb.griffin.*;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.network.NoSpaceLeftInResponseBufferException;
import io.questdb.network.PeerDisconnectedException;
import io.questdb.network.PeerIsSlowToReadException;
import io.questdb.network.ServerDisconnectException;
import io.questdb.std.Chars;
import io.questdb.std.Misc;
import io.questdb.std.NanosecondClock;
import io.questdb.std.Numbers;
import io.questdb.std.NumericException;
import io.questdb.std.ObjList;
import io.questdb.std.*;
import io.questdb.std.str.DirectByteCharSequence;
import io.questdb.std.str.Path;
import org.jetbrains.annotations.Nullable;
import java.io.Closeable;
public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
private static final LocalValue<JsonQueryProcessorState> LV = new LocalValue<>();
private static final Log LOG = LogFactory.getLog(JsonQueryProcessor.class);
protected final ObjList<QueryExecutor> queryExecutors = new ObjList<>();
private final SqlCompiler compiler;
private final JsonQueryProcessorConfiguration configuration;
private final SqlExecutionContextImpl sqlExecutionContext;
private final Path path = new Path();
protected final ObjList<QueryExecutor> queryExecutors = new ObjList<>();
private final NanosecondClock nanosecondClock;
public JsonQueryProcessor(
......@@ -79,7 +65,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
@Nullable MessageBus messageBus,
int workerCount
) {
this(configuration, engine, messageBus, workerCount, null);
this(configuration, engine, messageBus, workerCount, (FunctionFactoryCache) null);
}
public JsonQueryProcessor(
......@@ -89,7 +75,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
int workerCount,
@Nullable FunctionFactoryCache functionFactoryCache
) {
this(configuration, engine, messageBus, workerCount, null, new SqlCompiler(engine, messageBus, functionFactoryCache));
this(configuration, engine, messageBus, workerCount, new SqlCompiler(engine, messageBus, functionFactoryCache));
}
public JsonQueryProcessor(
......@@ -97,7 +83,6 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
CairoEngine engine,
@Nullable MessageBus messageBus,
int workerCount,
@Nullable FunctionFactoryCache functionFactoryCache,
SqlCompiler sqlCompiler
) {
this.configuration = configuration;
......@@ -120,110 +105,6 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
this.nanosecondClock = engine.getConfiguration().getNanosecondClock();
}
private static void doResumeSend(
JsonQueryProcessorState state,
HttpConnectionContext context
) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (state.noCursor()) {
return;
}
LOG.debug().$("resume [fd=").$(context.getFd()).$(']').$();
final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
while (true) {
try {
state.resume(socket);
break;
} catch (NoSpaceLeftInResponseBufferException ignored) {
if (socket.resetToBookmark()) {
socket.sendChunk();
} else {
// what we have here is out unit of data, column value or query
// is larger that response content buffer
// all we can do in this scenario is to log appropriately
// and disconnect socket
state.logBufferTooSmall();
throw PeerDisconnectedException.INSTANCE;
}
}
}
// reached the end naturally?
readyForNextRequest(context);
}
private static void cannotCopyRemote(
JsonQueryProcessorState state,
CompiledQuery cc,
CharSequence keepAliveHeader
) throws SqlException {
throw SqlException.$(0, "copy from STDIN is not supported over REST");
}
protected static void header(
HttpChunkedResponseSocket socket,
int status,
CharSequence keepAliveHeader
) throws PeerDisconnectedException, PeerIsSlowToReadException {
socket.status(status, "application/json; charset=utf-8");
socket.headers().setKeepAlive(keepAliveHeader);
socket.sendHeader();
}
private static void readyForNextRequest(HttpConnectionContext context) {
LOG.info().$("all sent [fd=").$(context.getFd()).$(", lastRequestBytesSent=").$(context.getLastRequestBytesSent()).$(", nCompletedRequests=").$(context.getNCompletedRequests() + 1)
.$(", totalBytesSent=").$(context.getTotalBytesSent()).$(']').$();
}
protected static void sendConfirmation(
JsonQueryProcessorState state,
CompiledQuery cq,
CharSequence keepAliveHeader
) throws PeerDisconnectedException, PeerIsSlowToReadException {
final HttpConnectionContext context = state.getHttpConnectionContext();
final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
header(socket, 200, keepAliveHeader);
socket.put('{').putQuoted("ddl").put(':').putQuoted("OK").put('}');
socket.sendChunk();
socket.done();
readyForNextRequest(context);
}
static void sendException(
HttpChunkedResponseSocket socket,
int position,
CharSequence message,
int status,
CharSequence query,
CharSequence keepAliveHeader
) throws PeerDisconnectedException, PeerIsSlowToReadException {
header(socket, status, keepAliveHeader);
socket.put('{').
putQuoted("query").put(':').encodeUtf8AndQuote(query == null ? "" : query).put(',').
putQuoted("error").put(':').encodeUtf8AndQuote(message).put(',').
putQuoted("position").put(':').put(position);
socket.put('}');
socket.sendChunk();
socket.done();
}
private static void syntaxError(
HttpChunkedResponseSocket socket,
SqlException sqlException,
JsonQueryProcessorState state,
CharSequence keepAliveHeader
) throws PeerDisconnectedException, PeerIsSlowToReadException {
state.logSyntaxError(sqlException);
sendException(
socket,
sqlException.getPosition(),
sqlException.getFlyweightMessage(),
400,
state.getQuery(),
keepAliveHeader
);
}
@Override
public void close() {
Misc.free(compiler);
......@@ -314,33 +195,99 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
}
}
private void executeCachedSelect(
private static void doResumeSend(
JsonQueryProcessorState state,
RecordCursorFactory factory,
HttpConnectionContext context
) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (state.noCursor()) {
return;
}
LOG.debug().$("resume [fd=").$(context.getFd()).$(']').$();
final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
while (true) {
try {
state.resume(socket);
break;
} catch (NoSpaceLeftInResponseBufferException ignored) {
if (socket.resetToBookmark()) {
socket.sendChunk();
} else {
// what we have here is out unit of data, column value or query
// is larger that response content buffer
// all we can do in this scenario is to log appropriately
// and disconnect socket
state.logBufferTooSmall();
throw PeerDisconnectedException.INSTANCE;
}
}
}
// reached the end naturally?
readyForNextRequest(context);
}
private static void cannotCopyRemote(
JsonQueryProcessorState state,
CompiledQuery cc,
CharSequence keepAliveHeader
) throws SqlException {
throw SqlException.$(0, "copy from STDIN is not supported over REST");
}
protected static void header(
HttpChunkedResponseSocket socket,
CharSequence keepAliveHeader
) throws PeerDisconnectedException, PeerIsSlowToReadException {
state.setCompilerNanos(0);
state.logExecuteCached();
executeSelect(state, factory, keepAliveHeader);
socket.status(200, "application/json; charset=utf-8");
socket.headers().setKeepAlive(keepAliveHeader);
socket.sendHeader();
}
private void executeSelect(
private static void readyForNextRequest(HttpConnectionContext context) {
LOG.info().$("all sent [fd=").$(context.getFd()).$(", lastRequestBytesSent=").$(context.getLastRequestBytesSent()).$(", nCompletedRequests=").$(context.getNCompletedRequests() + 1)
.$(", totalBytesSent=").$(context.getTotalBytesSent()).$(']').$();
}
protected static void sendConfirmation(
JsonQueryProcessorState state,
RecordCursorFactory factory,
CompiledQuery cq,
CharSequence keepAliveHeader
) throws PeerDisconnectedException, PeerIsSlowToReadException {
final HttpConnectionContext context = state.getHttpConnectionContext();
try {
if (state.of(factory, sqlExecutionContext)) {
header(context.getChunkedResponseSocket(), 200, keepAliveHeader);
doResumeSend(state, context);
} else {
readyForNextRequest(context);
}
} catch (CairoException ex) {
state.setQueryCacheable(ex.isCacheable());
throw ex;
}
final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
header(socket, keepAliveHeader);
socket.put('{').putQuoted("ddl").put(':').putQuoted("OK").put('}');
socket.sendChunk();
socket.done();
readyForNextRequest(context);
}
static void sendException(
HttpChunkedResponseSocket socket,
int position,
CharSequence message,
CharSequence query,
CharSequence keepAliveHeader
) throws PeerDisconnectedException, PeerIsSlowToReadException {
header(socket, keepAliveHeader);
JsonQueryProcessorState.prepareExceptionJson(socket, position, message, query);
}
private static void syntaxError(
HttpChunkedResponseSocket socket,
SqlException sqlException,
JsonQueryProcessorState state,
CharSequence keepAliveHeader
) throws PeerDisconnectedException, PeerIsSlowToReadException {
state.logSyntaxError(sqlException);
sendException(
socket,
sqlException.getPosition(),
sqlException.getFlyweightMessage(),
state.getQuery(),
keepAliveHeader
);
}
private void compileQuery(JsonQueryProcessorState state) throws SqlException, PeerDisconnectedException, PeerIsSlowToReadException {
......@@ -355,6 +302,16 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
);
}
private void executeCachedSelect(
JsonQueryProcessorState state,
RecordCursorFactory factory,
CharSequence keepAliveHeader
) throws PeerDisconnectedException, PeerIsSlowToReadException {
state.setCompilerNanos(0);
state.logExecuteCached();
executeSelect(state, factory, keepAliveHeader);
}
private void executeInsert(
JsonQueryProcessorState state,
CompiledQuery cc,
......@@ -381,6 +338,25 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
keepAliveHeader);
}
private void executeSelect(
JsonQueryProcessorState state,
RecordCursorFactory factory,
CharSequence keepAliveHeader
) throws PeerDisconnectedException, PeerIsSlowToReadException {
final HttpConnectionContext context = state.getHttpConnectionContext();
try {
if (state.of(factory, sqlExecutionContext)) {
header(context.getChunkedResponseSocket(), keepAliveHeader);
doResumeSend(state, context);
} else {
readyForNextRequest(context);
}
} catch (CairoException ex) {
state.setQueryCacheable(ex.isCacheable());
throw ex;
}
}
private void internalError(
HttpChunkedResponseSocket socket,
CharSequence message,
......@@ -392,7 +368,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
} else {
state.error().$("internal error [q=`").utf8(state.getQuery()).$("`, ex=").$(e).$(']').$();
}
sendException(socket, 0, message, 500, state.getQuery(), configuration.getKeepAliveHeader());
sendException(socket, 0, message, state.getQuery(), configuration.getKeepAliveHeader());
}
private boolean parseUrl(
......@@ -404,7 +380,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
final DirectByteCharSequence query = header.getUrlParam("query");
if (query == null || query.length() == 0) {
state.info().$("Empty query header received. Sending empty reply.").$();
sendException(state.getHttpConnectionContext().getChunkedResponseSocket(), 0, "No query text", 400, query, keepAliveHeader);
sendException(state.getHttpConnectionContext().getChunkedResponseSocket(), 0, "No query text", query, keepAliveHeader);
return false;
}
......@@ -444,7 +420,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
state.configure(header, query, skip, stop);
} catch (Utf8Exception e) {
state.info().$("Bad UTF8 encoding").$();
sendException(state.getHttpConnectionContext().getChunkedResponseSocket(), 0, "Bad UTF8 encoding in query text", 400, query, keepAliveHeader);
sendException(state.getHttpConnectionContext().getChunkedResponseSocket(), 0, "Bad UTF8 encoding in query text", query, keepAliveHeader);
return false;
}
return true;
......
......@@ -200,10 +200,6 @@ public class JsonQueryProcessorState implements Mutable, Closeable {
return query;
}
void setQueryCacheable(boolean queryCacheable) {
this.queryCacheable = queryCacheable;
}
public Rnd getRnd() {
return rnd;
}
......@@ -231,6 +227,10 @@ public class JsonQueryProcessorState implements Mutable, Closeable {
$(']').$();
}
public void logSyntaxError(SqlException e) {
info().$("syntax-error [q=`").utf8(query).$("`, at=").$(e.getPosition()).$(", message=`").utf8(e.getFlyweightMessage()).$('`').$(']').$();
}
public void logTimings() {
info().$("timings ").
$("[compiler: ").$(compilerNanos).
......@@ -240,10 +240,6 @@ public class JsonQueryProcessorState implements Mutable, Closeable {
$("`]").$();
}
public void logSyntaxError(SqlException e) {
info().$("syntax-error [q=`").utf8(query).$("`, at=").$(e.getPosition()).$(", message=`").utf8(e.getFlyweightMessage()).$('`').$(']').$();
}
public void setCompilerNanos(long compilerNanos) {
this.compilerNanos = compilerNanos;
}
......@@ -252,6 +248,16 @@ public class JsonQueryProcessorState implements Mutable, Closeable {
this.executeStartNanos = nanosecondClock.getTicks();
}
static void prepareExceptionJson(HttpChunkedResponseSocket socket, int position, CharSequence message, CharSequence query) throws PeerDisconnectedException, PeerIsSlowToReadException {
socket.put('{').
putQuoted("query").put(':').encodeUtf8AndQuote(query == null ? "" : query).put(',').
putQuoted("error").put(':').encodeUtf8AndQuote(message).put(',').
putQuoted("position").put(':').put(position);
socket.put('}');
socket.sendChunk();
socket.done();
}
private static void putStringOrNull(CharSink r, CharSequence str) {
if (str == null) {
r.put("null");
......@@ -336,12 +342,16 @@ public class JsonQueryProcessorState implements Mutable, Closeable {
socket.put('"').putISODate(t).put('"');
}
private static void putCursorValue(HttpChunkedResponseSocket socket, Record rec, int col) {
putStringOrNull(socket, null);
}
private boolean addColumnToOutput(RecordMetadata metadata, CharSequence columnNames, int start, int hi) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (start == hi) {
info().$("empty column in list '").$(columnNames).$('\'').$();
HttpChunkedResponseSocket socket = getHttpConnectionContext().getChunkedResponseSocket();
JsonQueryProcessor.header(socket, 400, "");
JsonQueryProcessor.header(socket, "");
socket.put('{').
putQuoted("query").put(':').encodeUtf8AndQuote(query).put(',').
putQuoted("error").put(':').putQuoted("empty column in list");
......@@ -355,7 +365,7 @@ public class JsonQueryProcessorState implements Mutable, Closeable {
if (columnIndex == RecordMetadata.COLUMN_NOT_FOUND) {
info().$("invalid column in list: '").$(columnNames, start, hi).$('\'').$();
HttpChunkedResponseSocket socket = getHttpConnectionContext().getChunkedResponseSocket();
JsonQueryProcessor.header(socket, 400, "");
JsonQueryProcessor.header(socket, "");
socket.put('{').
putQuoted("query").put(':').encodeUtf8AndQuote(query).put(',').
putQuoted("error").put(':').put('\'').put("invalid column in list: ").put(columnNames, start, hi).put('\'');
......@@ -528,7 +538,7 @@ public class JsonQueryProcessorState implements Mutable, Closeable {
} catch (Utf8Exception e) {
info().$("utf8 error when decoding column list '").$(columnNames).$('\'').$();
HttpChunkedResponseSocket socket = getHttpConnectionContext().getChunkedResponseSocket();
JsonQueryProcessor.header(socket, 400, "");
JsonQueryProcessor.header(socket, "");
socket.put('{').
putQuoted("error").put(':').putQuoted("utf8 error in column list");
socket.put('}');
......@@ -713,10 +723,6 @@ public class JsonQueryProcessorState implements Mutable, Closeable {
putStrValue(socket, rec, columnSkewList.getQuick(col));
}
private static void putCursorValue(HttpChunkedResponseSocket socket, Record rec, int col) {
putStringOrNull(socket, null);
}
private void putSkewedSymValue(HttpChunkedResponseSocket socket, Record rec, int col) {
putSymValue(socket, rec, columnSkewList.getQuick(col));
}
......@@ -729,6 +735,10 @@ public class JsonQueryProcessorState implements Mutable, Closeable {
resumeActions.getQuick(queryState).onResume(socket, columnCount);
}
void setQueryCacheable(boolean queryCacheable) {
this.queryCacheable = queryCacheable;
}
@FunctionalInterface
interface StateResumeAction {
void onResume(
......
......@@ -68,7 +68,7 @@ public class TableStatusCheckProcessor implements HttpRequestProcessor, Closeabl
public void onRequestComplete(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException {
CharSequence tableName = context.getRequestHeader().getUrlParam("j");
if (tableName == null) {
context.simpleResponse().sendStatus(400, "table name missing");
context.simpleResponse().sendStatus(200, "table name missing");
} else {
int check = cairoEngine.getStatus(context.getCairoSecurityContext(), path, tableName);
if (Chars.equalsNc("json", context.getRequestHeader().getUrlParam("f"))) {
......
......@@ -62,12 +62,6 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
// processor. For different threads to lookup the same value from local value map the key,
// which is LV, has to be the same between processor instances
private static final LocalValue<TextImportProcessorState> LV = new LocalValue<>();
static {
atomicityParamMap.put("skipRow", Atomicity.SKIP_ROW);
atomicityParamMap.put("abort", Atomicity.SKIP_ALL);
}
private final CairoEngine engine;
private HttpConnectionContext transientContext;
private TextImportProcessorState transientState;
......@@ -76,6 +70,129 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
this.engine = cairoEngine;
}
@Override
public void close() {
}
@Override
public void onChunk(long lo, long hi)
throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
if (hi > lo) {
try {
transientState.textLoader.parse(lo, hi, transientContext.getCairoSecurityContext());
if (transientState.messagePart == MESSAGE_DATA && !transientState.analysed) {
transientState.analysed = true;
transientState.textLoader.setState(TextLoader.LOAD_DATA);
}
} catch (TextException | CairoException | CairoError e) {
handleTextException(e);
}
}
}
@Override
public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
final CharSequence contentDisposition = partHeader.getContentDispositionName();
LOG.debug().$("part begin [name=").$(contentDisposition).$(']').$();
if (Chars.equalsNc("data", contentDisposition)) {
final HttpRequestHeader rh = transientContext.getRequestHeader();
CharSequence name = rh.getUrlParam("name");
if (name == null) {
name = partHeader.getContentDispositionFilename();
}
if (name == null) {
sendError("no file name given");
throw ServerDisconnectException.INSTANCE;
}
CharSequence partitionedBy = rh.getUrlParam("partitionBy");
if (partitionedBy == null) {
partitionedBy = "NONE";
}
int partitionBy = PartitionBy.fromString(partitionedBy);
if (partitionBy == -1) {
sendError("invalid partitionBy");
throw ServerDisconnectException.INSTANCE;
}
CharSequence timestampIndexCol = rh.getUrlParam("timestamp");
if (partitionBy != PartitionBy.NONE && timestampIndexCol == null) {
sendError("when specifying partitionBy you must also specify timestamp");
throw ServerDisconnectException.INSTANCE;
}
transientState.analysed = false;
transientState.textLoader.configureDestination(
name,
Chars.equalsNc("true", rh.getUrlParam("overwrite")),
Chars.equalsNc("true", rh.getUrlParam("durable")),
getAtomicity(rh.getUrlParam("atomicity")),
partitionBy,
timestampIndexCol
);
transientState.textLoader.setForceHeaders(Chars.equalsNc("true", rh.getUrlParam("forceHeader")));
transientState.textLoader.setSkipRowsWithExtraValues(Chars.equalsNc("true", rh.getUrlParam("skipLev")));
transientState.textLoader.setState(TextLoader.ANALYZE_STRUCTURE);
transientState.forceHeader = Chars.equalsNc("true", rh.getUrlParam("forceHeader"));
transientState.messagePart = MESSAGE_DATA;
} else if (Chars.equalsNc("schema", contentDisposition)) {
transientState.textLoader.setState(TextLoader.LOAD_JSON_METADATA);
transientState.messagePart = MESSAGE_SCHEMA;
} else {
if (partHeader.getContentDisposition() == null) {
sendError("'Content-Disposition' multipart header missing'");
} else {
sendError("invalid value in 'Content-Disposition' multipart header");
}
throw ServerDisconnectException.INSTANCE;
}
}
// This processor implements HttpMultipartContentListener, methods of which
// have neither context nor dispatcher. During "chunk" processing we may need
// to send something back to client, or disconnect them. To do that we need
// these transient references. resumeRecv() will set them and they will remain
// valid during multipart events.
@Override
public void onPartEnd() throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
try {
LOG.debug().$("part end").$();
transientState.textLoader.wrapUp();
if (transientState.messagePart == MESSAGE_DATA) {
sendResponse(transientContext);
}
} catch (TextException | CairoException | CairoError e) {
handleTextException(e);
}
}
@Override
public void onRequestComplete(HttpConnectionContext context) {
transientState.clear();
}
@Override
public void resumeRecv(HttpConnectionContext context) {
this.transientContext = context;
this.transientState = LV.get(context);
if (this.transientState == null) {
LOG.debug().$("new text state").$();
LV.set(context, this.transientState = new TextImportProcessorState(engine));
transientState.json = isJson(context);
}
}
@Override
public void resumeSend(
HttpConnectionContext context
) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
doResumeSend(LV.get(context), context.getChunkedResponseSocket());
}
private static void resumeJson(TextImportProcessorState state, HttpChunkedResponseSocket socket) throws PeerDisconnectedException, PeerIsSlowToReadException {
final TextLoader textLoader = state.textLoader;
final RecordMetadata metadata = textLoader.getMetadata();
......@@ -141,12 +258,6 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
return b;
}
// This processor implements HttpMultipartContentListener, methods of which
// have neither context nor dispatcher. During "chunk" processing we may need
// to send something back to client, or disconnect them. To do that we need
// these transient references. resumeRecv() will set them and they will remain
// valid during multipart events.
private static void pad(CharSink b, int w, long value) {
int len = (int) Math.log10(value);
if (len < 0) {
......@@ -248,122 +359,6 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
return atomicity == -1 ? Atomicity.SKIP_COL : atomicity;
}
@Override
public void close() {
}
@Override
public void onChunk(long lo, long hi)
throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
if (hi > lo) {
try {
transientState.textLoader.parse(lo, hi, transientContext.getCairoSecurityContext());
if (transientState.messagePart == MESSAGE_DATA && !transientState.analysed) {
transientState.analysed = true;
transientState.textLoader.setState(TextLoader.LOAD_DATA);
}
} catch (TextException | CairoException | CairoError e) {
handleTextException(e);
}
}
}
@Override
public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
final CharSequence contentDisposition = partHeader.getContentDispositionName();
LOG.debug().$("part begin [name=").$(contentDisposition).$(']').$();
if (Chars.equalsNc("data", contentDisposition)) {
final HttpRequestHeader rh = transientContext.getRequestHeader();
CharSequence name = rh.getUrlParam("name");
if (name == null) {
name = partHeader.getContentDispositionFilename();
}
if (name == null) {
transientContext.simpleResponse().sendStatus(400, "no file name given");
throw ServerDisconnectException.INSTANCE;
}
CharSequence partitionedBy = rh.getUrlParam("partitionBy");
if (partitionedBy == null) {
partitionedBy = "NONE";
}
int partitionBy = PartitionBy.fromString(partitionedBy);
if (partitionBy == -1) {
transientContext.simpleResponse().sendStatus(400, "invalid partitionBy");
throw ServerDisconnectException.INSTANCE;
}
CharSequence timestampIndexCol = rh.getUrlParam("timestamp");
if (partitionBy != PartitionBy.NONE && timestampIndexCol == null) {
transientContext.simpleResponse().sendStatus(400, "when specifying partitionBy you must also specify timestamp");
throw ServerDisconnectException.INSTANCE;
}
transientState.analysed = false;
transientState.textLoader.configureDestination(
name,
Chars.equalsNc("true", rh.getUrlParam("overwrite")),
Chars.equalsNc("true", rh.getUrlParam("durable")),
getAtomicity(rh.getUrlParam("atomicity")),
partitionBy,
timestampIndexCol
);
transientState.textLoader.setForceHeaders(Chars.equalsNc("true", rh.getUrlParam("forceHeader")));
transientState.textLoader.setSkipRowsWithExtraValues(Chars.equalsNc("true", rh.getUrlParam("skipLev")));
transientState.textLoader.setState(TextLoader.ANALYZE_STRUCTURE);
transientState.forceHeader = Chars.equalsNc("true", rh.getUrlParam("forceHeader"));
transientState.messagePart = MESSAGE_DATA;
} else if (Chars.equalsNc("schema", contentDisposition)) {
transientState.textLoader.setState(TextLoader.LOAD_JSON_METADATA);
transientState.messagePart = MESSAGE_SCHEMA;
} else {
if (partHeader.getContentDisposition() == null) {
transientContext.simpleResponse().sendStatus(400, "'Content-Disposition' multipart header missing'");
} else {
transientContext.simpleResponse().sendStatus(400, "invalid value in 'Content-Disposition' multipart header");
}
throw ServerDisconnectException.INSTANCE;
}
}
@Override
public void onPartEnd() throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
try {
LOG.debug().$("part end").$();
transientState.textLoader.wrapUp();
if (transientState.messagePart == MESSAGE_DATA) {
sendResponse(transientContext);
}
} catch (TextException | CairoException | CairoError e) {
handleTextException(e);
}
}
@Override
public void onRequestComplete(HttpConnectionContext context) {
transientState.clear();
}
@Override
public void resumeRecv(HttpConnectionContext context) {
this.transientContext = context;
this.transientState = LV.get(context);
if (this.transientState == null) {
LOG.debug().$("new text state").$();
LV.set(context, this.transientState = new TextImportProcessorState(engine));
transientState.json = Chars.equalsNc("json", context.getRequestHeader().getUrlParam("fmt"));
}
}
@Override
public void resumeSend(
HttpConnectionContext context
) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
doResumeSend(LV.get(context), context.getChunkedResponseSocket());
}
private void doResumeSend(
TextImportProcessorState state,
HttpChunkedResponseSocket socket
......@@ -392,10 +387,18 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
private void handleTextException(FlyweightMessageContainer e)
throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
sendError(transientContext, e.getFlyweightMessage(), Chars.equalsNc("json", transientContext.getRequestHeader().getUrlParam("fmt")));
sendError(transientContext, e.getFlyweightMessage(), isJson(transientContext));
throw ServerDisconnectException.INSTANCE;
}
private boolean isJson(HttpConnectionContext transientContext) {
return Chars.equalsNc("json", transientContext.getRequestHeader().getUrlParam("fmt"));
}
private void sendError(CharSequence message) throws PeerDisconnectedException, PeerIsSlowToReadException {
sendError(transientContext, message, transientState.json);
}
private void sendError(
HttpConnectionContext context,
CharSequence message,
......@@ -403,11 +406,11 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
) throws PeerDisconnectedException, PeerIsSlowToReadException {
final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
if (json) {
socket.status(400, CONTENT_TYPE_JSON);
socket.status(200, CONTENT_TYPE_JSON);
socket.sendHeader();
socket.put('{').putQuoted("status").put(':').encodeUtf8AndQuote(message).put('}');
} else {
socket.status(400, CONTENT_TYPE_TEXT);
socket.status(200, CONTENT_TYPE_TEXT);
socket.sendHeader();
socket.encodeUtf8(message);
}
......@@ -431,4 +434,9 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
sendError(context, state.stateMessage, state.json);
}
}
static {
atomicityParamMap.put("skipRow", Atomicity.SKIP_ROW);
atomicityParamMap.put("abort", Atomicity.SKIP_ALL);
}
}
......@@ -125,7 +125,7 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable {
try {
state.cursor = state.recordCursorFactory.getCursor(sqlExecutionContext);
state.metadata = state.recordCursorFactory.getMetadata();
header(context.getChunkedResponseSocket(), 200);
header(context.getChunkedResponseSocket());
resumeSend(context);
} catch (CairoException e) {
state.setQueryCacheable(e.isCacheable());
......@@ -134,7 +134,7 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable {
internalError(context.getChunkedResponseSocket(), e, state);
}
} else {
header(context.getChunkedResponseSocket(), 200);
header(context.getChunkedResponseSocket());
sendConfirmation(context.getChunkedResponseSocket());
readyForNextRequest(context);
}
......@@ -299,11 +299,8 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable {
return LOG.error().$('[').$(state.getFd()).$("] ");
}
protected void header(
HttpChunkedResponseSocket socket,
int status
) throws PeerDisconnectedException, PeerIsSlowToReadException {
socket.status(status, "text/csv; charset=utf-8");
protected void header(HttpChunkedResponseSocket socket) throws PeerDisconnectedException, PeerIsSlowToReadException {
socket.status(200, "text/csv; charset=utf-8");
socket.headers().put("Content-Disposition: attachment; filename=\"questdb-query-").put(clock.getTicks()).put(".csv\"").put(Misc.EOL);
socket.headers().setKeepAlive(configuration.getKeepAliveHeader());
socket.sendHeader();
......@@ -319,7 +316,7 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable {
TextQueryProcessorState state
) throws PeerDisconnectedException, PeerIsSlowToReadException {
error(state).$("Server error executing query ").utf8(state.query).$(e).$();
sendException(socket, 0, e.getMessage(), 500, state.query);
sendException(socket, 0, e.getMessage(), state.query);
}
private boolean parseUrl(
......@@ -331,7 +328,7 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable {
final DirectByteCharSequence query = request.getUrlParam("query");
if (query == null || query.length() == 0) {
info(state).$("Empty query request received. Sending empty reply.").$();
sendException(socket, 0, "No query text", 400, state.query);
sendException(socket, 0, "No query text", state.query);
return false;
}
......@@ -372,7 +369,7 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable {
TextUtil.utf8Decode(query.getLo(), query.getHi(), state.query);
} catch (Utf8Exception e) {
info(state).$("Bad UTF8 encoding").$();
sendException(socket, 0, "Bad UTF8 encoding in query text", 400, state.query);
sendException(socket, 0, "Bad UTF8 encoding in query text", state.query);
return false;
}
state.skip = skip;
......@@ -473,17 +470,10 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable {
HttpChunkedResponseSocket socket,
int position,
CharSequence message,
int status,
CharSequence query
) throws PeerDisconnectedException, PeerIsSlowToReadException {
header(socket, status);
socket.put('{').
putQuoted("query").put(':').encodeUtf8AndQuote(query == null ? "" : query).put(',').
putQuoted("error").put(':').encodeUtf8AndQuote(message).put(',').
putQuoted("position").put(':').put(position);
socket.put('}');
socket.sendChunk();
socket.done();
header(socket);
JsonQueryProcessorState.prepareExceptionJson(socket, position, message, query);
}
private void syntaxError(
......@@ -496,6 +486,6 @@ public class TextQueryProcessor implements HttpRequestProcessor, Closeable {
.$("`, at=").$(sqlException.getPosition())
.$(", message=`").$(sqlException.getFlyweightMessage()).$('`')
.$(']').$();
sendException(socket, sqlException.getPosition(), sqlException.getFlyweightMessage(), 400, state.query);
sendException(socket, sqlException.getPosition(), sqlException.getFlyweightMessage(), state.query);
}
}
......@@ -26,6 +26,7 @@ type RawDqlResult = {
count: number
dataset: any[][]
ddl: undefined
error: undefined
query: string
timings: Timings
}
......@@ -35,7 +36,8 @@ type RawDdlResult = {
}
type RawErrorResult = {
error: string
ddl: undefined
error: "<error message>"
position: number
query: string
}
......@@ -45,7 +47,7 @@ type DdlResult = {
type: Type.DDL
}
type RawResult = RawDqlResult | RawDdlResult
type RawResult = RawDqlResult | RawDdlResult | RawErrorResult
export type ErrorResult = RawErrorResult & {
type: Type.ERROR
......@@ -200,6 +202,14 @@ export class Client {
}
}
if (data.error) {
// eslint-disable-next-line prefer-promise-reject-errors
return Promise.reject({
...data,
type: Type.ERROR,
})
}
return {
...data,
timings: {
......@@ -210,19 +220,9 @@ export class Client {
}
}
if (/quest/.test(response.headers.get("server") || "")) {
const data = (await response.json()) as RawErrorResult
// eslint-disable-next-line prefer-promise-reject-errors
return Promise.reject({
...data,
type: Type.ERROR,
})
}
// eslint-disable-next-line prefer-promise-reject-errors
return Promise.reject({
error: "QuestDB is not reachable",
error: `QuestDB is not reachable [${response.status}]`,
position: -1,
query,
type: Type.ERROR,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册