未验证 提交 bd4da5b8 编写于 作者: V Vlad Ilyushchenko 提交者: GitHub

chore: excluding test function factories from production run. Added handling...

chore: excluding test function factories from production run. Added handling of unexpected exceptions. Fixed #367 (#392)
上级 359601fd
......@@ -949,6 +949,11 @@ public class PropServerConfiguration implements ServerConfiguration {
return indexValueBlockSize;
}
@Override
public boolean enableTestFactories() {
return false;
}
@Override
public int getDoubleToStrCastScale() {
return doubleToStrCastScale;
......
......@@ -161,4 +161,6 @@ public interface CairoConfiguration {
int getGroupByPoolCapacity();
int getGroupByMapCapacity();
boolean enableTestFactories();
}
......@@ -24,7 +24,9 @@
package io.questdb.cairo;
public class CairoError extends Error {
import io.questdb.std.FlyweightMessageContainer;
public class CairoError extends Error implements FlyweightMessageContainer {
public CairoError(Throwable cause) {
super(cause);
}
......@@ -32,4 +34,9 @@ public class CairoError extends Error {
public CairoError(String message) {
super(message);
}
@Override
public CharSequence getFlyweightMessage() {
return getMessage();
}
}
......@@ -24,12 +24,13 @@
package io.questdb.cairo;
import io.questdb.std.FlyweightMessageContainer;
import io.questdb.std.Sinkable;
import io.questdb.std.ThreadLocal;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.StringSink;
public class CairoException extends RuntimeException implements Sinkable {
public class CairoException extends RuntimeException implements Sinkable, FlyweightMessageContainer {
private static final ThreadLocal<CairoException> tlException = new ThreadLocal<>(CairoException::new);
protected final StringSink message = new StringSink();
private int errno;
......@@ -63,6 +64,7 @@ public class CairoException extends RuntimeException implements Sinkable {
return this;
}
@Override
public CharSequence getFlyweightMessage() {
return message;
}
......
......@@ -52,6 +52,11 @@ public class DefaultCairoConfiguration implements CairoConfiguration {
return 16;
}
@Override
public boolean enableTestFactories() {
return true;
}
@Override
public int getCreateAsSelectRetryCount() {
return 5;
......
......@@ -28,14 +28,7 @@ import io.questdb.cairo.CairoSecurityContext;
import io.questdb.cairo.security.CairoSecurityContextImpl;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.network.IOContext;
import io.questdb.network.IODispatcher;
import io.questdb.network.IOOperation;
import io.questdb.network.Net;
import io.questdb.network.NetworkFacade;
import io.questdb.network.PeerDisconnectedException;
import io.questdb.network.PeerIsSlowToReadException;
import io.questdb.network.ServerDisconnectException;
import io.questdb.network.*;
import io.questdb.std.Chars;
import io.questdb.std.Mutable;
import io.questdb.std.ObjectPool;
......@@ -177,7 +170,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
return responseSink.getSimple();
}
private void completeRequest(HttpRequestProcessor processor) throws PeerDisconnectedException, PeerIsSlowToReadException {
private void completeRequest(HttpRequestProcessor processor) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
LOG.debug().$("complete [fd=").$(fd).$(']').$();
processor.onRequestComplete(this);
}
......
......@@ -24,12 +24,13 @@
package io.questdb.cutlass.http;
import io.questdb.std.FlyweightMessageContainer;
import io.questdb.std.Sinkable;
import io.questdb.std.ThreadLocal;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.StringSink;
public class HttpException extends RuntimeException implements Sinkable {
public class HttpException extends RuntimeException implements Sinkable, FlyweightMessageContainer {
private static final ThreadLocal<HttpException> tlException = new ThreadLocal<>(HttpException::new);
private final StringSink message = new StringSink();
......@@ -41,7 +42,7 @@ public class HttpException extends RuntimeException implements Sinkable {
}
public CharSequence getFlyweightMessage() {
return tlException.get().getFlyweightMessage();
return message;
}
@Override
......
......@@ -31,7 +31,7 @@ import io.questdb.network.ServerDisconnectException;
public interface HttpRequestProcessor {
void onHeadersReady(HttpConnectionContext context);
void onRequestComplete(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException;
void onRequestComplete(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException;
default void resumeRecv(HttpConnectionContext context) {
}
......
......@@ -24,10 +24,6 @@
package io.questdb.cutlass.http.processors;
import java.io.Closeable;
import org.jetbrains.annotations.Nullable;
import io.questdb.MessageBus;
import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.CairoError;
......@@ -36,11 +32,7 @@ import io.questdb.cairo.sql.InsertMethod;
import io.questdb.cairo.sql.InsertStatement;
import io.questdb.cairo.sql.ReaderOutOfDateException;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cutlass.http.HttpChunkedResponseSocket;
import io.questdb.cutlass.http.HttpConnectionContext;
import io.questdb.cutlass.http.HttpRequestHeader;
import io.questdb.cutlass.http.HttpRequestProcessor;
import io.questdb.cutlass.http.LocalValue;
import io.questdb.cutlass.http.*;
import io.questdb.cutlass.text.Utf8Exception;
import io.questdb.griffin.CompiledQuery;
import io.questdb.griffin.SqlCompiler;
......@@ -48,17 +40,12 @@ import io.questdb.griffin.SqlException;
import io.questdb.griffin.SqlExecutionContextImpl;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.network.IOOperation;
import io.questdb.network.NoSpaceLeftInResponseBufferException;
import io.questdb.network.PeerDisconnectedException;
import io.questdb.network.PeerIsSlowToReadException;
import io.questdb.std.Chars;
import io.questdb.std.Misc;
import io.questdb.std.NanosecondClock;
import io.questdb.std.Numbers;
import io.questdb.std.NumericException;
import io.questdb.std.ObjList;
import io.questdb.network.*;
import io.questdb.std.*;
import io.questdb.std.str.DirectByteCharSequence;
import org.jetbrains.annotations.Nullable;
import java.io.Closeable;
public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
private static final LocalValue<JsonQueryProcessorState> LV = new LocalValue<>();
......@@ -102,7 +89,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
Misc.free(compiler);
}
public void execute0(JsonQueryProcessorState state) throws PeerDisconnectedException, PeerIsSlowToReadException {
public void execute0(JsonQueryProcessorState state) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
state.startExecutionTimer();
final HttpConnectionContext context = state.getHttpConnectionContext();
// do not set random for new request to avoid copying random from previous request into next one
......@@ -129,9 +116,12 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
} catch (SqlException e) {
syntaxError(context.getChunkedResponseSocket(), e, state, configuration.getKeepAliveHeader());
readyForNextRequest(context);
} catch (CairoException | CairoError e) {
internalError(context.getChunkedResponseSocket(), e, state);
} catch (CairoError | CairoException e) {
internalError(context.getChunkedResponseSocket(), e.getFlyweightMessage(), e, state);
readyForNextRequest(context);
} catch (Throwable e) {
LOG.error().$("Uh-oh. Error!").$(e).$();
throw ServerDisconnectException.INSTANCE;
}
}
......@@ -142,7 +132,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
@Override
public void onRequestComplete(
HttpConnectionContext context
) throws PeerDisconnectedException, PeerIsSlowToReadException {
) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
JsonQueryProcessorState state = LV.get(context);
if (state == null) {
LV.set(context, state = new JsonQueryProcessorState(
......@@ -360,11 +350,12 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
private void internalError(
HttpChunkedResponseSocket socket,
CharSequence message,
Throwable e,
JsonQueryProcessorState state
) throws PeerDisconnectedException, PeerIsSlowToReadException {
state.error().$("Server error executing query ").utf8(state.getQuery()).$(e).$();
sendException(socket, 0, e.getMessage(), 500, state.getQuery(), configuration.getKeepAliveHeader());
state.error().$("internal error [q=`").utf8(state.getQuery()).$("`, ex=").$(e).$();
sendException(socket, 0, message, 500, state.getQuery(), configuration.getKeepAliveHeader());
}
private boolean parseUrl(
......
......@@ -24,9 +24,7 @@
package io.questdb.cutlass.http.processors;
import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.PartitionBy;
import io.questdb.cairo.*;
import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.cutlass.http.*;
import io.questdb.cutlass.text.Atomicity;
......@@ -35,10 +33,7 @@ import io.questdb.cutlass.text.TextLoader;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.network.*;
import io.questdb.std.CharSequenceIntHashMap;
import io.questdb.std.Chars;
import io.questdb.std.LongList;
import io.questdb.std.Misc;
import io.questdb.std.*;
import io.questdb.std.str.CharSink;
import java.io.Closeable;
......@@ -264,7 +259,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
transientState.analysed = true;
transientState.textLoader.setState(TextLoader.LOAD_DATA);
}
} catch (TextException e) {
} catch (TextException | CairoException | CairoError e) {
handleTextException(e);
}
}
......@@ -338,7 +333,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
if (transientState.messagePart == MESSAGE_DATA) {
sendResponse(transientContext);
}
} catch (TextException e) {
} catch (TextException | CairoException | CairoError e) {
handleTextException(e);
}
}
......@@ -398,7 +393,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
state.clear();
}
private void handleTextException(TextException e)
private void handleTextException(FlyweightMessageContainer e)
throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
sendError(transientContext, e.getFlyweightMessage(), Chars.equalsNc("json", transientContext.getRequestHeader().getUrlParam("fmt")));
throw ServerDisconnectException.INSTANCE;
......
......@@ -24,12 +24,13 @@
package io.questdb.cutlass.json;
import io.questdb.std.FlyweightMessageContainer;
import io.questdb.std.Sinkable;
import io.questdb.std.ThreadLocal;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.StringSink;
public class JsonException extends Exception implements Sinkable {
public class JsonException extends Exception implements Sinkable, FlyweightMessageContainer {
private static final ThreadLocal<JsonException> tlException = new ThreadLocal<>(JsonException::new);
private final StringSink message = new StringSink();
private int position;
......@@ -45,6 +46,7 @@ public class JsonException extends Exception implements Sinkable {
return ex;
}
@Override
public CharSequence getFlyweightMessage() {
return message;
}
......
......@@ -24,13 +24,14 @@
package io.questdb.cutlass.text;
import io.questdb.std.FlyweightMessageContainer;
import io.questdb.std.Numbers;
import io.questdb.std.Sinkable;
import io.questdb.std.ThreadLocal;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.StringSink;
public class TextException extends Exception implements Sinkable {
public class TextException extends Exception implements Sinkable, FlyweightMessageContainer {
private static final ThreadLocal<TextException> tlException = new ThreadLocal<>(TextException::new);
private final StringSink message = new StringSink();
......@@ -42,6 +43,7 @@ public class TextException extends Exception implements Sinkable {
return te;
}
@Override
public CharSequence getFlyweightMessage() {
return message;
}
......
......@@ -81,7 +81,7 @@ public class FunctionParser implements PostOrderTreeTraversalAlgo.Visitor {
public FunctionParser(CairoConfiguration configuration, Iterable<FunctionFactory> functionFactories) {
this.configuration = configuration;
loadFunctionFactories(functionFactories);
loadFunctionFactories(functionFactories, configuration.enableTestFactories());
}
public static int getArgType(char c) {
......@@ -204,14 +204,14 @@ public class FunctionParser implements PostOrderTreeTraversalAlgo.Visitor {
return ex;
}
private static SqlException invalidArgument(CharSequence message, ExpressionNode node, ObjList<Function> args, CharSequence expected, int offset, int count) {
private static SqlException invalidArgument(ExpressionNode node, ObjList<Function> args, CharSequence expected, int offset, int count) {
SqlException ex = SqlException.position(node.position);
ex.put(message);
ex.put("unexpected argument for function: ");
ex.put(node.token);
ex.put(". expected args: ");
ex.put('(');
if (expected != null) {
for (int i = 0, n = count; i < n; i++) {
for (int i = 0; i < count; i++) {
if (i > 0) {
ex.put(',');
}
......@@ -670,9 +670,9 @@ public class FunctionParser implements PostOrderTreeTraversalAlgo.Visitor {
if (candidateSignature != null) {
final int sigArgOffset = candidateSignature.indexOf('(') + 1;
int sigArgCount = candidateSignature.length() - 1 - sigArgOffset;
throw invalidArgument("unexpected argument for function: ", node, args, candidateSignature, sigArgOffset, sigArgCount);
throw invalidArgument(node, args, candidateSignature, sigArgOffset, sigArgCount);
} else {
throw invalidArgument("unexpected argument for function: ", node, args, "", 0, 0);
throw invalidArgument(node, args, "", 0, 0);
}
}
......@@ -815,41 +815,43 @@ public class FunctionParser implements PostOrderTreeTraversalAlgo.Visitor {
addFactoryToList(extraList, name, factory);
}
private void loadFunctionFactories(Iterable<FunctionFactory> functionFactories) {
private void loadFunctionFactories(Iterable<FunctionFactory> functionFactories, boolean enableTestFactories) {
LOG.info().$("loading functions [test=").$(enableTestFactories).$(']').$();
for (FunctionFactory factory : functionFactories) {
if (!factory.getClass().getName().contains("test") || enableTestFactories) {
final String sig = factory.getSignature();
final int openBraceIndex;
try {
openBraceIndex = validateSignatureAndGetNameSeparator(sig);
} catch (SqlException e) {
LOG.error().$((Sinkable) e).$(" [signature=").$(factory.getSignature()).$(",class=").$(factory.getClass().getName()).$(']').$();
continue;
}
final String sig = factory.getSignature();
final int openBraceIndex;
try {
openBraceIndex = validateSignatureAndGetNameSeparator(sig);
} catch (SqlException e) {
LOG.error().$((Sinkable) e).$(" [signature=").$(factory.getSignature()).$(",class=").$(factory.getClass().getName()).$(']').$();
continue;
}
final String name = sig.substring(0, openBraceIndex);
addFactoryToList(factories, name, factory);
// Add != counterparts to equality function factories
if (factory instanceof AbstractBooleanFunctionFactory) {
switch (name) {
case "=":
addFactory(booleanFactories, "!=", factory);
break;
case "<":
// `a < b` == `a >= b`
addFactory(booleanFactories, ">=", factory);
if (sig.charAt(2) == sig.charAt(3)) {
// `a < b` == `b > a`
addFactory(commutativeBooleanFactories, ">", factory);
// `a < b` == `b > a` == `b <= a`
addFactory(booleanFactories, "<=", factory);
addFactory(commutativeBooleanFactories, "<=", factory);
}
break;
final String name = sig.substring(0, openBraceIndex);
addFactoryToList(factories, name, factory);
// Add != counterparts to equality function factories
if (factory instanceof AbstractBooleanFunctionFactory) {
switch (name) {
case "=":
addFactory(booleanFactories, "!=", factory);
break;
case "<":
// `a < b` == `a >= b`
addFactory(booleanFactories, ">=", factory);
if (sig.charAt(2) == sig.charAt(3)) {
// `a < b` == `b > a`
addFactory(commutativeBooleanFactories, ">", factory);
// `a < b` == `b > a` == `b <= a`
addFactory(booleanFactories, "<=", factory);
addFactory(commutativeBooleanFactories, "<=", factory);
}
break;
}
} else if (factory.isGroupBy()) {
groupByFunctionNames.add(name);
}
} else if (factory.isGroupBy()) {
groupByFunctionNames.add(name);
}
}
}
......
......@@ -25,12 +25,13 @@
package io.questdb.griffin;
import io.questdb.cairo.ColumnType;
import io.questdb.std.FlyweightMessageContainer;
import io.questdb.std.Sinkable;
import io.questdb.std.ThreadLocal;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.StringSink;
public class SqlException extends Exception implements Sinkable {
public class SqlException extends Exception implements Sinkable, FlyweightMessageContainer {
private static final ThreadLocal<SqlException> tlException = new ThreadLocal<>(SqlException::new);
private final StringSink message = new StringSink();
private int position;
......@@ -75,6 +76,7 @@ public class SqlException extends Exception implements Sinkable {
.put(", to=").put(toName).put(']');
}
@Override
public CharSequence getFlyweightMessage() {
return message;
}
......
......@@ -24,12 +24,13 @@
package io.questdb.griffin.engine.functions.bind;
import io.questdb.std.FlyweightMessageContainer;
import io.questdb.std.Sinkable;
import io.questdb.std.ThreadLocal;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.StringSink;
public class BindException extends RuntimeException implements Sinkable {
public class BindException extends RuntimeException implements Sinkable, FlyweightMessageContainer {
private static final ThreadLocal<BindException> tlException = new ThreadLocal<>(BindException::new);
private final StringSink message = new StringSink();
......@@ -43,6 +44,7 @@ public class BindException extends RuntimeException implements Sinkable {
return ex;
}
@Override
public CharSequence getFlyweightMessage() {
return message;
}
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.griffin.engine.functions.test;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.SymbolTableSource;
import io.questdb.griffin.FunctionFactory;
import io.questdb.griffin.SqlException;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.functions.BooleanFunction;
import io.questdb.std.ObjList;
public class TestNPEFactory implements FunctionFactory {
@Override
public String getSignature() {
return "npe()";
}
@Override
public Function newInstance(ObjList<Function> args, int position, CairoConfiguration configuration) throws SqlException {
return NPEFunction.INSTANCE;
}
private static class NPEFunction extends BooleanFunction {
private final static NPEFunction INSTANCE = new NPEFunction(0);
public NPEFunction(int position) {
super(position);
}
@Override
public boolean getBool(Record rec) {
throw new NullPointerException();
}
@Override
public void init(SymbolTableSource symbolTableSource, SqlExecutionContext executionContext) {
}
}
}
......@@ -39,8 +39,8 @@ public class ShowColumnsRecordCursorFactory implements RecordCursorFactory {
private static final int N_TYPE_COL = 1;
static {
final GenericRecordMetadata metadata = new GenericRecordMetadata();
metadata.add(new TableColumnMetadata("columnName", ColumnType.STRING));
metadata.add(new TableColumnMetadata("columnType", ColumnType.STRING));
metadata.add(new TableColumnMetadata("column", ColumnType.STRING));
metadata.add(new TableColumnMetadata("type", ColumnType.STRING));
METADATA = metadata;
}
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.std;
@FunctionalInterface
public interface FlyweightMessageContainer {
CharSequence getFlyweightMessage();
}
......@@ -84,6 +84,7 @@ open module io.questdb {
// test functions
io.questdb.griffin.engine.functions.test.TestMatchFunctionFactory,
TestSumXDoubleGroupByFunctionFactory,
io.questdb.griffin.engine.functions.test.TestNPEFactory,
io.questdb.griffin.engine.functions.test.TestSumTDoubleGroupByFunctionFactory,
io.questdb.griffin.engine.functions.test.TestSumStringGroupByFunctionFactory,
io.questdb.griffin.engine.functions.bool.OrFunctionFactory,
......
......@@ -63,6 +63,48 @@ public class IODispatcherTest {
private long configuredMaxQueryResponseRowLimit = Long.MAX_VALUE;
private static void assertDownloadResponse(long fd, Rnd rnd, long buffer, int len, int nonRepeatedContentLength, String expectedResponseHeader, long expectedResponseLen) {
int expectedHeaderLen = expectedResponseHeader.length();
int headerCheckRemaining = expectedResponseHeader.length();
long downloadedSoFar = 0;
int contentRemaining = 0;
while (downloadedSoFar < expectedResponseLen) {
int contentOffset = 0;
int n = Net.recv(fd, buffer, len);
Assert.assertTrue(n > -1);
if (n > 0) {
if (headerCheckRemaining > 0) {
for (int i = 0; i < n && headerCheckRemaining > 0; i++) {
if (expectedResponseHeader.charAt(expectedHeaderLen - headerCheckRemaining) != (char) Unsafe.getUnsafe().getByte(buffer + i)) {
Assert.fail("at " + (expectedHeaderLen - headerCheckRemaining));
}
headerCheckRemaining--;
contentOffset++;
}
}
if (headerCheckRemaining == 0) {
for (int i = contentOffset; i < n; i++) {
if (contentRemaining == 0) {
contentRemaining = nonRepeatedContentLength;
rnd.reset();
}
Assert.assertEquals(rnd.nextByte(), Unsafe.getUnsafe().getByte(buffer + i));
contentRemaining--;
}
}
downloadedSoFar += n;
}
}
}
private static void sendRequest(String request, long fd, long buffer) {
final int requestLen = request.length();
Chars.asciiStrCpy(request, requestLen, buffer);
Assert.assertEquals(requestLen, Net.send(fd, buffer, requestLen));
}
@Test
public void testBiasWrite() throws Exception {
......@@ -557,7 +599,8 @@ public class IODispatcherTest {
requestCount,
0,
false,
expectDisconnect);
expectDisconnect
);
} finally {
Net.close(fd);
workerPool.halt();
......@@ -836,7 +879,6 @@ public class IODispatcherTest {
);
}
@Ignore
@Test
public void testImportMultipleOnSameConnection() throws Exception {
testImport(
......@@ -918,7 +960,163 @@ public class IODispatcherTest {
"--------------------------27d997ca93d2689d--"
, NetworkFacadeImpl.INSTANCE
, false
, 150
, 1 // todo: we need to fix writer queuing and increase request count
);
}
@Test
public void testImportColumnMismatch() throws Exception {
testImport(
"HTTP/1.1 200 OK\r\n" +
"Server: questDB/1.0\r\n" +
"Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" +
"Transfer-Encoding: chunked\r\n" +
"Content-Type: text/plain; charset=utf-8\r\n" +
"\r\n" +
"05d7\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"| Location: | fhv_tripdata_2017-02.csv | Pattern | Locale | Errors |\r\n" +
"| Partition by | NONE | | | |\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"| Rows handled | 24 | | | |\r\n" +
"| Rows imported | 24 | | | |\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"| 0 | DispatchingBaseNum | STRING | 0 |\r\n" +
"| 1 | PickupDateTime | DATE | 0 |\r\n" +
"| 2 | DropOffDatetime | STRING | 0 |\r\n" +
"| 3 | PUlocationID | STRING | 0 |\r\n" +
"| 4 | DOlocationID | STRING | 0 |\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"\r\n" +
"00\r\n" +
"\r\n"
,
"POST /upload HTTP/1.1\r\n" +
"Host: localhost:9001\r\n" +
"User-Agent: curl/7.64.0\r\n" +
"Accept: */*\r\n" +
"Content-Length: 437760673\r\n" +
"Content-Type: multipart/form-data; boundary=------------------------27d997ca93d2689d\r\n" +
"Expect: 100-continue\r\n" +
"\r\n" +
"--------------------------27d997ca93d2689d\r\n" +
"Content-Disposition: form-data; name=\"schema\"; filename=\"schema.json\"\r\n" +
"Content-Type: application/octet-stream\r\n" +
"\r\n" +
"[\r\n" +
" {\r\n" +
" \"name\": \"date\",\r\n" +
" \"type\": \"DATE\",\r\n" +
" \"pattern\": \"d MMMM y.\",\r\n" +
" \"locale\": \"ru-RU\"\r\n" +
" }\r\n" +
"]\r\n" +
"\r\n" +
"--------------------------27d997ca93d2689d\r\n" +
"Content-Disposition: form-data; name=\"data\"; filename=\"fhv_tripdata_2017-02.csv\"\r\n" +
"Content-Type: application/octet-stream\r\n" +
"\r\n" +
"Dispatching_base_num,Pickup_DateTime,DropOff_datetime,PUlocationID,DOlocationID\r\n" +
"B00008,2017-02-01 00:30:00,,,\r\n" +
"B00008,2017-02-01 00:40:00,,,\r\n" +
"B00009,2017-02-01 00:30:00,,,\r\n" +
"B00013,2017-02-01 00:11:00,,,\r\n" +
"B00013,2017-02-01 00:41:00,,,\r\n" +
"B00013,2017-02-01 00:00:00,,,\r\n" +
"B00013,2017-02-01 00:53:00,,,\r\n" +
"B00013,2017-02-01 00:44:00,,,\r\n" +
"B00013,2017-02-01 00:05:00,,,\r\n" +
"B00013,2017-02-01 00:54:00,,,\r\n" +
"B00014,2017-02-01 00:45:00,,,\r\n" +
"B00014,2017-02-01 00:45:00,,,\r\n" +
"B00014,2017-02-01 00:46:00,,,\r\n" +
"B00014,2017-02-01 00:54:00,,,\r\n" +
"B00014,2017-02-01 00:45:00,,,\r\n" +
"B00014,2017-02-01 00:45:00,,,\r\n" +
"B00014,2017-02-01 00:45:00,,,\r\n" +
"B00014,2017-02-01 00:26:00,,,\r\n" +
"B00014,2017-02-01 00:55:00,,,\r\n" +
"B00014,2017-02-01 00:47:00,,,\r\n" +
"B00014,2017-02-01 00:05:00,,,\r\n" +
"B00014,2017-02-01 00:58:00,,,\r\n" +
"B00014,2017-02-01 00:33:00,,,\r\n" +
"B00014,2017-02-01 00:45:00,,,\r\n" +
"\r\n" +
"--------------------------27d997ca93d2689d--"
, NetworkFacadeImpl.INSTANCE
, false
, 1
);
// append different data structure to the same table
testImport(
"HTTP/1.1 400 Bad request\r\n" +
"Server: questDB/1.0\r\n" +
"Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" +
"Transfer-Encoding: chunked\r\n" +
"Content-Type: text/plain; charset=utf-8\r\n" +
"\r\n" +
"5d\r\n" +
"column count mismatch [textColumnCount=6, tableColumnCount=5, table=fhv_tripdata_2017-02.csv]\r\n" +
"00\r\n" +
"\r\n"
,
"POST /upload?overwrite=false HTTP/1.1\r\n" +
"Host: localhost:9001\r\n" +
"User-Agent: curl/7.64.0\r\n" +
"Accept: */*\r\n" +
"Content-Length: 437760673\r\n" +
"Content-Type: multipart/form-data; boundary=------------------------27d997ca93d2689d\r\n" +
"Expect: 100-continue\r\n" +
"\r\n" +
"--------------------------27d997ca93d2689d\r\n" +
"Content-Disposition: form-data; name=\"schema\"; filename=\"schema.json\"\r\n" +
"Content-Type: application/octet-stream\r\n" +
"\r\n" +
"[\r\n" +
" {\r\n" +
" \"name\": \"date\",\r\n" +
" \"type\": \"DATE\",\r\n" +
" \"pattern\": \"d MMMM y.\",\r\n" +
" \"locale\": \"ru-RU\"\r\n" +
" }\r\n" +
"]\r\n" +
"\r\n" +
"--------------------------27d997ca93d2689d\r\n" +
"Content-Disposition: form-data; name=\"data\"; filename=\"fhv_tripdata_2017-02.csv\"\r\n" +
"Content-Type: application/octet-stream\r\n" +
"\r\n" +
"Dispatching_base_num,DropOff_datetime,PUlocationID,DOlocationID,x,y\r\n" +
"B00008,,,,,\r\n" +
"B00008,,,,,\r\n" +
"B00009,,,,,\r\n" +
"B00013,,,,,\r\n" +
"B00013,,,,,\r\n" +
"B00013,,,,,\r\n" +
"B00013,,,,,\r\n" +
"B00013,,,,,\r\n" +
"B00013,,,,,\r\n" +
"B00013,,,,,\r\n" +
"B00014,,,,,\r\n" +
"B00014,,,,,\r\n" +
"B00014,,,,,\r\n" +
"B00014,,,,,\r\n" +
"B00014,,,,,\r\n" +
"B00014,,,,,\r\n" +
"B00014,,,,,\r\n" +
"B00014,,,,,\r\n" +
"B00014,,,,,\r\n" +
"B00014,,,,,\r\n" +
"B00014,,,,,\r\n" +
"B00014,,,,,\r\n" +
"B00014,,,,,\r\n" +
"B00014,,,,,\r\n" +
"\r\n" +
"--------------------------27d997ca93d2689d--"
, NetworkFacadeImpl.INSTANCE
, false
, 1
);
}
......@@ -1011,7 +1209,7 @@ public class IODispatcherTest {
}
},
false,
150
1 // todo: fix writer queue and increase request count
);
}
......@@ -1172,7 +1370,6 @@ public class IODispatcherTest {
expectedResponse,
1,
0,
false,
false
);
} finally {
......@@ -1548,7 +1745,7 @@ public class IODispatcherTest {
"00\r\n" +
"\r\n";
sendAndReceive(nf, request, expectedResponse, 10, 100L, false, false);
sendAndReceive(nf, request, expectedResponse, 10, 100L, false);
} finally {
workerPool.halt();
}
......@@ -1644,7 +1841,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
......@@ -1676,7 +1872,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
......@@ -1708,7 +1903,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
......@@ -1740,7 +1934,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
......@@ -1772,7 +1965,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
});
......@@ -2262,7 +2454,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
......@@ -2292,7 +2483,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
......@@ -2322,7 +2512,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
......@@ -2352,7 +2541,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
});
......@@ -2390,7 +2578,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
......@@ -2422,7 +2609,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
......@@ -2454,7 +2640,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
......@@ -2486,7 +2671,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
......@@ -2518,7 +2702,6 @@ public class IODispatcherTest {
"\r\n",
1,
0,
false,
false
);
});
......@@ -2646,7 +2829,6 @@ public class IODispatcherTest {
expectedResponse,
10,
0,
false,
false
);
......@@ -2655,6 +2837,93 @@ public class IODispatcherTest {
});
}
@Test
public void testJsonQueryDataError() throws Exception {
assertMemoryLeak(() -> {
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false, false);
final WorkerPool workerPool = new WorkerPool(new WorkerPoolConfiguration() {
@Override
public int[] getWorkerAffinity() {
return new int[]{-1};
}
@Override
public int getWorkerCount() {
return 1;
}
@Override
public boolean haltOnError() {
return false;
}
});
try (
CairoEngine engine = new CairoEngine(new DefaultCairoConfiguration(baseDir), null);
HttpServer httpServer = new HttpServer(httpConfiguration, workerPool, false)
) {
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public HttpRequestProcessor newInstance() {
return new StaticContentProcessor(httpConfiguration.getStaticContentProcessorConfiguration());
}
@Override
public String getUrl() {
return HttpServerConfiguration.DEFAULT_PROCESSOR_URL;
}
});
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(
httpConfiguration.getJsonQueryProcessorConfiguration(),
engine,
null,
workerPool.getWorkerCount()
);
}
@Override
public String getUrl() {
return "/query";
}
});
workerPool.start(LOG);
try {
// send multipart request to server
final String request = "GET /query?limit=0%2C1000&count=true&src=con&query=select%20npe()%20from%20long_sequence(1)&timings=true HTTP/1.1\r\n" +
"Host: localhost:9000\r\n" +
"Connection: keep-alive\r\n" +
"User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36\r\n" +
"Accept: */*\r\n" +
"Sec-Fetch-Site: same-origin\r\n" +
"Sec-Fetch-Mode: cors\r\n" +
"Sec-Fetch-Dest: empty\r\n" +
"Referer: http://localhost:9000/index.html\r\n" +
"Accept-Encoding: gzip, deflate, br\r\n" +
"Accept-Language: en-GB,en-US;q=0.9,en;q=0.8\r\n" +
"Cookie: _ga=GA1.1.2124932001.1573824669; _hjid=f2db90b2-18cf-4956-8870-fcdcde56f3ca; _hjIncludedInSample=1; _gid=GA1.1.697400188.1591597903\r\n" +
"\r\n";
sendAndExpectDisconnect(
NetworkFacadeImpl.INSTANCE,
request,
1,
20000
);
} finally {
workerPool.halt();
}
}
});
}
@Test
public void testJsonQueryTopLimit() throws Exception {
testJsonQuery(
......@@ -4213,48 +4482,6 @@ public class IODispatcherTest {
"\r\n");
}
private static void assertDownloadResponse(long fd, Rnd rnd, long buffer, int len, int nonRepeatedContentLength, String expectedResponseHeader, long expectedResponseLen) {
int expectedHeaderLen = expectedResponseHeader.length();
int headerCheckRemaining = expectedResponseHeader.length();
long downloadedSoFar = 0;
int contentRemaining = 0;
while (downloadedSoFar < expectedResponseLen) {
int contentOffset = 0;
int n = Net.recv(fd, buffer, len);
Assert.assertTrue(n > -1);
if (n > 0) {
if (headerCheckRemaining > 0) {
for (int i = 0; i < n && headerCheckRemaining > 0; i++) {
if (expectedResponseHeader.charAt(expectedHeaderLen - headerCheckRemaining) != (char) Unsafe.getUnsafe().getByte(buffer + i)) {
Assert.fail("at " + (expectedHeaderLen - headerCheckRemaining));
}
headerCheckRemaining--;
contentOffset++;
}
}
if (headerCheckRemaining == 0) {
for (int i = contentOffset; i < n; i++) {
if (contentRemaining == 0) {
contentRemaining = nonRepeatedContentLength;
rnd.reset();
}
Assert.assertEquals(rnd.nextByte(), Unsafe.getUnsafe().getByte(buffer + i));
contentRemaining--;
}
}
downloadedSoFar += n;
}
}
}
private static void sendRequest(String request, long fd, long buffer) {
final int requestLen = request.length();
Chars.asciiStrCpy(request, requestLen, buffer);
Assert.assertEquals(requestLen, Net.send(fd, buffer, requestLen));
}
@NotNull
private DefaultHttpServerConfiguration createHttpServerConfiguration(
String baseDir,
......@@ -4387,6 +4614,25 @@ public class IODispatcherTest {
};
}
private void sendAndReceive(
NetworkFacade nf,
String request,
String response,
int requestCount,
long pauseBetweenSendAndReceive,
boolean print
) throws InterruptedException {
sendAndReceive(
nf,
request,
response,
requestCount,
pauseBetweenSendAndReceive,
print,
false
);
}
private void sendAndReceive(
NetworkFacade nf,
String request,
......@@ -4421,6 +4667,7 @@ public class IODispatcherTest {
if (pauseBetweenSendAndReceive > 0) {
Thread.sleep(pauseBetweenSendAndReceive);
}
// receive response
final int expectedToReceive = expectedResponse.length;
int received = 0;
......@@ -4450,7 +4697,7 @@ public class IODispatcherTest {
break;
}
}
if (disconnected && !expectDisconnect) {
if (!expectDisconnect && disconnected) {
LOG.error().$("disconnected?").$();
Assert.fail();
}
......@@ -4466,6 +4713,60 @@ public class IODispatcherTest {
}
}
private void sendAndExpectDisconnect(
NetworkFacade nf,
String request,
long pauseBetweenSendAndReceive,
int expectDisconnectInMillis
) throws InterruptedException {
long fd = nf.socketTcp(true);
try {
long sockAddr = nf.sockaddr("127.0.0.1", 9001);
try {
Assert.assertTrue(fd > -1);
Assert.assertEquals(0, nf.connect(fd, sockAddr));
Assert.assertEquals(0, nf.setTcpNoDelay(fd, true));
final int len = request.length() * 2;
long ptr = Unsafe.malloc(len);
try {
int sent = 0;
int reqLen = request.length();
Chars.asciiStrCpy(request, reqLen, ptr);
while (sent < reqLen) {
int n = nf.send(fd, ptr + sent, reqLen - sent);
Assert.assertTrue(n > -1);
sent += n;
}
if (pauseBetweenSendAndReceive > 0) {
Thread.sleep(pauseBetweenSendAndReceive);
}
if (expectDisconnectInMillis > 0) {
nf.configureNonBlocking(fd);
long t = System.currentTimeMillis();
boolean disconnected = true;
while (nf.recv(fd, ptr, 1) > -1) {
if (t + expectDisconnectInMillis < System.currentTimeMillis()) {
disconnected = false;
break;
}
}
Assert.assertTrue("disconnect expected", disconnected);
}
} finally {
Unsafe.free(ptr, len);
}
} finally {
nf.freeSockAddr(sockAddr);
}
} finally {
nf.close(fd);
}
}
private void testJsonQuery(int recordCount, String request, String expectedResponse, int requestCount) throws Exception {
testJsonQuery0(2, engine -> {
// create table with all column types
......@@ -4481,7 +4782,6 @@ public class IODispatcherTest {
expectedResponse,
requestCount,
0,
false,
false
);
});
......
......@@ -52,7 +52,7 @@ public class ShowTablesTest extends AbstractGriffinTest {
public void testShowColumnsWithSimpleTable() throws Exception {
assertMemoryLeak(() -> {
compiler.compile("create table balances(cust_id int, ccy symbol, balance double)", sqlExecutionContext);
assertQuery("columnName\tcolumnType\ncust_id\tINT\nccy\tSYMBOL\nbalance\tDOUBLE\n", "show columns from balances", null, false, sqlExecutionContext, false);
assertQuery("column\ttype\ncust_id\tINT\nccy\tSYMBOL\nbalance\tDOUBLE\n", "show columns from balances", null, false, sqlExecutionContext, false);
});
}
......@@ -107,7 +107,7 @@ public class ShowTablesTest extends AbstractGriffinTest {
public void testShowColumnsWithFunction() throws Exception {
assertMemoryLeak(() -> {
compiler.compile("create table balances(cust_id int, ccy symbol, balance double)", sqlExecutionContext);
assertQuery("columnName\tcolumnType\ncust_id\tINT\nccy\tSYMBOL\nbalance\tDOUBLE\n", "select * from table_columns('balances')", null, false, sqlExecutionContext, false);
assertQuery("column\ttype\ncust_id\tINT\nccy\tSYMBOL\nbalance\tDOUBLE\n", "select * from table_columns('balances')", null, false, sqlExecutionContext, false);
});
}
......
......@@ -70,10 +70,36 @@
#
################################################################################
################################################################################
# ___ _ ____ ____
# / _ \ _ _ ___ ___| |_| _ \| __ )
# | | | | | | |/ _ \/ __| __| | | | _ \
# | |_| | |_| | __/\__ \ |_| |_| | |_) |
# \__\_\\__,_|\___||___/\__|____/|____/
#
# Copyright (c) 2014-2019 Appsicle
# Copyright (c) 2019-2020 QuestDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
################################################################################
io.questdb.griffin.engine.functions.test.TestMatchFunctionFactory
io.questdb.griffin.engine.functions.test.TestSumXDoubleGroupByFunctionFactory
io.questdb.griffin.engine.functions.test.TestSumTDoubleGroupByFunctionFactory
io.questdb.griffin.engine.functions.test.TestSumStringGroupByFunctionFactory
io.questdb.griffin.engine.functions.test.TestNPEFactory
# logical operations
io.questdb.griffin.engine.functions.bool.OrFunctionFactory
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册