提交 ab04d4cf 编写于 作者: V Vlad Ilyushchenko

QuestDB server config - work in progress

上级 5820be76
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http;
import com.questdb.cutlass.http.processors.StaticContentProcessorConfiguration;
import com.questdb.cutlass.http.processors.TextImportProcessorConfiguration;
import com.questdb.cutlass.text.TextConfiguration;
import com.questdb.network.*;
import com.questdb.std.FilesFacade;
import com.questdb.std.FilesFacadeImpl;
import com.questdb.std.Numbers;
import com.questdb.std.NumericException;
import com.questdb.std.time.MillisecondClock;
import com.questdb.std.time.MillisecondClockImpl;
import java.util.Properties;
public class PropHttpServerConfiguration implements HttpServerConfiguration {
private final IODispatcherConfiguration ioDispatcherConfiguration = new PropIODispatcherConfiguration();
private final TextImportProcessorConfiguration textImportProcessorConfiguration = new PropTextImportProcessorConfiguration();
private final StaticContentProcessorConfiguration staticContentProcessorConfiguration = new PropStaticContentProcessorConfiguration();
private final PropTextConfiguration textConfiguration = new PropTextConfiguration();
private final int connectionPoolInitialSize;
private final int connectionStringPoolSize;
private final int multipartHeaderBufferSize;
private final long multipartIdleSpinCount;
private final int recvBufferSize;
private final int requestHeaderBufferSize;
private final int responseHeaderBufferSize;
private final int workerCount;
private final int sendBufferSize;
private final CharSequence indexFileName;
private final CharSequence publicDirectory;
private final boolean abortBrokenUploads;
private final int activeConnectionLimit;
private final CharSequence bindIPv4Address;
private final int bindPort;
private final int eventCapacity;
private final int ioQueueCapacity;
private final long idleConnectionTimeout;
private final int interestQueueCapacity;
private final int listenBacklog;
private final int sndBufSize;
private final int rcvBufSize;
private final String adapterSetConfigurationFileName;
private final int dateAdapterPoolSize;
private final int jsonCacheLimit;
private final int jsonCacheSize;
private final double maxRequiredDelimiterStdDev;
private final int metadataStringPoolSize;
private final long rollBufferLimit;
private final long rollBufferSize;
private final int textAnalysisMaxLines;
private final int textLexerStringPoolSize;
private final int timestampAdapterPoolSize;
private final int utf8SinkCapacity;
public PropHttpServerConfiguration(Properties properties) {
this.connectionPoolInitialSize = getInt(properties, "http.connection.pool.initial.size");
this.connectionStringPoolSize = getInt(properties, "http.connection.string.pool.size");
this.multipartHeaderBufferSize = getInt(properties, "http.multipart.header.buffer.size");
this.multipartIdleSpinCount = Long.parseLong(properties.getProperty("http.multipart.idle.spin.count"));
this.recvBufferSize = getInt(properties, "http.receive.buffer.size");
this.requestHeaderBufferSize = getInt(properties, "http.request.header.buffer.size");
this.responseHeaderBufferSize = getInt(properties, "http.response.header.buffer.size");
this.workerCount = getInt(properties, "http.worker.count");
this.sendBufferSize = getInt(properties, "http.send.buffer.size");
this.indexFileName = getString(properties, "http.static.index.file.name");
this.publicDirectory = getString(properties, "http.static.pubic.directory");
this.abortBrokenUploads = Boolean.parseBoolean(properties.getProperty("http.text.abort.broken.uploads"));
this.activeConnectionLimit = getInt(properties, "http.net.active.connection.limit");
this.eventCapacity = getInt(properties, "http.net.event.capacity");
this.ioQueueCapacity = getInt(properties, "http.net.io.queue.capacity");
this.idleConnectionTimeout = Long.parseLong(properties.getProperty("http.net.idle.connection.timeout"));
this.interestQueueCapacity = getInt(properties, "http.net.interest.queue.capacity");
this.listenBacklog = getInt(properties, "http.net.listen.backlog");
this.sndBufSize = getInt(properties, "http.net.snd.buf.size");
this.rcvBufSize = getInt(properties, "http.net.rcv.buf.size");
this.adapterSetConfigurationFileName = getString(properties, "http.text.adapter.set.config");
this.dateAdapterPoolSize = getInt(properties, "http.text.date.adapter.pool.size");
this.jsonCacheLimit = getInt(properties, "http.text.json.cache.limit");
this.jsonCacheSize = getInt(properties, "http.text.json.cache.size");
this.maxRequiredDelimiterStdDev = Double.parseDouble(properties.getProperty("http.text.max.required.delimiter.stddev"));
this.metadataStringPoolSize = getInt(properties, "http.text.metadata.string.pool.size");
this.rollBufferLimit = Long.parseLong(properties.getProperty("http.text.roll.buffer.limit"));
this.rollBufferSize = Long.getLong(properties.getProperty("http.text.roll.buffer.size"));
this.textAnalysisMaxLines = getInt(properties, "http.text.analysis.max.lines");
this.textLexerStringPoolSize = getInt(properties, "http.text.lexer.string.pool.size");
this.timestampAdapterPoolSize = getInt(properties, "http.text.timestamp.adapter.pool.size");
this.utf8SinkCapacity = getInt(properties, "http.text.utf8.sink.capacity");
final String httpBindTo = getString(properties, "http.bind.to");
final int colonIndex = httpBindTo.indexOf(':');
if (colonIndex == -1) {
throw new IllegalStateException();
}
this.bindIPv4Address = httpBindTo.substring(0, colonIndex);
this.bindPort = Integer.parseInt(httpBindTo.substring(colonIndex + 1));
}
@Override
public int getConnectionPoolInitialSize() {
return connectionPoolInitialSize;
}
@Override
public int getConnectionStringPoolSize() {
return connectionStringPoolSize;
}
@Override
public int getMultipartHeaderBufferSize() {
return multipartHeaderBufferSize;
}
@Override
public long getMultipartIdleSpinCount() {
return multipartIdleSpinCount;
}
@Override
public int getRecvBufferSize() {
return recvBufferSize;
}
@Override
public int getRequestHeaderBufferSize() {
return requestHeaderBufferSize;
}
@Override
public int getResponseHeaderBufferSize() {
return responseHeaderBufferSize;
}
@Override
public MillisecondClock getClock() {
return MillisecondClockImpl.INSTANCE;
}
@Override
public IODispatcherConfiguration getDispatcherConfiguration() {
return ioDispatcherConfiguration;
}
@Override
public StaticContentProcessorConfiguration getStaticContentProcessorConfiguration() {
return staticContentProcessorConfiguration;
}
@Override
public TextImportProcessorConfiguration getTextImportProcessorConfiguration() {
return textImportProcessorConfiguration;
}
@Override
public int getWorkerCount() {
return workerCount;
}
@Override
public int getSendBufferSize() {
return sendBufferSize;
}
private int getInt(Properties properties, String key) {
try {
return Numbers.parseInt(properties.getProperty(key));
} catch (NumericException e) {
return 0;
}
}
private String getString(Properties properties, String key) {
return properties.getProperty(key);
}
private class PropStaticContentProcessorConfiguration implements StaticContentProcessorConfiguration {
@Override
public FilesFacade getFilesFacade() {
return FilesFacadeImpl.INSTANCE;
}
@Override
public CharSequence getIndexFileName() {
return indexFileName;
}
@Override
public MimeTypesCache getMimeTypesCache() {
return null;
}
@Override
public CharSequence getPublicDirectory() {
return publicDirectory;
}
}
private class PropTextImportProcessorConfiguration implements TextImportProcessorConfiguration {
@Override
public boolean abortBrokenUploads() {
return abortBrokenUploads;
}
@Override
public TextConfiguration getTextConfiguration() {
return textConfiguration;
}
}
private class PropIODispatcherConfiguration implements IODispatcherConfiguration {
@Override
public int getActiveConnectionLimit() {
return activeConnectionLimit;
}
@Override
public CharSequence getBindIPv4Address() {
return bindIPv4Address;
}
@Override
public int getBindPort() {
return bindPort;
}
@Override
public MillisecondClock getClock() {
return MillisecondClockImpl.INSTANCE;
}
@Override
public int getEventCapacity() {
return eventCapacity;
}
@Override
public int getIOQueueCapacity() {
return ioQueueCapacity;
}
@Override
public long getIdleConnectionTimeout() {
return idleConnectionTimeout;
}
@Override
public int getInterestQueueCapacity() {
return interestQueueCapacity;
}
@Override
public int getListenBacklog() {
return listenBacklog;
}
@Override
public NetworkFacade getNetworkFacade() {
return NetworkFacadeImpl.INSTANCE;
}
@Override
public EpollFacade getEpollFacade() {
return EpollFacadeImpl.INSTANCE;
}
@Override
public SelectFacade getSelectFacade() {
return SelectFacadeImpl.INSTANCE;
}
@Override
public int getInitialBias() {
return IOOperation.READ;
}
@Override
public int getSndBufSize() {
return sndBufSize;
}
@Override
public int getRcvBufSize() {
return rcvBufSize;
}
}
private class PropTextConfiguration implements TextConfiguration {
@Override
public String getAdapterSetConfigurationFileName() {
return adapterSetConfigurationFileName;
}
@Override
public int getDateAdapterPoolSize() {
return dateAdapterPoolSize;
}
@Override
public int getJsonCacheLimit() {
return jsonCacheLimit;
}
@Override
public int getJsonCacheSize() {
return jsonCacheSize;
}
@Override
public double getMaxRequiredDelimiterStdDev() {
return maxRequiredDelimiterStdDev;
}
@Override
public int getMetadataStringPoolSize() {
return metadataStringPoolSize;
}
@Override
public long getRollBufferLimit() {
return rollBufferLimit;
}
@Override
public long getRollBufferSize() {
return rollBufferSize;
}
@Override
public int getTextAnalysisMaxLines() {
return textAnalysisMaxLines;
}
@Override
public int getTextLexerStringPoolSize() {
return textLexerStringPoolSize;
}
@Override
public int getTimestampAdapterPoolSize() {
return timestampAdapterPoolSize;
}
@Override
public int getUtf8SinkCapacity() {
return utf8SinkCapacity;
}
}
}
......@@ -367,7 +367,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
return;
}
LOG.info().$("resume").$();
LOG.debug().$("resume [fd=").$(context.getFd()).$(']').$();
final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
final int columnCount = state.metadata.getColumnCount();
......@@ -491,7 +491,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
}
private void readyForNextRequest(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
LOG.info().$("all sent").$();
LOG.debug().$("all sent [fd=").$(context.getFd()).$(']').$();
context.clear();
dispatcher.registerChannel(context, IOOperation.READ);
}
......
......@@ -691,8 +691,10 @@ public class IODispatcherTest {
final String baseDir = System.getProperty("java.io.tmpdir");
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (CairoEngine engine = new Engine(new DefaultCairoConfiguration(baseDir));
HttpServer httpServer = new HttpServer(httpConfiguration)) {
try (
CairoEngine engine = new Engine(new DefaultCairoConfiguration(baseDir));
HttpServer httpServer = new HttpServer(httpConfiguration)
) {
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
......@@ -763,6 +765,141 @@ public class IODispatcherTest {
});
}
@Test
public void testJsonQuerySyntaxError() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (
CairoEngine engine = new Engine(new DefaultCairoConfiguration(baseDir));
HttpServer httpServer = new HttpServer(httpConfiguration)
) {
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return HttpServerConfiguration.DEFAULT_PROCESSOR_URL;
}
@Override
public HttpRequestProcessor newInstance() {
return new StaticContentProcessor(httpConfiguration.getStaticContentProcessorConfiguration());
}
});
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/query";
}
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
}
});
httpServer.start();
// create table with all column types
CairoTestUtils.createTestTable(
engine.getConfiguration(),
20,
new Rnd(),
new TestRecord.ArrayBinarySequence());
// send multipart request to server
final String request = "GET /query?query=x%20where2%20i%20%3D%20(%27EHNRX%27) HTTP/1.1\r\n" +
"Host: localhost:9001\r\n" +
"Connection: keep-alive\r\n" +
"Cache-Control: max-age=0\r\n" +
"Upgrade-Insecure-Requests: 1\r\n" +
"User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36\r\n" +
"Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3\r\n" +
"Accept-Encoding: gzip, deflate, br\r\n" +
"Accept-Language: en-GB,en-US;q=0.9,en;q=0.8\r\n" +
"\r\n";
byte[] expectedResponse = ("HTTP/1.1 200 OK\r\n" +
"Server: questDB/1.0\r\n" +
"Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" +
"Transfer-Encoding: chunked\r\n" +
"Content-Type: application/json; charset=utf-8\r\n" +
"Keep-Alive: timeout=5, max=10000\r\n" +
"\r\n" +
"224\r\n" +
"{\"query\":\"x where i = ('EHNRX')\",\"columns\":[{\"name\":\"a\",\"type\":\"BYTE\"},{\"name\":\"b\",\"type\":\"SHORT\"},{\"name\":\"c\",\"type\":\"INT\"},{\"name\":\"d\",\"type\":\"LONG\"},{\"name\":\"e\",\"type\":\"DATE\"},{\"name\":\"f\",\"type\":\"TIMESTAMP\"},{\"name\":\"g\",\"type\":\"FLOAT\"},{\"name\":\"h\",\"type\":\"DOUBLE\"},{\"name\":\"i\",\"type\":\"STRING\"},{\"name\":\"j\",\"type\":\"SYMBOL\"},{\"name\":\"k\",\"type\":\"BOOLEAN\"},{\"name\":\"l\",\"type\":\"BINARY\"}],\"dataset\":[[80,24814,-727724771,8920866532787660373,\"-169665660-01-09T01:58:28.119Z\",\"-51129-02-11T06:38:29.397464Z\",null,null,\"EHNRX\",\"ZSX\",false,[]]],\"count\":1}\r\n" +
"0\r\n" +
"\r\n").getBytes();
sendAndReceive(
NetworkFacadeImpl.INSTANCE,
request,
expectedResponse,
10,
0
);
httpServer.halt();
}
});
}
@Test
@Ignore
public void testUpload() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
// final String baseDir = "/home/vlad/dev/123";
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (CairoEngine engine = new Engine(new DefaultCairoConfiguration(baseDir));
HttpServer httpServer = new HttpServer(httpConfiguration)) {
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return HttpServerConfiguration.DEFAULT_PROCESSOR_URL;
}
@Override
public HttpRequestProcessor newInstance() {
return new StaticContentProcessor(httpConfiguration.getStaticContentProcessorConfiguration());
}
});
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/upload";
}
@Override
public HttpRequestProcessor newInstance() {
return new TextImportProcessor(httpConfiguration.getTextImportProcessorConfiguration(), engine);
}
});
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/query";
}
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
}
});
httpServer.start();
Thread.sleep(2000000);
}
});
}
@Test
public void testJsonQueryAndDisconnectWithoutWaitingForResult() throws Exception {
TestUtils.assertMemoryLeak(() -> {
......@@ -2012,46 +2149,37 @@ public class IODispatcherTest {
});
}
@Test
@Ignore
public void testUpload() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
// final String baseDir = "/home/vlad/dev/123";
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
private static class CairoHttpServer {
private final CairoEngine engine;
private final HttpServer httpServer;
try (CairoEngine engine = new Engine(new DefaultCairoConfiguration(baseDir));
HttpServer httpServer = new HttpServer(httpConfiguration)) {
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return HttpServerConfiguration.DEFAULT_PROCESSOR_URL;
}
@Override
public HttpRequestProcessor newInstance() {
return new StaticContentProcessor(httpConfiguration.getStaticContentProcessorConfiguration());
}
});
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/upload";
}
public CairoHttpServer(CharSequence cairoBaseDir, HttpServerConfiguration configuration) {
this.engine = new Engine(new DefaultCairoConfiguration(cairoBaseDir));
this.httpServer = new HttpServer(configuration);
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return HttpServerConfiguration.DEFAULT_PROCESSOR_URL;
}
@Override
public HttpRequestProcessor newInstance() {
return new TextImportProcessor(httpConfiguration.getTextImportProcessorConfiguration(), engine);
}
});
@Override
public HttpRequestProcessor newInstance() {
return new StaticContentProcessor(configuration.getStaticContentProcessorConfiguration());
}
});
httpServer.start();
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/query";
}
Thread.sleep(2000000);
}
});
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
}
});
}
}
private static void assertDownloadResponse(long fd, Rnd rnd, long buffer, int len, int nonRepeatedContentLength, String expectedResponseHeader, long expectedResponseLen) {
......@@ -2152,8 +2280,6 @@ public class IODispatcherTest {
public StaticContentProcessorConfiguration getStaticContentProcessorConfiguration() {
return staticContentProcessorConfiguration;
}
};
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册