diff --git a/core/src/main/java/com/questdb/PropServerConfiguration.java b/core/src/main/java/com/questdb/PropServerConfiguration.java index 26812133aeb8bf70c6279a68e84bc6c84e14e8ce..c80e204b24222653d9854c95ee13573a2af3808e 100644 --- a/core/src/main/java/com/questdb/PropServerConfiguration.java +++ b/core/src/main/java/com/questdb/PropServerConfiguration.java @@ -26,6 +26,7 @@ package com.questdb; import com.questdb.cairo.CairoConfiguration; import com.questdb.cutlass.http.HttpServerConfiguration; import com.questdb.cutlass.http.MimeTypesCache; +import com.questdb.cutlass.http.processors.JsonQueryProcessorConfiguration; import com.questdb.cutlass.http.processors.StaticContentProcessorConfiguration; import com.questdb.cutlass.http.processors.TextImportProcessorConfiguration; import com.questdb.cutlass.line.udp.LineUdpReceiverConfiguration; @@ -50,6 +51,7 @@ public class PropServerConfiguration implements ServerConfigurationV2 { private final TextConfiguration textConfiguration = new PropTextConfiguration(); private final CairoConfiguration cairoConfiguration = new PropCairoConfiguration(); private final LineUdpReceiverConfiguration lineUdpReceiverConfiguration = new PropLineUdpReceiverConfiguration(); + private final JsonQueryProcessorConfiguration jsonQueryProcessorConfiguration = new PropJsonQueryProcessorConfiguration(); private final int connectionPoolInitialCapacity; private final int connectionStringPoolCapacity; private final int multipartHeaderBufferSize; @@ -130,6 +132,9 @@ public class PropServerConfiguration implements ServerConfigurationV2 { private int bindPort; private int lineUdpBindIPV4Address; private int lineUdpPort; + private final int jsonQueryFloatScale; + private final int jsonQueryDoubleScale; + private final int jsonQueryConnectionCheckFrequency; public PropServerConfiguration(String root, Properties properties) throws ServerConfigurationException { this.connectionPoolInitialCapacity = getInt(properties, "http.connection.pool.initial.capacity", 16); @@ -143,8 +148,8 @@ public class PropServerConfiguration implements ServerConfigurationV2 { this.sendBufferSize = getIntSize(properties, "http.send.buffer.size", 2 * 1024 * 1024); this.indexFileName = getString(properties, "http.static.index.file.name", "index.html"); - int keepAliveTimeout = getInt(properties, "http.keep-alive.timeout", 30); - int keepAliveMax = getInt(properties, "http.keep-alive.max", 1_000_000); + int keepAliveTimeout = getInt(properties, "http.keep-alive.timeout", 5); + int keepAliveMax = getInt(properties, "http.keep-alive.max", 10_000); if (keepAliveTimeout > 0 && keepAliveMax > 0) { this.keepAliveHeader = "Keep-Alive: timeout=5, max=10000" + Misc.EOL; @@ -191,6 +196,9 @@ public class PropServerConfiguration implements ServerConfigurationV2 { this.timestampAdapterPoolCapacity = getInt(properties, "http.text.timestamp.adapter.pool.capacity", 64); this.utf8SinkSize = getIntSize(properties, "http.text.utf8.sink.size", 4096); + this.jsonQueryConnectionCheckFrequency = getInt(properties, "http.json.query.connection.check.frequency", 1_000_000); + this.jsonQueryDoubleScale = getInt(properties, "http.json.query.double.scale", 10); + this.jsonQueryFloatScale = getInt(properties, "http.json.query.float.scale", 10); parseBindTo(properties, "http.bind.to", "0.0.0.0:9000", (a, p) -> { bindIPv4Address = a; @@ -611,6 +619,11 @@ public class PropServerConfiguration implements ServerConfigurationV2 { public int getSendBufferSize() { return sendBufferSize; } + + @Override + public JsonQueryProcessorConfiguration getJsonQueryProcessorConfiguration() { + return jsonQueryProcessorConfiguration; + } } private class PropCairoConfiguration implements CairoConfiguration { @@ -856,4 +869,26 @@ public class PropServerConfiguration implements ServerConfigurationV2 { return lineUdpReceiveBufferSize; } } + + private class PropJsonQueryProcessorConfiguration implements JsonQueryProcessorConfiguration { + @Override + public CharSequence getKeepAliveHeader() { + return keepAliveHeader; + } + + @Override + public int getFloatScale() { + return jsonQueryFloatScale; + } + + @Override + public int getDoubleScale() { + return jsonQueryDoubleScale; + } + + @Override + public int getConnectionCheckFrequency() { + return jsonQueryConnectionCheckFrequency; + } + } } diff --git a/core/src/main/java/com/questdb/ServerMain.java b/core/src/main/java/com/questdb/ServerMain.java index 7f4e30c35c88d7af55c89f9ec986dde394b0f575..2e9aa7da500d52b242940d36e7dc8482e2158468 100644 --- a/core/src/main/java/com/questdb/ServerMain.java +++ b/core/src/main/java/com/questdb/ServerMain.java @@ -97,7 +97,7 @@ public class ServerMain { @Override public HttpRequestProcessor newInstance() { - return new JsonQueryProcessor(cairoEngine); + return new JsonQueryProcessor(configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration(), cairoEngine); } }); diff --git a/core/src/main/java/com/questdb/cutlass/http/DefaultHttpServerConfiguration.java b/core/src/main/java/com/questdb/cutlass/http/DefaultHttpServerConfiguration.java index f758f93fc63b6a9bbc85d7fe5a5c8fbd14aaca30..960602c169ecb80aaf059f6d5c868c5690aacb61 100644 --- a/core/src/main/java/com/questdb/cutlass/http/DefaultHttpServerConfiguration.java +++ b/core/src/main/java/com/questdb/cutlass/http/DefaultHttpServerConfiguration.java @@ -24,6 +24,7 @@ package com.questdb.cutlass.http; import com.questdb.cutlass.http.processors.DefaultTextImportProcessorConfiguration; +import com.questdb.cutlass.http.processors.JsonQueryProcessorConfiguration; import com.questdb.cutlass.http.processors.StaticContentProcessorConfiguration; import com.questdb.cutlass.http.processors.TextImportProcessorConfiguration; import com.questdb.network.DefaultIODispatcherConfiguration; @@ -65,6 +66,28 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration { } }; + private final JsonQueryProcessorConfiguration jsonQueryProcessorConfiguration = new JsonQueryProcessorConfiguration() { + @Override + public CharSequence getKeepAliveHeader() { + return "Keep-Alive: timeout=5, max=10000\r\n"; + } + + @Override + public int getFloatScale() { + return 10; + } + + @Override + public int getDoubleScale() { + return 10; + } + + @Override + public int getConnectionCheckFrequency() { + return 1_000_000; + } + }; + private final TextImportProcessorConfiguration textImportProcessorConfiguration = new DefaultTextImportProcessorConfiguration(); public DefaultHttpServerConfiguration() { @@ -143,4 +166,9 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration { public int getConnectionStringPoolCapacity() { return 128; } + + @Override + public JsonQueryProcessorConfiguration getJsonQueryProcessorConfiguration() { + return jsonQueryProcessorConfiguration; + } } diff --git a/core/src/main/java/com/questdb/cutlass/http/HttpChunkedResponseSocket.java b/core/src/main/java/com/questdb/cutlass/http/HttpChunkedResponseSocket.java index 58ace1326cd5c8ce8eae2b686cf361e2d0b457db..e84d74a628edbcbb8d43ef1b318bed1f99432571 100644 --- a/core/src/main/java/com/questdb/cutlass/http/HttpChunkedResponseSocket.java +++ b/core/src/main/java/com/questdb/cutlass/http/HttpChunkedResponseSocket.java @@ -30,7 +30,7 @@ public interface HttpChunkedResponseSocket extends CharSink { void done() throws PeerDisconnectedException, PeerIsSlowToReadException; - CharSink headers(); + HttpResponseHeader headers(); boolean resetToBookmark(); diff --git a/core/src/main/java/com/questdb/cutlass/http/HttpResponseHeader.java b/core/src/main/java/com/questdb/cutlass/http/HttpResponseHeader.java index 3c0528ce4dc234c936d1e9f5d293af9ae188c4e7..600a4ee5c491b1cbdaf5f8a94ce781c98f30dad4 100644 --- a/core/src/main/java/com/questdb/cutlass/http/HttpResponseHeader.java +++ b/core/src/main/java/com/questdb/cutlass/http/HttpResponseHeader.java @@ -29,4 +29,10 @@ public interface HttpResponseHeader extends CharSink { void send() throws PeerDisconnectedException, PeerIsSlowToReadException; String status(int code, CharSequence contentType, long contentLength); + + default void setKeepAlive(CharSequence keepAliveHeader) { + if (keepAliveHeader != null) { + put(keepAliveHeader); + } + } } diff --git a/core/src/main/java/com/questdb/cutlass/http/HttpResponseSink.java b/core/src/main/java/com/questdb/cutlass/http/HttpResponseSink.java index 8f15ae265f1d5dcef7a5ad8ddf64a7d3db7d1879..b2b598e36bf50c7cd369828c90998223232f2e75 100644 --- a/core/src/main/java/com/questdb/cutlass/http/HttpResponseSink.java +++ b/core/src/main/java/com/questdb/cutlass/http/HttpResponseSink.java @@ -575,7 +575,7 @@ public class HttpResponseSink implements Closeable, Mutable { // } @Override - public CharSink headers() { + public HttpResponseHeader headers() { return headerImpl; } diff --git a/core/src/main/java/com/questdb/cutlass/http/HttpServerConfiguration.java b/core/src/main/java/com/questdb/cutlass/http/HttpServerConfiguration.java index 5e140d1092787d1e194ae3a76194eba8689a03be..251a4cc460df20e2d6d74dbb6e22402ffa45a898 100644 --- a/core/src/main/java/com/questdb/cutlass/http/HttpServerConfiguration.java +++ b/core/src/main/java/com/questdb/cutlass/http/HttpServerConfiguration.java @@ -23,6 +23,7 @@ package com.questdb.cutlass.http; +import com.questdb.cutlass.http.processors.JsonQueryProcessorConfiguration; import com.questdb.cutlass.http.processors.StaticContentProcessorConfiguration; import com.questdb.cutlass.http.processors.TextImportProcessorConfiguration; import com.questdb.network.IODispatcherConfiguration; @@ -53,6 +54,8 @@ public interface HttpServerConfiguration { TextImportProcessorConfiguration getTextImportProcessorConfiguration(); + JsonQueryProcessorConfiguration getJsonQueryProcessorConfiguration(); + int getWorkerCount(); int getSendBufferSize(); diff --git a/core/src/main/java/com/questdb/cutlass/http/processors/AbstractQueryContext.java b/core/src/main/java/com/questdb/cutlass/http/processors/AbstractQueryContext.java index 85b5360eed0f81bbed7a1d45d62f60c14f3d0a11..5a373cdf626e1b9ddae05a7c9b8f4bae2fb22215 100644 --- a/core/src/main/java/com/questdb/cutlass/http/processors/AbstractQueryContext.java +++ b/core/src/main/java/com/questdb/cutlass/http/processors/AbstractQueryContext.java @@ -48,8 +48,8 @@ public abstract class AbstractQueryContext implements Mutable, Closeable { int queryState = JsonQueryProcessor.QUERY_PREFIX; int columnIndex; - public AbstractQueryContext(long fd, int cyclesBeforeCancel) { - this.cancellationHandler = new ChannelCheckCancellationHandler(fd, cyclesBeforeCancel); + public AbstractQueryContext(long fd, int connectionCheckFrequency) { + this.cancellationHandler = new ChannelCheckCancellationHandler(fd, connectionCheckFrequency); this.fd = fd; } diff --git a/core/src/main/java/com/questdb/cutlass/http/processors/JsonQueryProcessor.java b/core/src/main/java/com/questdb/cutlass/http/processors/JsonQueryProcessor.java index 0187bd885b289cdf7004a93f48342f980b14605c..c91ed6a1bc5786c420f8706a712a1919c16d4377 100644 --- a/core/src/main/java/com/questdb/cutlass/http/processors/JsonQueryProcessor.java +++ b/core/src/main/java/com/questdb/cutlass/http/processors/JsonQueryProcessor.java @@ -62,13 +62,19 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { private final AtomicLong cacheHits = new AtomicLong(); private final AtomicLong cacheMisses = new AtomicLong(); private final SqlCompiler compiler; + private final JsonQueryProcessorConfiguration configuration; + private final int floatScale; + private final int doubleScale; - public JsonQueryProcessor(CairoEngine engine) { + public JsonQueryProcessor(JsonQueryProcessorConfiguration configuration, CairoEngine engine) { // todo: add scheduler + this.configuration = configuration; this.compiler = new SqlCompiler(engine); + this.floatScale = configuration.getFloatScale(); + this.doubleScale = configuration.getDoubleScale(); } - private static void putValue(HttpChunkedResponseSocket socket, int type, Record rec, int col) { + private void putValue(HttpChunkedResponseSocket socket, int type, Record rec, int col) { switch (type) { case ColumnType.BOOLEAN: socket.put(rec.getBool(col)); @@ -77,10 +83,10 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { socket.put(rec.getByte(col)); break; case ColumnType.DOUBLE: - socket.put(rec.getDouble(col), 10); + socket.put(rec.getDouble(col), doubleScale); break; case ColumnType.FLOAT: - socket.put(rec.getFloat(col), 10); + socket.put(rec.getFloat(col), floatScale); break; case ColumnType.INT: final int i = rec.getInt(col); @@ -143,7 +149,6 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { @Override public void close() { Misc.free(compiler); - Misc.free(FACTORY_CACHE.get()); } @Override @@ -161,8 +166,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { ) throws PeerDisconnectedException, PeerIsSlowToReadException { JsonQueryProcessorState state = LV.get(context); if (state == null) { - // todo: configure state externally - LV.set(context, state = new JsonQueryProcessorState(context.getFd(), 1000)); + LV.set(context, state = new JsonQueryProcessorState(context.getFd(), configuration.getConnectionCheckFrequency())); } HttpChunkedResponseSocket socket = context.getChunkedResponseSocket(); if (parseUrl(socket, context.getRequestHeader(), state)) { @@ -319,8 +323,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { int status ) throws PeerDisconnectedException, PeerIsSlowToReadException { socket.status(status, "application/json; charset=utf-8"); - // todo: configure this header externally - socket.headers().put("Keep-Alive: timeout=5, max=10000").put(Misc.EOL); + socket.headers().setKeepAlive(configuration.getKeepAliveHeader()); socket.sendHeader(); } diff --git a/core/src/main/java/com/questdb/cutlass/http/processors/JsonQueryProcessorConfiguration.java b/core/src/main/java/com/questdb/cutlass/http/processors/JsonQueryProcessorConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..8e312a9c4233dc855ccd854531093c18b0a80285 --- /dev/null +++ b/core/src/main/java/com/questdb/cutlass/http/processors/JsonQueryProcessorConfiguration.java @@ -0,0 +1,34 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (C) 2014-2019 Appsicle + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + ******************************************************************************/ + +package com.questdb.cutlass.http.processors; + +public interface JsonQueryProcessorConfiguration { + CharSequence getKeepAliveHeader(); + + int getFloatScale(); + + int getDoubleScale(); + + int getConnectionCheckFrequency(); +} diff --git a/core/src/main/java/com/questdb/cutlass/http/processors/JsonQueryProcessorState.java b/core/src/main/java/com/questdb/cutlass/http/processors/JsonQueryProcessorState.java index fb7161723fda08f26218be61e123011a610b29a8..f87fe839f335c378babd5724142e9b9997385315 100644 --- a/core/src/main/java/com/questdb/cutlass/http/processors/JsonQueryProcessorState.java +++ b/core/src/main/java/com/questdb/cutlass/http/processors/JsonQueryProcessorState.java @@ -27,8 +27,8 @@ public class JsonQueryProcessorState extends AbstractQueryContext { boolean fetchAll = false; boolean noMeta = false; - public JsonQueryProcessorState(long fd, int cyclesBeforeCancel) { - super(fd, cyclesBeforeCancel); + public JsonQueryProcessorState(long fd, int connectonCheckFrequency) { + super(fd, connectonCheckFrequency); } @Override diff --git a/core/src/main/java/com/questdb/cutlass/http/processors/StaticContentProcessor.java b/core/src/main/java/com/questdb/cutlass/http/processors/StaticContentProcessor.java index 5b459199c59696c16b217162932d0651357f4314..f27808b42f2b1335a971f39b291fa325f66da9a9 100644 --- a/core/src/main/java/com/questdb/cutlass/http/processors/StaticContentProcessor.java +++ b/core/src/main/java/com/questdb/cutlass/http/processors/StaticContentProcessor.java @@ -246,10 +246,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable { header.put("Content-Disposition: attachment; filename=\"").put(FileNameExtractorCharSequence.get(path)).put("\"").put(Misc.EOL); } header.put("ETag: ").put('"').put(ff.getLastModified(path)).put('"').put(Misc.EOL); - if (keepAliveHeader != null) { - header.put(keepAliveHeader); - } - + header.setKeepAlive(keepAliveHeader); header.send(); resumeSend(context, dispatcher); } diff --git a/core/src/test/java/com/questdb/PropServerConfigurationTest.java b/core/src/test/java/com/questdb/PropServerConfigurationTest.java index 639f18d640c1a9a716b0cd7f11be1362caa290a1..001301c7c89e04460b9fcf915230c1195e17e3c8 100644 --- a/core/src/test/java/com/questdb/PropServerConfigurationTest.java +++ b/core/src/test/java/com/questdb/PropServerConfigurationTest.java @@ -114,6 +114,10 @@ public class PropServerConfigurationTest { Assert.assertEquals(0, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getBindIPv4Address()); Assert.assertEquals(9000, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getBindPort()); + Assert.assertEquals(1_000_000, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getConnectionCheckFrequency()); + Assert.assertEquals(10, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getDoubleScale()); + Assert.assertEquals(10, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getFloatScale()); + Assert.assertEquals(5, configuration.getCairoConfiguration().getCreateAsSelectRetryCount()); Assert.assertEquals("fast", configuration.getCairoConfiguration().getDefaultMapType()); Assert.assertFalse(configuration.getCairoConfiguration().getDefaultSymbolCacheFlag()); @@ -288,6 +292,10 @@ public class PropServerConfigurationTest { Assert.assertEquals(168101918, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getBindIPv4Address()); Assert.assertEquals(9900, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getBindPort()); + Assert.assertEquals(2_000, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getConnectionCheckFrequency()); + Assert.assertEquals(6, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getDoubleScale()); + Assert.assertEquals(4, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getFloatScale()); + Assert.assertEquals(12, configuration.getCairoConfiguration().getCreateAsSelectRetryCount()); Assert.assertEquals("compact", configuration.getCairoConfiguration().getDefaultMapType()); Assert.assertTrue(configuration.getCairoConfiguration().getDefaultSymbolCacheFlag()); diff --git a/core/src/test/java/com/questdb/cutlass/http/IODispatcherTest.java b/core/src/test/java/com/questdb/cutlass/http/IODispatcherTest.java index 7fbef839a377e485250ac716e4dbb75f2f083968..e52a62bfe2ddfe7894c858324adeaa2b3a8cc155 100644 --- a/core/src/test/java/com/questdb/cutlass/http/IODispatcherTest.java +++ b/core/src/test/java/com/questdb/cutlass/http/IODispatcherTest.java @@ -714,7 +714,7 @@ public class IODispatcherTest { @Override public HttpRequestProcessor newInstance() { - return new JsonQueryProcessor(engine); + return new JsonQueryProcessor(httpConfiguration.getJsonQueryProcessorConfiguration(), engine); } }); @@ -794,7 +794,7 @@ public class IODispatcherTest { @Override public HttpRequestProcessor newInstance() { - return new JsonQueryProcessor(engine); + return new JsonQueryProcessor(httpConfiguration.getJsonQueryProcessorConfiguration(), engine); } }); @@ -939,7 +939,7 @@ public class IODispatcherTest { @Override public HttpRequestProcessor newInstance() { - return new JsonQueryProcessor(engine); + return new JsonQueryProcessor(httpConfiguration.getJsonQueryProcessorConfiguration(), engine); } }); @@ -2136,7 +2136,7 @@ public class IODispatcherTest { @Override public HttpRequestProcessor newInstance() { - return new JsonQueryProcessor(engine); + return new JsonQueryProcessor(httpConfiguration.getJsonQueryProcessorConfiguration(), engine); } }); @@ -2363,7 +2363,7 @@ public class IODispatcherTest { @Override public HttpRequestProcessor newInstance() { - return new JsonQueryProcessor(engine); + return new JsonQueryProcessor(configuration.getJsonQueryProcessorConfiguration(), engine); } }); } diff --git a/core/src/test/resources/server.conf b/core/src/test/resources/server.conf index 49954f51869b277826924a54210ebf6f584063ce..484d8425a40bca93ec8f40ec8a1809136c5ae993 100644 --- a/core/src/test/resources/server.conf +++ b/core/src/test/resources/server.conf @@ -33,6 +33,10 @@ http.text.timestamp.adapter.pool.capacity=512 http.text.utf8.sink.size=8k http.bind.to=10.5.8.30:9900 +http.json.query.connection.check.frequency=2000 +http.json.query.double.scale=6 +http.json.query.float.scale=4 + cairo.create.as.select.retry.count=12 cairo.default.map.type=compact cairo.default.symbol.cache.flag=true