提交 c1b7d531 编写于 作者: V Vlad Ilyushchenko

CUTLASS: extracted config from new processors

上级 bb5f239f
......@@ -26,6 +26,7 @@ package com.questdb;
import com.questdb.cairo.CairoConfiguration;
import com.questdb.cutlass.http.HttpServerConfiguration;
import com.questdb.cutlass.http.MimeTypesCache;
import com.questdb.cutlass.http.processors.JsonQueryProcessorConfiguration;
import com.questdb.cutlass.http.processors.StaticContentProcessorConfiguration;
import com.questdb.cutlass.http.processors.TextImportProcessorConfiguration;
import com.questdb.cutlass.line.udp.LineUdpReceiverConfiguration;
......@@ -50,6 +51,7 @@ public class PropServerConfiguration implements ServerConfigurationV2 {
private final TextConfiguration textConfiguration = new PropTextConfiguration();
private final CairoConfiguration cairoConfiguration = new PropCairoConfiguration();
private final LineUdpReceiverConfiguration lineUdpReceiverConfiguration = new PropLineUdpReceiverConfiguration();
private final JsonQueryProcessorConfiguration jsonQueryProcessorConfiguration = new PropJsonQueryProcessorConfiguration();
private final int connectionPoolInitialCapacity;
private final int connectionStringPoolCapacity;
private final int multipartHeaderBufferSize;
......@@ -130,6 +132,9 @@ public class PropServerConfiguration implements ServerConfigurationV2 {
private int bindPort;
private int lineUdpBindIPV4Address;
private int lineUdpPort;
private final int jsonQueryFloatScale;
private final int jsonQueryDoubleScale;
private final int jsonQueryConnectionCheckFrequency;
public PropServerConfiguration(String root, Properties properties) throws ServerConfigurationException {
this.connectionPoolInitialCapacity = getInt(properties, "http.connection.pool.initial.capacity", 16);
......@@ -143,8 +148,8 @@ public class PropServerConfiguration implements ServerConfigurationV2 {
this.sendBufferSize = getIntSize(properties, "http.send.buffer.size", 2 * 1024 * 1024);
this.indexFileName = getString(properties, "http.static.index.file.name", "index.html");
int keepAliveTimeout = getInt(properties, "http.keep-alive.timeout", 30);
int keepAliveMax = getInt(properties, "http.keep-alive.max", 1_000_000);
int keepAliveTimeout = getInt(properties, "http.keep-alive.timeout", 5);
int keepAliveMax = getInt(properties, "http.keep-alive.max", 10_000);
if (keepAliveTimeout > 0 && keepAliveMax > 0) {
this.keepAliveHeader = "Keep-Alive: timeout=5, max=10000" + Misc.EOL;
......@@ -191,6 +196,9 @@ public class PropServerConfiguration implements ServerConfigurationV2 {
this.timestampAdapterPoolCapacity = getInt(properties, "http.text.timestamp.adapter.pool.capacity", 64);
this.utf8SinkSize = getIntSize(properties, "http.text.utf8.sink.size", 4096);
this.jsonQueryConnectionCheckFrequency = getInt(properties, "http.json.query.connection.check.frequency", 1_000_000);
this.jsonQueryDoubleScale = getInt(properties, "http.json.query.double.scale", 10);
this.jsonQueryFloatScale = getInt(properties, "http.json.query.float.scale", 10);
parseBindTo(properties, "http.bind.to", "0.0.0.0:9000", (a, p) -> {
bindIPv4Address = a;
......@@ -611,6 +619,11 @@ public class PropServerConfiguration implements ServerConfigurationV2 {
public int getSendBufferSize() {
return sendBufferSize;
}
@Override
public JsonQueryProcessorConfiguration getJsonQueryProcessorConfiguration() {
return jsonQueryProcessorConfiguration;
}
}
private class PropCairoConfiguration implements CairoConfiguration {
......@@ -856,4 +869,26 @@ public class PropServerConfiguration implements ServerConfigurationV2 {
return lineUdpReceiveBufferSize;
}
}
private class PropJsonQueryProcessorConfiguration implements JsonQueryProcessorConfiguration {
@Override
public CharSequence getKeepAliveHeader() {
return keepAliveHeader;
}
@Override
public int getFloatScale() {
return jsonQueryFloatScale;
}
@Override
public int getDoubleScale() {
return jsonQueryDoubleScale;
}
@Override
public int getConnectionCheckFrequency() {
return jsonQueryConnectionCheckFrequency;
}
}
}
......@@ -97,7 +97,7 @@ public class ServerMain {
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(cairoEngine);
return new JsonQueryProcessor(configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration(), cairoEngine);
}
});
......
......@@ -24,6 +24,7 @@
package com.questdb.cutlass.http;
import com.questdb.cutlass.http.processors.DefaultTextImportProcessorConfiguration;
import com.questdb.cutlass.http.processors.JsonQueryProcessorConfiguration;
import com.questdb.cutlass.http.processors.StaticContentProcessorConfiguration;
import com.questdb.cutlass.http.processors.TextImportProcessorConfiguration;
import com.questdb.network.DefaultIODispatcherConfiguration;
......@@ -65,6 +66,28 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration {
}
};
private final JsonQueryProcessorConfiguration jsonQueryProcessorConfiguration = new JsonQueryProcessorConfiguration() {
@Override
public CharSequence getKeepAliveHeader() {
return "Keep-Alive: timeout=5, max=10000\r\n";
}
@Override
public int getFloatScale() {
return 10;
}
@Override
public int getDoubleScale() {
return 10;
}
@Override
public int getConnectionCheckFrequency() {
return 1_000_000;
}
};
private final TextImportProcessorConfiguration textImportProcessorConfiguration = new DefaultTextImportProcessorConfiguration();
public DefaultHttpServerConfiguration() {
......@@ -143,4 +166,9 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration {
public int getConnectionStringPoolCapacity() {
return 128;
}
@Override
public JsonQueryProcessorConfiguration getJsonQueryProcessorConfiguration() {
return jsonQueryProcessorConfiguration;
}
}
......@@ -30,7 +30,7 @@ public interface HttpChunkedResponseSocket extends CharSink {
void done() throws PeerDisconnectedException, PeerIsSlowToReadException;
CharSink headers();
HttpResponseHeader headers();
boolean resetToBookmark();
......
......@@ -29,4 +29,10 @@ public interface HttpResponseHeader extends CharSink {
void send() throws PeerDisconnectedException, PeerIsSlowToReadException;
String status(int code, CharSequence contentType, long contentLength);
default void setKeepAlive(CharSequence keepAliveHeader) {
if (keepAliveHeader != null) {
put(keepAliveHeader);
}
}
}
......@@ -575,7 +575,7 @@ public class HttpResponseSink implements Closeable, Mutable {
// }
@Override
public CharSink headers() {
public HttpResponseHeader headers() {
return headerImpl;
}
......
......@@ -23,6 +23,7 @@
package com.questdb.cutlass.http;
import com.questdb.cutlass.http.processors.JsonQueryProcessorConfiguration;
import com.questdb.cutlass.http.processors.StaticContentProcessorConfiguration;
import com.questdb.cutlass.http.processors.TextImportProcessorConfiguration;
import com.questdb.network.IODispatcherConfiguration;
......@@ -53,6 +54,8 @@ public interface HttpServerConfiguration {
TextImportProcessorConfiguration getTextImportProcessorConfiguration();
JsonQueryProcessorConfiguration getJsonQueryProcessorConfiguration();
int getWorkerCount();
int getSendBufferSize();
......
......@@ -48,8 +48,8 @@ public abstract class AbstractQueryContext implements Mutable, Closeable {
int queryState = JsonQueryProcessor.QUERY_PREFIX;
int columnIndex;
public AbstractQueryContext(long fd, int cyclesBeforeCancel) {
this.cancellationHandler = new ChannelCheckCancellationHandler(fd, cyclesBeforeCancel);
public AbstractQueryContext(long fd, int connectionCheckFrequency) {
this.cancellationHandler = new ChannelCheckCancellationHandler(fd, connectionCheckFrequency);
this.fd = fd;
}
......
......@@ -62,13 +62,19 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
private final AtomicLong cacheHits = new AtomicLong();
private final AtomicLong cacheMisses = new AtomicLong();
private final SqlCompiler compiler;
private final JsonQueryProcessorConfiguration configuration;
private final int floatScale;
private final int doubleScale;
public JsonQueryProcessor(CairoEngine engine) {
public JsonQueryProcessor(JsonQueryProcessorConfiguration configuration, CairoEngine engine) {
// todo: add scheduler
this.configuration = configuration;
this.compiler = new SqlCompiler(engine);
this.floatScale = configuration.getFloatScale();
this.doubleScale = configuration.getDoubleScale();
}
private static void putValue(HttpChunkedResponseSocket socket, int type, Record rec, int col) {
private void putValue(HttpChunkedResponseSocket socket, int type, Record rec, int col) {
switch (type) {
case ColumnType.BOOLEAN:
socket.put(rec.getBool(col));
......@@ -77,10 +83,10 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
socket.put(rec.getByte(col));
break;
case ColumnType.DOUBLE:
socket.put(rec.getDouble(col), 10);
socket.put(rec.getDouble(col), doubleScale);
break;
case ColumnType.FLOAT:
socket.put(rec.getFloat(col), 10);
socket.put(rec.getFloat(col), floatScale);
break;
case ColumnType.INT:
final int i = rec.getInt(col);
......@@ -143,7 +149,6 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
@Override
public void close() {
Misc.free(compiler);
Misc.free(FACTORY_CACHE.get());
}
@Override
......@@ -161,8 +166,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
) throws PeerDisconnectedException, PeerIsSlowToReadException {
JsonQueryProcessorState state = LV.get(context);
if (state == null) {
// todo: configure state externally
LV.set(context, state = new JsonQueryProcessorState(context.getFd(), 1000));
LV.set(context, state = new JsonQueryProcessorState(context.getFd(), configuration.getConnectionCheckFrequency()));
}
HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
if (parseUrl(socket, context.getRequestHeader(), state)) {
......@@ -319,8 +323,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
int status
) throws PeerDisconnectedException, PeerIsSlowToReadException {
socket.status(status, "application/json; charset=utf-8");
// todo: configure this header externally
socket.headers().put("Keep-Alive: timeout=5, max=10000").put(Misc.EOL);
socket.headers().setKeepAlive(configuration.getKeepAliveHeader());
socket.sendHeader();
}
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http.processors;
public interface JsonQueryProcessorConfiguration {
CharSequence getKeepAliveHeader();
int getFloatScale();
int getDoubleScale();
int getConnectionCheckFrequency();
}
......@@ -27,8 +27,8 @@ public class JsonQueryProcessorState extends AbstractQueryContext {
boolean fetchAll = false;
boolean noMeta = false;
public JsonQueryProcessorState(long fd, int cyclesBeforeCancel) {
super(fd, cyclesBeforeCancel);
public JsonQueryProcessorState(long fd, int connectonCheckFrequency) {
super(fd, connectonCheckFrequency);
}
@Override
......
......@@ -246,10 +246,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable {
header.put("Content-Disposition: attachment; filename=\"").put(FileNameExtractorCharSequence.get(path)).put("\"").put(Misc.EOL);
}
header.put("ETag: ").put('"').put(ff.getLastModified(path)).put('"').put(Misc.EOL);
if (keepAliveHeader != null) {
header.put(keepAliveHeader);
}
header.setKeepAlive(keepAliveHeader);
header.send();
resumeSend(context, dispatcher);
}
......
......@@ -114,6 +114,10 @@ public class PropServerConfigurationTest {
Assert.assertEquals(0, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getBindIPv4Address());
Assert.assertEquals(9000, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getBindPort());
Assert.assertEquals(1_000_000, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getConnectionCheckFrequency());
Assert.assertEquals(10, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getDoubleScale());
Assert.assertEquals(10, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getFloatScale());
Assert.assertEquals(5, configuration.getCairoConfiguration().getCreateAsSelectRetryCount());
Assert.assertEquals("fast", configuration.getCairoConfiguration().getDefaultMapType());
Assert.assertFalse(configuration.getCairoConfiguration().getDefaultSymbolCacheFlag());
......@@ -288,6 +292,10 @@ public class PropServerConfigurationTest {
Assert.assertEquals(168101918, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getBindIPv4Address());
Assert.assertEquals(9900, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getBindPort());
Assert.assertEquals(2_000, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getConnectionCheckFrequency());
Assert.assertEquals(6, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getDoubleScale());
Assert.assertEquals(4, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getFloatScale());
Assert.assertEquals(12, configuration.getCairoConfiguration().getCreateAsSelectRetryCount());
Assert.assertEquals("compact", configuration.getCairoConfiguration().getDefaultMapType());
Assert.assertTrue(configuration.getCairoConfiguration().getDefaultSymbolCacheFlag());
......
......@@ -714,7 +714,7 @@ public class IODispatcherTest {
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
return new JsonQueryProcessor(httpConfiguration.getJsonQueryProcessorConfiguration(), engine);
}
});
......@@ -794,7 +794,7 @@ public class IODispatcherTest {
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
return new JsonQueryProcessor(httpConfiguration.getJsonQueryProcessorConfiguration(), engine);
}
});
......@@ -939,7 +939,7 @@ public class IODispatcherTest {
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
return new JsonQueryProcessor(httpConfiguration.getJsonQueryProcessorConfiguration(), engine);
}
});
......@@ -2136,7 +2136,7 @@ public class IODispatcherTest {
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
return new JsonQueryProcessor(httpConfiguration.getJsonQueryProcessorConfiguration(), engine);
}
});
......@@ -2363,7 +2363,7 @@ public class IODispatcherTest {
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
return new JsonQueryProcessor(configuration.getJsonQueryProcessorConfiguration(), engine);
}
});
}
......
......@@ -33,6 +33,10 @@ http.text.timestamp.adapter.pool.capacity=512
http.text.utf8.sink.size=8k
http.bind.to=10.5.8.30:9900
http.json.query.connection.check.frequency=2000
http.json.query.double.scale=6
http.json.query.float.scale=4
cairo.create.as.select.retry.count=12
cairo.default.map.type=compact
cairo.default.symbol.cache.flag=true
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册