提交 92536794 编写于 作者: V Vlad Ilyushchenko

server configuration implementation + test

上级 ab04d4cf
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb;
public class ServerConfigurationException extends Exception {
public ServerConfigurationException(String key, String value) {
super("invalid configuration value [key=" + key + ", value=" + value + "]");
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb;
import com.questdb.cairo.CairoConfiguration;
import com.questdb.cutlass.http.HttpServerConfiguration;
import com.questdb.cutlass.line.udp.LineUdpReceiverConfiguration;
public interface ServerConfigurationV2 {
CairoConfiguration getCairoConfiguration();
HttpServerConfiguration getHttpServerConfiguration();
LineUdpReceiverConfiguration getLineUdpReceiverConfiguration();
}
......@@ -49,7 +49,7 @@ public interface CairoConfiguration {
int getIndexValueBlockSize();
int getMaxNumberOfSwapFiles();
int getMaxSwapFileCount();
MicrosecondClock getMicrosecondClock();
......@@ -59,19 +59,19 @@ public interface CairoConfiguration {
int getParallelIndexThreshold();
int getReaderPoolSegments();
int getReaderPoolMaxSegments();
CharSequence getRoot();
long getSpinLockTimeoutUs();
int getSqlCacheBlockCount();
int getSqlCacheBlocks();
int getSqlCacheBlockSize();
int getSqlCacheRows();
int getSqlCharacterStoreCapacity();
int getSqlCharacterStorePoolCapacity();
int getSqlCharacterStoreSequencePoolCapacity();
int getSqlColumnPoolCapacity();
......
......@@ -91,7 +91,7 @@ public class DefaultCairoConfiguration implements CairoConfiguration {
}
@Override
public int getMaxNumberOfSwapFiles() {
public int getMaxSwapFileCount() {
return 30;
}
......@@ -116,7 +116,7 @@ public class DefaultCairoConfiguration implements CairoConfiguration {
}
@Override
public int getReaderPoolSegments() {
public int getReaderPoolMaxSegments() {
return 5;
}
......@@ -131,13 +131,13 @@ public class DefaultCairoConfiguration implements CairoConfiguration {
}
@Override
public int getSqlCacheBlockCount() {
return 16;
public int getSqlCacheBlocks() {
return 4;
}
@Override
public int getSqlCacheBlockSize() {
return 4;
public int getSqlCacheRows() {
return 16;
}
@Override
......@@ -148,7 +148,7 @@ public class DefaultCairoConfiguration implements CairoConfiguration {
}
@Override
public int getSqlCharacterStorePoolCapacity() {
public int getSqlCharacterStoreSequencePoolCapacity() {
return 64;
}
......
......@@ -98,7 +98,7 @@ public final class TableUtils {
FilesFacade ff,
AppendMemory memory,
Path path,
CharSequence root,
@Transient CharSequence root,
TableStructure structure,
int mkDirMode
) {
......
......@@ -814,7 +814,7 @@ public class TableWriter implements Closeable {
private int addColumnToMeta(CharSequence name, int type, boolean indexFlag, int indexValueBlockCapacity) {
int index;
try {
index = openMetaSwapFile(ff, ddlMem, path, rootLen, configuration.getMaxNumberOfSwapFiles());
index = openMetaSwapFile(ff, ddlMem, path, rootLen, configuration.getMaxSwapFileCount());
int columnCount = metaMem.getInt(META_OFFSET_COUNT);
ddlMem.putInt(columnCount + 1);
......
......@@ -53,7 +53,7 @@ public class ReaderPool extends AbstractPool implements ResourcePool<TableReader
public ReaderPool(CairoConfiguration configuration) {
super(configuration, configuration.getInactiveReaderTTL());
this.maxSegments = configuration.getReaderPoolSegments();
this.maxSegments = configuration.getReaderPoolMaxSegments();
this.maxEntries = maxSegments * ENTRY_SIZE;
}
......
......@@ -105,8 +105,8 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration {
}
@Override
public int getConnectionStringPoolSize() {
return 128;
public int getConnectionPoolInitialCapacity() {
return 16;
}
@Override
......@@ -135,7 +135,7 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration {
}
@Override
public int getConnectionPoolInitialSize() {
return 16;
public int getConnectionStringPoolCapacity() {
return 128;
}
}
......@@ -53,7 +53,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
public HttpConnectionContext(HttpServerConfiguration configuration) {
this.configuration = configuration;
this.nf = configuration.getDispatcherConfiguration().getNetworkFacade();
this.csPool = new ObjectPool<>(DirectByteCharSequence.FACTORY, configuration.getConnectionStringPoolSize());
this.csPool = new ObjectPool<>(DirectByteCharSequence.FACTORY, configuration.getConnectionStringPoolCapacity());
this.headerParser = new HttpHeaderParser(configuration.getRequestHeaderBufferSize(), csPool);
this.multipartContentHeaderParser = new HttpHeaderParser(configuration.getMultipartHeaderBufferSize(), csPool);
this.multipartContentParser = new HttpMultipartContentParser(multipartContentHeaderParser);
......
......@@ -145,7 +145,7 @@ public class HttpServer implements Closeable {
workerHaltLatch,
-1,
LOG,
configuration.getConnectionPoolInitialSize(),
configuration.getConnectionPoolInitialCapacity(),
// have each thread release their own processor selectors
// in case processors stash some of their resources in thread-local variables
() -> Misc.free(selectors.getQuick(index))
......
......@@ -31,9 +31,9 @@ import com.questdb.std.time.MillisecondClock;
public interface HttpServerConfiguration {
String DEFAULT_PROCESSOR_URL = "*";
int getConnectionPoolInitialSize();
int getConnectionPoolInitialCapacity();
int getConnectionStringPoolSize();
int getConnectionStringPoolCapacity();
int getMultipartHeaderBufferSize();
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http;
import com.questdb.cutlass.http.processors.StaticContentProcessorConfiguration;
import com.questdb.cutlass.http.processors.TextImportProcessorConfiguration;
import com.questdb.cutlass.text.TextConfiguration;
import com.questdb.network.*;
import com.questdb.std.FilesFacade;
import com.questdb.std.FilesFacadeImpl;
import com.questdb.std.Numbers;
import com.questdb.std.NumericException;
import com.questdb.std.time.MillisecondClock;
import com.questdb.std.time.MillisecondClockImpl;
import java.util.Properties;
public class PropHttpServerConfiguration implements HttpServerConfiguration {
private final IODispatcherConfiguration ioDispatcherConfiguration = new PropIODispatcherConfiguration();
private final TextImportProcessorConfiguration textImportProcessorConfiguration = new PropTextImportProcessorConfiguration();
private final StaticContentProcessorConfiguration staticContentProcessorConfiguration = new PropStaticContentProcessorConfiguration();
private final PropTextConfiguration textConfiguration = new PropTextConfiguration();
private final int connectionPoolInitialSize;
private final int connectionStringPoolSize;
private final int multipartHeaderBufferSize;
private final long multipartIdleSpinCount;
private final int recvBufferSize;
private final int requestHeaderBufferSize;
private final int responseHeaderBufferSize;
private final int workerCount;
private final int sendBufferSize;
private final CharSequence indexFileName;
private final CharSequence publicDirectory;
private final boolean abortBrokenUploads;
private final int activeConnectionLimit;
private final CharSequence bindIPv4Address;
private final int bindPort;
private final int eventCapacity;
private final int ioQueueCapacity;
private final long idleConnectionTimeout;
private final int interestQueueCapacity;
private final int listenBacklog;
private final int sndBufSize;
private final int rcvBufSize;
private final String adapterSetConfigurationFileName;
private final int dateAdapterPoolSize;
private final int jsonCacheLimit;
private final int jsonCacheSize;
private final double maxRequiredDelimiterStdDev;
private final int metadataStringPoolSize;
private final long rollBufferLimit;
private final long rollBufferSize;
private final int textAnalysisMaxLines;
private final int textLexerStringPoolSize;
private final int timestampAdapterPoolSize;
private final int utf8SinkCapacity;
public PropHttpServerConfiguration(Properties properties) {
this.connectionPoolInitialSize = getInt(properties, "http.connection.pool.initial.size");
this.connectionStringPoolSize = getInt(properties, "http.connection.string.pool.size");
this.multipartHeaderBufferSize = getInt(properties, "http.multipart.header.buffer.size");
this.multipartIdleSpinCount = Long.parseLong(properties.getProperty("http.multipart.idle.spin.count"));
this.recvBufferSize = getInt(properties, "http.receive.buffer.size");
this.requestHeaderBufferSize = getInt(properties, "http.request.header.buffer.size");
this.responseHeaderBufferSize = getInt(properties, "http.response.header.buffer.size");
this.workerCount = getInt(properties, "http.worker.count");
this.sendBufferSize = getInt(properties, "http.send.buffer.size");
this.indexFileName = getString(properties, "http.static.index.file.name");
this.publicDirectory = getString(properties, "http.static.pubic.directory");
this.abortBrokenUploads = Boolean.parseBoolean(properties.getProperty("http.text.abort.broken.uploads"));
this.activeConnectionLimit = getInt(properties, "http.net.active.connection.limit");
this.eventCapacity = getInt(properties, "http.net.event.capacity");
this.ioQueueCapacity = getInt(properties, "http.net.io.queue.capacity");
this.idleConnectionTimeout = Long.parseLong(properties.getProperty("http.net.idle.connection.timeout"));
this.interestQueueCapacity = getInt(properties, "http.net.interest.queue.capacity");
this.listenBacklog = getInt(properties, "http.net.listen.backlog");
this.sndBufSize = getInt(properties, "http.net.snd.buf.size");
this.rcvBufSize = getInt(properties, "http.net.rcv.buf.size");
this.adapterSetConfigurationFileName = getString(properties, "http.text.adapter.set.config");
this.dateAdapterPoolSize = getInt(properties, "http.text.date.adapter.pool.size");
this.jsonCacheLimit = getInt(properties, "http.text.json.cache.limit");
this.jsonCacheSize = getInt(properties, "http.text.json.cache.size");
this.maxRequiredDelimiterStdDev = Double.parseDouble(properties.getProperty("http.text.max.required.delimiter.stddev"));
this.metadataStringPoolSize = getInt(properties, "http.text.metadata.string.pool.size");
this.rollBufferLimit = Long.parseLong(properties.getProperty("http.text.roll.buffer.limit"));
this.rollBufferSize = Long.getLong(properties.getProperty("http.text.roll.buffer.size"));
this.textAnalysisMaxLines = getInt(properties, "http.text.analysis.max.lines");
this.textLexerStringPoolSize = getInt(properties, "http.text.lexer.string.pool.size");
this.timestampAdapterPoolSize = getInt(properties, "http.text.timestamp.adapter.pool.size");
this.utf8SinkCapacity = getInt(properties, "http.text.utf8.sink.capacity");
final String httpBindTo = getString(properties, "http.bind.to");
final int colonIndex = httpBindTo.indexOf(':');
if (colonIndex == -1) {
throw new IllegalStateException();
}
this.bindIPv4Address = httpBindTo.substring(0, colonIndex);
this.bindPort = Integer.parseInt(httpBindTo.substring(colonIndex + 1));
}
@Override
public int getConnectionPoolInitialSize() {
return connectionPoolInitialSize;
}
@Override
public int getConnectionStringPoolSize() {
return connectionStringPoolSize;
}
@Override
public int getMultipartHeaderBufferSize() {
return multipartHeaderBufferSize;
}
@Override
public long getMultipartIdleSpinCount() {
return multipartIdleSpinCount;
}
@Override
public int getRecvBufferSize() {
return recvBufferSize;
}
@Override
public int getRequestHeaderBufferSize() {
return requestHeaderBufferSize;
}
@Override
public int getResponseHeaderBufferSize() {
return responseHeaderBufferSize;
}
@Override
public MillisecondClock getClock() {
return MillisecondClockImpl.INSTANCE;
}
@Override
public IODispatcherConfiguration getDispatcherConfiguration() {
return ioDispatcherConfiguration;
}
@Override
public StaticContentProcessorConfiguration getStaticContentProcessorConfiguration() {
return staticContentProcessorConfiguration;
}
@Override
public TextImportProcessorConfiguration getTextImportProcessorConfiguration() {
return textImportProcessorConfiguration;
}
@Override
public int getWorkerCount() {
return workerCount;
}
@Override
public int getSendBufferSize() {
return sendBufferSize;
}
private int getInt(Properties properties, String key) {
try {
return Numbers.parseInt(properties.getProperty(key));
} catch (NumericException e) {
return 0;
}
}
private String getString(Properties properties, String key) {
return properties.getProperty(key);
}
private class PropStaticContentProcessorConfiguration implements StaticContentProcessorConfiguration {
@Override
public FilesFacade getFilesFacade() {
return FilesFacadeImpl.INSTANCE;
}
@Override
public CharSequence getIndexFileName() {
return indexFileName;
}
@Override
public MimeTypesCache getMimeTypesCache() {
return null;
}
@Override
public CharSequence getPublicDirectory() {
return publicDirectory;
}
}
private class PropTextImportProcessorConfiguration implements TextImportProcessorConfiguration {
@Override
public boolean abortBrokenUploads() {
return abortBrokenUploads;
}
@Override
public TextConfiguration getTextConfiguration() {
return textConfiguration;
}
}
private class PropIODispatcherConfiguration implements IODispatcherConfiguration {
@Override
public int getActiveConnectionLimit() {
return activeConnectionLimit;
}
@Override
public CharSequence getBindIPv4Address() {
return bindIPv4Address;
}
@Override
public int getBindPort() {
return bindPort;
}
@Override
public MillisecondClock getClock() {
return MillisecondClockImpl.INSTANCE;
}
@Override
public int getEventCapacity() {
return eventCapacity;
}
@Override
public int getIOQueueCapacity() {
return ioQueueCapacity;
}
@Override
public long getIdleConnectionTimeout() {
return idleConnectionTimeout;
}
@Override
public int getInterestQueueCapacity() {
return interestQueueCapacity;
}
@Override
public int getListenBacklog() {
return listenBacklog;
}
@Override
public NetworkFacade getNetworkFacade() {
return NetworkFacadeImpl.INSTANCE;
}
@Override
public EpollFacade getEpollFacade() {
return EpollFacadeImpl.INSTANCE;
}
@Override
public SelectFacade getSelectFacade() {
return SelectFacadeImpl.INSTANCE;
}
@Override
public int getInitialBias() {
return IOOperation.READ;
}
@Override
public int getSndBufSize() {
return sndBufSize;
}
@Override
public int getRcvBufSize() {
return rcvBufSize;
}
}
private class PropTextConfiguration implements TextConfiguration {
@Override
public String getAdapterSetConfigurationFileName() {
return adapterSetConfigurationFileName;
}
@Override
public int getDateAdapterPoolSize() {
return dateAdapterPoolSize;
}
@Override
public int getJsonCacheLimit() {
return jsonCacheLimit;
}
@Override
public int getJsonCacheSize() {
return jsonCacheSize;
}
@Override
public double getMaxRequiredDelimiterStdDev() {
return maxRequiredDelimiterStdDev;
}
@Override
public int getMetadataStringPoolSize() {
return metadataStringPoolSize;
}
@Override
public long getRollBufferLimit() {
return rollBufferLimit;
}
@Override
public long getRollBufferSize() {
return rollBufferSize;
}
@Override
public int getTextAnalysisMaxLines() {
return textAnalysisMaxLines;
}
@Override
public int getTextLexerStringPoolSize() {
return textLexerStringPoolSize;
}
@Override
public int getTimestampAdapterPoolSize() {
return timestampAdapterPoolSize;
}
@Override
public int getUtf8SinkCapacity() {
return utf8SinkCapacity;
}
}
}
......@@ -53,7 +53,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
static final int QUERY_METADATA = 2;
static final int QUERY_PREFIX = 1;
// Factory cache is thread local due to possibility of factory being
// close by another thread. Peer disconnect is a typical example of this.
// closed by another thread. Peer disconnect is a typical example of this.
// Being asynchronous we may need to be able to return factory to the cache
// by the same thread that executes the dispatcher.
static final ThreadLocal<AssociativeCache<RecordCursorFactory>> FACTORY_CACHE = ThreadLocal.withInitial(() -> new AssociativeCache<>(8, 8));
......@@ -323,9 +323,6 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
HttpChunkedResponseSocket socket,
int status
) throws PeerDisconnectedException, PeerIsSlowToReadException {
// todo: same issue as with sending confirmation
// what happens when header doesn't fit in buffer or there is "slow" client
// half way sending header
socket.status(status, "application/json; charset=utf-8");
// todo: configure this header externally
socket.headers().put("Keep-Alive: timeout=5, max=10000").put(Misc.EOL);
......
......@@ -61,9 +61,9 @@ public class JsonLexer implements Mutable, Closeable {
private boolean useCache = false;
private int position = 0;
public JsonLexer(int cacheCapacity, int cacheSizeLimit) {
this.cacheCapacity = cacheCapacity;
this.cache = Unsafe.malloc(cacheCapacity);
public JsonLexer(int cacheSize, int cacheSizeLimit) {
this.cacheCapacity = cacheSize;
this.cache = Unsafe.malloc(cacheSize);
this.cacheSizeLimit = cacheSizeLimit;
}
......
......@@ -24,7 +24,7 @@
package com.questdb.cutlass.line;
import com.questdb.cairo.*;
import com.questdb.cairo.pool.ResourcePool;
import com.questdb.cairo.sql.CairoEngine;
import com.questdb.cairo.sql.RecordMetadata;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
......@@ -45,7 +45,7 @@ public class CairoLineProtoParser implements LineProtoParser, Closeable {
};
private static final FieldNameParser NOOP_FIELD_NAME = name -> {
};
private final ResourcePool<TableWriter> pool;
private final CairoEngine engine;
private final CharSequenceObjHashMap<CacheEntry> writerCache = new CharSequenceObjHashMap<>();
private final CharSequenceObjHashMap<TableWriter> commitList = new CharSequenceObjHashMap<>();
private final Path path = new Path();
......@@ -78,10 +78,10 @@ public class CairoLineProtoParser implements LineProtoParser, Closeable {
private final TableStructureAdapter tableStructureAdapter = new TableStructureAdapter();
private final LineEndParser MY_NEW_LINE_END = this::createTableAndAppendRow;
public CairoLineProtoParser(CairoConfiguration configuration, ResourcePool<TableWriter> pool) {
this.configuration = configuration;
public CairoLineProtoParser(CairoEngine engine) {
this.configuration = engine.getConfiguration();
this.clock = configuration.getMicrosecondClock();
this.pool = pool;
this.engine = engine;
}
@Override
......@@ -157,7 +157,7 @@ public class CairoLineProtoParser implements LineProtoParser, Closeable {
}
private void appendFirstRowAndCacheWriter(CharSequenceCache cache) {
TableWriter writer = pool.get(cache.get(tableName));
TableWriter writer = engine.getWriter(cache.get(tableName));
this.writer = writer;
this.metadata = writer.getMetadata();
this.columnCount = metadata.getColumnCount();
......@@ -224,7 +224,7 @@ public class CairoLineProtoParser implements LineProtoParser, Closeable {
private void cacheWriter(CacheEntry entry, CachedCharSequence tableName) {
try {
entry.writer = pool.get(tableName);
entry.writer = engine.getWriter(tableName);
this.tableName = tableName.getCacheAddress();
createState(entry);
LOG.info().$("cached writer [name=").$(tableName).$(']').$();
......@@ -251,7 +251,7 @@ public class CairoLineProtoParser implements LineProtoParser, Closeable {
configuration.getFilesFacade(),
appendMemory,
path,
configuration.getRoot().toString(),
configuration.getRoot(),
tableStructureAdapter.of(cache),
configuration.getMkDirMode()
);
......
......@@ -23,10 +23,8 @@
package com.questdb.cutlass.line.udp;
import com.questdb.cairo.CairoConfiguration;
import com.questdb.cairo.CairoException;
import com.questdb.cairo.TableWriter;
import com.questdb.cairo.pool.ResourcePool;
import com.questdb.cairo.sql.CairoEngine;
import com.questdb.cutlass.line.CairoLineProtoParser;
import com.questdb.cutlass.line.LineProtoLexer;
import com.questdb.log.Log;
......@@ -52,7 +50,7 @@ public class GenericLineProtoReceiver implements Closeable, Job {
private long totalCount = 0;
private long buf;
public GenericLineProtoReceiver(ReceiverConfiguration receiverCfg, CairoConfiguration cairoCfg, ResourcePool<TableWriter> writerPool) {
public GenericLineProtoReceiver(LineUdpReceiverConfiguration receiverCfg, CairoEngine engine) {
nf = receiverCfg.getNetworkFacade();
......@@ -89,7 +87,7 @@ public class GenericLineProtoReceiver implements Closeable, Job {
this.buf = Unsafe.malloc(this.bufLen = receiverCfg.getMsgBufferSize());
lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize());
parser = new CairoLineProtoParser(cairoCfg, writerPool);
parser = new CairoLineProtoParser(engine);
lexer.withParser(parser);
LOG.info()
......
......@@ -25,13 +25,13 @@ package com.questdb.cutlass.line.udp;
import com.questdb.network.NetworkFacade;
public interface ReceiverConfiguration {
public interface LineUdpReceiverConfiguration {
CharSequence getBindIPv4Address();
int getBindIPv4Address();
int getCommitRate();
CharSequence getGroupIPv4Address();
int getGroupIPv4Address();
int getMsgBufferSize();
......
......@@ -23,10 +23,8 @@
package com.questdb.cutlass.line.udp;
import com.questdb.cairo.CairoConfiguration;
import com.questdb.cairo.CairoException;
import com.questdb.cairo.TableWriter;
import com.questdb.cairo.pool.ResourcePool;
import com.questdb.cairo.sql.CairoEngine;
import com.questdb.cutlass.line.CairoLineProtoParser;
import com.questdb.cutlass.line.LineProtoLexer;
import com.questdb.log.Log;
......@@ -50,7 +48,7 @@ public class LinuxLineProtoReceiver implements Closeable, Job {
private int commitRate;
private long totalCount = 0;
public LinuxLineProtoReceiver(ReceiverConfiguration receiverCfg, CairoConfiguration cairoCfg, ResourcePool<TableWriter> writerPool) {
public LinuxLineProtoReceiver(LineUdpReceiverConfiguration receiverCfg, CairoEngine engine) {
nf = receiverCfg.getNetworkFacade();
......@@ -87,7 +85,7 @@ public class LinuxLineProtoReceiver implements Closeable, Job {
msgVec = nf.msgHeaders(receiverCfg.getMsgBufferSize(), msgCount);
lexer = new LineProtoLexer(receiverCfg.getMsgBufferSize());
parser = new CairoLineProtoParser(cairoCfg, writerPool);
parser = new CairoLineProtoParser(engine);
lexer.withParser(parser);
LOG.info().$("started [fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(", batch=").$(msgCount).$(", commitRate=").$(commitRate).$(']').$();
......
......@@ -23,11 +23,10 @@
package com.questdb.cutlass.line.udp;
import com.questdb.cairo.CairoConfiguration;
import com.questdb.cairo.pool.WriterPool;
import com.questdb.cairo.sql.CairoEngine;
import com.questdb.mp.Job;
@FunctionalInterface
public interface ReceiverFactory {
Job createReceiver(ReceiverConfiguration receiverCfg, CairoConfiguration cairoCfg, WriterPool pool);
Job createReceiver(LineUdpReceiverConfiguration receiverCfg, CairoEngine engine);
}
......@@ -70,7 +70,7 @@ public class CairoTextWriter implements TextLexer.Listener, Closeable, Mutable {
this.engine = engine;
this.configuration = engine.getConfiguration();
this.path = path;
this.utf8Sink = new DirectCharSink(textConfiguration.getUtf8SinkCapacity());
this.utf8Sink = new DirectCharSink(textConfiguration.getUtf8SinkSize());
this.typeManager = typeManager;
}
......
......@@ -30,13 +30,13 @@ public class DefaultTextConfiguration implements TextConfiguration {
}
@Override
public int getDateAdapterPoolSize() {
public int getDateAdapterPoolCapacity() {
return 16;
}
@Override
public int getJsonCacheLimit() {
return 4096;
return 16384;
}
@Override
......@@ -50,17 +50,17 @@ public class DefaultTextConfiguration implements TextConfiguration {
}
@Override
public int getMetadataStringPoolSize() {
public int getMetadataStringPoolCapacity() {
return 128;
}
@Override
public long getRollBufferLimit() {
public int getRollBufferLimit() {
return 4096;
}
@Override
public long getRollBufferSize() {
public int getRollBufferSize() {
return 1024;
}
......@@ -70,17 +70,17 @@ public class DefaultTextConfiguration implements TextConfiguration {
}
@Override
public int getTextLexerStringPoolSize() {
public int getTextLexerStringPoolCapacity() {
return 32;
}
@Override
public int getTimestampAdapterPoolSize() {
public int getTimestampAdapterPoolCapacity() {
return 16;
}
@Override
public int getUtf8SinkCapacity() {
public int getUtf8SinkSize() {
return 4096;
}
}
......@@ -26,7 +26,7 @@ package com.questdb.cutlass.text;
public interface TextConfiguration {
String getAdapterSetConfigurationFileName();
int getDateAdapterPoolSize();
int getDateAdapterPoolCapacity();
int getJsonCacheLimit();
......@@ -34,17 +34,17 @@ public interface TextConfiguration {
double getMaxRequiredDelimiterStdDev();
int getMetadataStringPoolSize();
int getMetadataStringPoolCapacity();
long getRollBufferLimit();
int getRollBufferLimit();
long getRollBufferSize();
int getRollBufferSize();
int getTextAnalysisMaxLines();
int getTextLexerStringPoolSize();
int getTextLexerStringPoolCapacity();
int getTimestampAdapterPoolSize();
int getTimestampAdapterPoolCapacity();
int getUtf8SinkCapacity();
int getUtf8SinkSize();
}
......@@ -41,7 +41,7 @@ public class TextLexer implements Closeable, Mutable {
private final ObjList<DirectByteCharSequence> fields = new ObjList<>();
private final ObjectPool<DirectByteCharSequence> csPool;
private final TextMetadataDetector metadataDetector;
private final long lineRollBufLimit;
private final int lineRollBufLimit;
private boolean ignoreEolOnce;
private byte columnDelimiter;
private boolean inQuote;
......@@ -56,7 +56,7 @@ public class TextLexer implements Closeable, Mutable {
private long lineRollBufCur;
private Listener textLexerListener;
private long lastLineStart;
private long lineRollBufLen;
private int lineRollBufLen;
private long lineRollBufPtr;
private boolean header;
private long lastQuotePos = -1;
......@@ -66,7 +66,7 @@ public class TextLexer implements Closeable, Mutable {
public TextLexer(TextConfiguration textConfiguration, TypeManager typeManager) {
this.metadataDetector = new TextMetadataDetector(typeManager, textConfiguration);
this.csPool = new ObjectPool<>(DirectByteCharSequence.FACTORY, textConfiguration.getTextLexerStringPoolSize());
this.csPool = new ObjectPool<>(DirectByteCharSequence.FACTORY, textConfiguration.getTextLexerStringPoolCapacity());
this.lineRollBufLen = textConfiguration.getRollBufferSize();
this.lineRollBufLimit = textConfiguration.getRollBufferLimit();
this.lineRollBufPtr = Unsafe.malloc(lineRollBufLen);
......@@ -164,7 +164,7 @@ public class TextLexer implements Closeable, Mutable {
return metadataDetector.getColumnTypes();
}
private boolean growRollBuf(long requiredLength) {
private boolean growRollBuf(int requiredLength) {
if (requiredLength > lineRollBufLimit) {
// todo: log content of roll buffer
LOG.info().$("too long [table=").$(tableName).$(", line=").$(lineCount).$(']').$();
......@@ -173,7 +173,7 @@ public class TextLexer implements Closeable, Mutable {
return false;
}
final long len = Math.min(lineRollBufLimit, requiredLength << 1);
final int len = Math.min(lineRollBufLimit, requiredLength << 1);
LOG.info().$("resizing ").$(lineRollBufLen).$(" -> ").$(len).$(" [table=").$(tableName).$(']').$();
long p = Unsafe.malloc(len);
long l = lineRollBufCur - lineRollBufPtr;
......@@ -327,7 +327,7 @@ public class TextLexer implements Closeable, Mutable {
private void rollLine(long lo, long hi) {
// lastLineStart is an offset from 'lo'
// 'lo' is the address of incoming buffer
long l = hi - lo - lastLineStart;
int l = (int) (hi - lo - lastLineStart);
if (l < lineRollBufLen || growRollBuf(l)) {
assert lo + lastLineStart + l <= hi;
Unsafe.getUnsafe().copyMemory(lo + lastLineStart, lineRollBufPtr, l);
......
......@@ -67,7 +67,7 @@ public class TextLoader implements Closeable, Mutable {
com.questdb.std.microtime.DateFormatFactory timestampFormatFactory
) throws JsonException {
this.utf8Sink = new DirectCharSink(textConfiguration.getUtf8SinkCapacity());
this.utf8Sink = new DirectCharSink(textConfiguration.getUtf8SinkSize());
jsonLexer = new JsonLexer(
textConfiguration.getJsonCacheSize(),
textConfiguration.getJsonCacheLimit()
......
......@@ -58,7 +58,7 @@ public class TextMetadataDetector implements TextLexer.Listener, Mutable, Closea
TextConfiguration textConfiguration
) {
this.typeManager = typeManager;
this.utf8Sink = new DirectCharSink(textConfiguration.getUtf8SinkCapacity());
this.utf8Sink = new DirectCharSink(textConfiguration.getUtf8SinkSize());
}
public static boolean utf8Decode(long lo, long hi, CharSink sink) {
......
......@@ -80,7 +80,7 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable {
) {
this.columnNames = new ObjList<>();
this.columnTypes = new ObjList<>();
this.csPool = new ObjectPool<>(FloatingCharSequence::new, textConfiguration.getMetadataStringPoolSize());
this.csPool = new ObjectPool<>(FloatingCharSequence::new, textConfiguration.getMetadataStringPoolCapacity());
this.dateLocaleFactory = dateLocaleFactory;
this.dateFormatFactory = dateFormatFactory;
this.timestampLocaleFactory = timestampLocaleFactory;
......
......@@ -72,8 +72,8 @@ public class TypeManager implements Mutable {
public TypeManager(TextConfiguration configuration, DirectCharSink utf8Sink, JsonLexer jsonLexer) throws JsonException {
this.utf8Sink = utf8Sink;
this.dateAdapterPool = new ObjectPool<>(() -> new DateAdapter(utf8Sink), configuration.getDateAdapterPoolSize());
this.timestampAdapterPool = new ObjectPool<>(() -> new TimestampAdapter(utf8Sink), configuration.getTimestampAdapterPoolSize());
this.dateAdapterPool = new ObjectPool<>(() -> new DateAdapter(utf8Sink), configuration.getDateAdapterPoolCapacity());
this.timestampAdapterPool = new ObjectPool<>(() -> new TimestampAdapter(utf8Sink), configuration.getTimestampAdapterPoolCapacity());
this.stringAdapter = new StringAdapter(utf8Sink);
this.symbolAdapter = new SymbolAdapter(utf8Sink);
this.jsonLexer = jsonLexer;
......
......@@ -78,7 +78,7 @@ public class SqlCompiler implements Closeable {
this.queryModelPool = new ObjectPool<>(QueryModel.FACTORY, configuration.getSqlModelPoolCapacity());
this.characterStore = new CharacterStore(
configuration.getSqlCharacterStoreCapacity(),
configuration.getSqlCharacterStorePoolCapacity()
configuration.getSqlCharacterStoreSequencePoolCapacity()
);
this.lexer = new GenericLexer(configuration.getSqlLexerPoolCapacity());
final FunctionParser functionParser = new FunctionParser(configuration, ServiceLoader.load(FunctionFactory.class));
......@@ -110,8 +110,8 @@ public class SqlCompiler implements Closeable {
);
this.sqlCache = new AssociativeCache<>(
configuration.getSqlCacheBlockSize(),
configuration.getSqlCacheBlockCount()
configuration.getSqlCacheBlocks(),
configuration.getSqlCacheRows()
);
}
......
......@@ -34,8 +34,8 @@ public class DefaultIODispatcherConfiguration implements IODispatcherConfigurati
}
@Override
public CharSequence getBindIPv4Address() {
return "0.0.0.0";
public int getBindIPv4Address() {
return 0;
}
@Override
......@@ -60,7 +60,7 @@ public class DefaultIODispatcherConfiguration implements IODispatcherConfigurati
@Override
public long getIdleConnectionTimeout() {
return 10000000000000000L;
return 5 * 60 * 1000L;
}
@Override
......
......@@ -31,7 +31,7 @@ public interface IODispatcherConfiguration {
int getActiveConnectionLimit();
CharSequence getBindIPv4Address();
int getBindIPv4Address();
int getBindPort();
......
......@@ -765,141 +765,6 @@ public class IODispatcherTest {
});
}
@Test
public void testJsonQuerySyntaxError() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (
CairoEngine engine = new Engine(new DefaultCairoConfiguration(baseDir));
HttpServer httpServer = new HttpServer(httpConfiguration)
) {
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return HttpServerConfiguration.DEFAULT_PROCESSOR_URL;
}
@Override
public HttpRequestProcessor newInstance() {
return new StaticContentProcessor(httpConfiguration.getStaticContentProcessorConfiguration());
}
});
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/query";
}
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
}
});
httpServer.start();
// create table with all column types
CairoTestUtils.createTestTable(
engine.getConfiguration(),
20,
new Rnd(),
new TestRecord.ArrayBinarySequence());
// send multipart request to server
final String request = "GET /query?query=x%20where2%20i%20%3D%20(%27EHNRX%27) HTTP/1.1\r\n" +
"Host: localhost:9001\r\n" +
"Connection: keep-alive\r\n" +
"Cache-Control: max-age=0\r\n" +
"Upgrade-Insecure-Requests: 1\r\n" +
"User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36\r\n" +
"Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3\r\n" +
"Accept-Encoding: gzip, deflate, br\r\n" +
"Accept-Language: en-GB,en-US;q=0.9,en;q=0.8\r\n" +
"\r\n";
byte[] expectedResponse = ("HTTP/1.1 200 OK\r\n" +
"Server: questDB/1.0\r\n" +
"Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" +
"Transfer-Encoding: chunked\r\n" +
"Content-Type: application/json; charset=utf-8\r\n" +
"Keep-Alive: timeout=5, max=10000\r\n" +
"\r\n" +
"224\r\n" +
"{\"query\":\"x where i = ('EHNRX')\",\"columns\":[{\"name\":\"a\",\"type\":\"BYTE\"},{\"name\":\"b\",\"type\":\"SHORT\"},{\"name\":\"c\",\"type\":\"INT\"},{\"name\":\"d\",\"type\":\"LONG\"},{\"name\":\"e\",\"type\":\"DATE\"},{\"name\":\"f\",\"type\":\"TIMESTAMP\"},{\"name\":\"g\",\"type\":\"FLOAT\"},{\"name\":\"h\",\"type\":\"DOUBLE\"},{\"name\":\"i\",\"type\":\"STRING\"},{\"name\":\"j\",\"type\":\"SYMBOL\"},{\"name\":\"k\",\"type\":\"BOOLEAN\"},{\"name\":\"l\",\"type\":\"BINARY\"}],\"dataset\":[[80,24814,-727724771,8920866532787660373,\"-169665660-01-09T01:58:28.119Z\",\"-51129-02-11T06:38:29.397464Z\",null,null,\"EHNRX\",\"ZSX\",false,[]]],\"count\":1}\r\n" +
"0\r\n" +
"\r\n").getBytes();
sendAndReceive(
NetworkFacadeImpl.INSTANCE,
request,
expectedResponse,
10,
0
);
httpServer.halt();
}
});
}
@Test
@Ignore
public void testUpload() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
// final String baseDir = "/home/vlad/dev/123";
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (CairoEngine engine = new Engine(new DefaultCairoConfiguration(baseDir));
HttpServer httpServer = new HttpServer(httpConfiguration)) {
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return HttpServerConfiguration.DEFAULT_PROCESSOR_URL;
}
@Override
public HttpRequestProcessor newInstance() {
return new StaticContentProcessor(httpConfiguration.getStaticContentProcessorConfiguration());
}
});
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/upload";
}
@Override
public HttpRequestProcessor newInstance() {
return new TextImportProcessor(httpConfiguration.getTextImportProcessorConfiguration(), engine);
}
});
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/query";
}
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
}
});
httpServer.start();
Thread.sleep(2000000);
}
});
}
@Test
public void testJsonQueryAndDisconnectWithoutWaitingForResult() throws Exception {
TestUtils.assertMemoryLeak(() -> {
......@@ -1045,6 +910,86 @@ public class IODispatcherTest {
});
}
@Test
public void testJsonQuerySyntaxError() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (
CairoEngine engine = new Engine(new DefaultCairoConfiguration(baseDir));
HttpServer httpServer = new HttpServer(httpConfiguration)
) {
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return HttpServerConfiguration.DEFAULT_PROCESSOR_URL;
}
@Override
public HttpRequestProcessor newInstance() {
return new StaticContentProcessor(httpConfiguration.getStaticContentProcessorConfiguration());
}
});
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/query";
}
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
}
});
httpServer.start();
// create table with all column types
CairoTestUtils.createTestTable(
engine.getConfiguration(),
20,
new Rnd(),
new TestRecord.ArrayBinarySequence());
// send multipart request to server
final String request = "GET /query?query=x%20where2%20i%20%3D%20(%27EHNRX%27) HTTP/1.1\r\n" +
"Host: localhost:9001\r\n" +
"Connection: keep-alive\r\n" +
"Cache-Control: max-age=0\r\n" +
"Upgrade-Insecure-Requests: 1\r\n" +
"User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36\r\n" +
"Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3\r\n" +
"Accept-Encoding: gzip, deflate, br\r\n" +
"Accept-Language: en-GB,en-US;q=0.9,en;q=0.8\r\n" +
"\r\n";
byte[] expectedResponse = ("HTTP/1.1 400 Bad request\r\n" +
"Server: questDB/1.0\r\n" +
"Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" +
"Transfer-Encoding: chunked\r\n" +
"Content-Type: application/json; charset=utf-8\r\n" +
"Keep-Alive: timeout=5, max=10000\r\n" +
"\r\n" +
"4d\r\n" +
"{\"query\":\"x where2 i = ('EHNRX')\",\"error\":\"unexpected token: i\",\"position\":9}\r\n" +
"0\r\n" +
"\r\n").getBytes();
sendAndReceive(
NetworkFacadeImpl.INSTANCE,
request,
expectedResponse,
10,
0
);
httpServer.halt();
}
});
}
@Test
public void testMaxConnections() throws Exception {
......@@ -2149,37 +2094,59 @@ public class IODispatcherTest {
});
}
private static class CairoHttpServer {
private final CairoEngine engine;
private final HttpServer httpServer;
@Test
@Ignore
public void testUpload() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
// final String baseDir = "/home/vlad/dev/123";
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
public CairoHttpServer(CharSequence cairoBaseDir, HttpServerConfiguration configuration) {
this.engine = new Engine(new DefaultCairoConfiguration(cairoBaseDir));
this.httpServer = new HttpServer(configuration);
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return HttpServerConfiguration.DEFAULT_PROCESSOR_URL;
}
@Override
public HttpRequestProcessor newInstance() {
return new StaticContentProcessor(configuration.getStaticContentProcessorConfiguration());
}
});
try (CairoEngine engine = new Engine(new DefaultCairoConfiguration(baseDir));
HttpServer httpServer = new HttpServer(httpConfiguration)) {
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return HttpServerConfiguration.DEFAULT_PROCESSOR_URL;
}
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/query";
}
@Override
public HttpRequestProcessor newInstance() {
return new StaticContentProcessor(httpConfiguration.getStaticContentProcessorConfiguration());
}
});
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
}
});
}
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/upload";
}
@Override
public HttpRequestProcessor newInstance() {
return new TextImportProcessor(httpConfiguration.getTextImportProcessorConfiguration(), engine);
}
});
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/query";
}
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
}
});
httpServer.start();
Thread.sleep(2000000);
}
});
}
private static void assertDownloadResponse(long fd, Rnd rnd, long buffer, int len, int nonRepeatedContentLength, String expectedResponseHeader, long expectedResponseLen) {
......@@ -2320,6 +2287,7 @@ public class IODispatcherTest {
while (received < expectedToReceive) {
int n = nf.recv(fd, ptr + received, len - received);
if (n > 0) {
// dump(ptr + received, n);
// compare bytes
for (int i = 0; i < n; i++) {
if (expectedResponse[received + i] != Unsafe.getUnsafe().getByte(ptr + received + i)) {
......@@ -2364,6 +2332,39 @@ public class IODispatcherTest {
Unsafe.free(buf, bufLen);
}
private static class CairoHttpServer {
private final CairoEngine engine;
private final HttpServer httpServer;
public CairoHttpServer(CharSequence cairoBaseDir, HttpServerConfiguration configuration) {
this.engine = new Engine(new DefaultCairoConfiguration(cairoBaseDir));
this.httpServer = new HttpServer(configuration);
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return HttpServerConfiguration.DEFAULT_PROCESSOR_URL;
}
@Override
public HttpRequestProcessor newInstance() {
return new StaticContentProcessor(configuration.getStaticContentProcessorConfiguration());
}
});
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/query";
}
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(engine);
}
});
}
}
private static class HelloContext implements IOContext {
private final long fd;
private final long buffer = Unsafe.malloc(1024);
......
......@@ -24,7 +24,7 @@
package com.questdb.cutlass.line;
import com.questdb.cairo.*;
import com.questdb.cairo.pool.WriterPool;
import com.questdb.cairo.sql.CairoEngine;
import com.questdb.std.*;
import com.questdb.std.microtime.DateFormatUtils;
import com.questdb.std.microtime.MicrosecondClock;
......@@ -506,8 +506,8 @@ public class CairoLineProtoParserTest extends AbstractCairoTest {
private void assertThat(String expected, String lines, CharSequence tableName, CairoConfiguration configuration) throws Exception {
TestUtils.assertMemoryLeak(() -> {
try (WriterPool pool = new WriterPool(configuration, null)) {
try (CairoLineProtoParser parser = new CairoLineProtoParser(configuration, pool)) {
try (CairoEngine engine = new Engine(configuration, null)) {
try (CairoLineProtoParser parser = new CairoLineProtoParser(engine)) {
byte[] bytes = lines.getBytes(StandardCharsets.UTF_8);
int len = bytes.length;
long mem = Unsafe.malloc(len);
......
......@@ -24,7 +24,7 @@
package com.questdb.cutlass.line.udp;
import com.questdb.cairo.*;
import com.questdb.cairo.pool.WriterPool;
import com.questdb.cairo.sql.CairoEngine;
import com.questdb.mp.Job;
import com.questdb.mp.SOCountDownLatch;
import com.questdb.mp.Worker;
......@@ -73,7 +73,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
@Test
public void testGenericSimpleReceive() throws Exception {
assertReceive(new TestReceiverConfiguration(), GENERIC_FACTORY);
assertReceive(new TestLineUdpReceiverConfiguration(), GENERIC_FACTORY);
}
@Test
......@@ -121,7 +121,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
if (Os.type != Os.LINUX) {
return;
}
assertReceive(new TestReceiverConfiguration(), LINUX_FACTORY);
assertReceive(new TestLineUdpReceiverConfiguration(), LINUX_FACTORY);
}
private void assertCannotBindSocket(ReceiverFactory factory) throws Exception {
......@@ -132,7 +132,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
return false;
}
};
ReceiverConfiguration receiverCfg = new TestReceiverConfiguration() {
LineUdpReceiverConfiguration receiverCfg = new TestLineUdpReceiverConfiguration() {
@Override
public NetworkFacade getNetworkFacade() {
return nf;
......@@ -146,12 +146,12 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
TestUtils.assertMemoryLeak(() -> {
NetworkFacade nf = new NetworkFacadeImpl() {
@Override
public boolean join(long fd, CharSequence bindIPv4Address, CharSequence groupIPv4Address) {
public boolean join(long fd, int bindIPv4Address, int groupIPv4Address) {
return false;
}
};
ReceiverConfiguration receiverCfg = new TestReceiverConfiguration() {
LineUdpReceiverConfiguration receiverCfg = new TestLineUdpReceiverConfiguration() {
@Override
public NetworkFacade getNetworkFacade() {
return nf;
......@@ -170,7 +170,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
return -1;
}
};
ReceiverConfiguration receiverCfg = new TestReceiverConfiguration() {
LineUdpReceiverConfiguration receiverCfg = new TestLineUdpReceiverConfiguration() {
@Override
public NetworkFacade getNetworkFacade() {
return nf;
......@@ -188,25 +188,24 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
}
};
ReceiverConfiguration configuration = new TestReceiverConfiguration() {
LineUdpReceiverConfiguration configuration = new TestLineUdpReceiverConfiguration() {
@Override
public int getReceiveBufferSize() {
return 2048;
public NetworkFacade getNetworkFacade() {
return nf;
}
@Override
public NetworkFacade getNetworkFacade() {
return nf;
public int getReceiveBufferSize() {
return 2048;
}
};
assertReceive(configuration, factory);
}
private void assertConstructorFail(ReceiverConfiguration receiverCfg, ReceiverFactory factory) {
CairoConfiguration cairoCfg = new DefaultCairoConfiguration(root);
try (WriterPool pool = new WriterPool(cairoCfg, null)) {
private void assertConstructorFail(LineUdpReceiverConfiguration receiverCfg, ReceiverFactory factory) {
try (CairoEngine engine = new Engine(new DefaultCairoConfiguration(root), null)) {
try {
factory.createReceiver(receiverCfg, cairoCfg, pool);
factory.createReceiver(receiverCfg, engine);
Assert.fail();
} catch (CairoException ignore) {
}
......@@ -214,7 +213,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
}
private void assertFrequentCommit(ReceiverFactory factory) throws Exception {
ReceiverConfiguration configuration = new TestReceiverConfiguration() {
LineUdpReceiverConfiguration configuration = new TestLineUdpReceiverConfiguration() {
@Override
public int getCommitRate() {
return 0;
......@@ -223,7 +222,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
assertReceive(configuration, factory);
}
private void assertReceive(ReceiverConfiguration receiverCfg, ReceiverFactory factory) throws Exception {
private void assertReceive(LineUdpReceiverConfiguration receiverCfg, ReceiverFactory factory) throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String expected = "colour\tshape\tsize\ttimestamp\n" +
"blue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\n" +
......@@ -237,10 +236,9 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
"blue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\n" +
"blue\tsquare\t3.400000000000\t1970-01-01T00:01:40.000000Z\n";
CairoConfiguration cairoCfg = new DefaultCairoConfiguration(root);
try (WriterPool pool = new WriterPool(cairoCfg, null)) {
try (CairoEngine engine = new Engine(new DefaultCairoConfiguration(root), null)) {
Job receiver = factory.createReceiver(receiverCfg, cairoCfg, pool);
Job receiver = factory.createReceiver(receiverCfg, engine);
try {
......@@ -257,7 +255,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
}
// warm writer up
try (TableWriter w = pool.get("tab")) {
try (TableWriter w = engine.getWriter("tab")) {
w.warmUp();
}
......@@ -266,14 +264,14 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
Worker worker = new Worker(jobs, workerHaltLatch);
worker.start();
try (LineProtoSender sender = new LineProtoSender(NetworkFacadeImpl.INSTANCE, 0, Net.parseIPv4(receiverCfg.getBindIPv4Address()), receiverCfg.getPort(), 1400)) {
try (LineProtoSender sender = new LineProtoSender(NetworkFacadeImpl.INSTANCE, 0, receiverCfg.getBindIPv4Address(), receiverCfg.getPort(), 1400)) {
for (int i = 0; i < 10; i++) {
sender.metric("tab").tag("colour", "blue").tag("shape", "square").field("size", 3.4, 4).$(100000000);
}
sender.flush();
}
try (TableReader reader = new TableReader(cairoCfg, "tab")) {
try (TableReader reader = new TableReader(new DefaultCairoConfiguration(root), "tab")) {
int count = 1000000;
while (true) {
if (count-- > 0 && reader.size() < 10) {
......@@ -300,11 +298,11 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
});
}
private static class TestReceiverConfiguration implements ReceiverConfiguration {
private static class TestLineUdpReceiverConfiguration implements LineUdpReceiverConfiguration {
@Override
public CharSequence getBindIPv4Address() {
return "127.0.0.1";
public int getBindIPv4Address() {
return Net.parseIPv4("127.0.0.1");
}
@Override
......@@ -313,8 +311,8 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
}
@Override
public CharSequence getGroupIPv4Address() {
return "224.1.1.1";
public int getGroupIPv4Address() {
return Net.parseIPv4("224.1.1.1");
}
@Override
......
......@@ -619,12 +619,12 @@ public class TextLoaderTest extends AbstractCairoTest {
try (TextLoader loader = new TextLoader(
new DefaultTextConfiguration() {
@Override
public long getRollBufferLimit() {
public int getRollBufferLimit() {
return 128;
}
@Override
public long getRollBufferSize() {
public int getRollBufferSize() {
return 32;
}
},
......@@ -883,12 +883,12 @@ public class TextLoaderTest extends AbstractCairoTest {
try (TextLoader loader = new TextLoader(
new DefaultTextConfiguration() {
@Override
public long getRollBufferLimit() {
public int getRollBufferLimit() {
return 128;
}
@Override
public long getRollBufferSize() {
public int getRollBufferSize() {
return 32;
}
},
......
http.connection.pool.initial.capacity=64
http.connection.string.pool.capacity=512
http.multipart.header.buffer.size=256
http.multipart.idle.spin.count=100000
http.receive.buffer.size=4k
http.request.header.buffer.size=2k
http.response.header.buffer.size=9012
http.worker.count=6
http.send.buffer.size=128
http.static.index.file.name=index2.html
http.static.pubic.directory=public_ok
http.net.active.connection.limit=64
http.net.event.capacity=2048
http.net.io.queue.capacity=64
http.net.idle.connection.timeout=7000000
http.net.interest.queue.capacity=512
http.net.listen.backlog=64
http.net.snd.buf.size=4m
http.net.rcv.buf.size=8m
http.text.abort.broken.uploads=false
http.text.adapter.set.config=/loader.json
http.text.date.adapter.pool.capacity=32
http.text.json.cache.limit=64k
http.text.json.cache.size=8m
http.text.max.required.delimiter.stddev=0.3d
http.text.metadata.string.pool.capacity=512
http.text.roll.buffer.limit=6k
http.text.roll.buffer.size=3k
http.text.analysis.max.lines=400
http.text.lexer.string.pool.capacity=128
http.text.timestamp.adapter.pool.capacity=512
http.text.utf8.sink.size=8k
http.bind.to=10.5.8.30:9900
cairo.create.as.select.retry.count=12
cairo.default.map.type=compact
cairo.default.symbol.cache.flag=true
cairo.default.symbol.capacity=512
cairo.file.operation.retry.count=10
cairo.idle.check.interval=20000
cairo.inactive.reader.ttl=600000
cairo.inactive.writer.ttl=400000
cairo.index.value.block.size=1024
cairo.max.swap.file.count=23
cairo.mkdir.mode=580
cairo.parallel.index.threshold=1000000
cairo.reader.pool.max.segments=10
cairo.spin.lock.timeout=5000000
cairo.cache.rows=32
cairo.cache.blocks=16
cairo.character.store.capacity=2048
cairo.character.store.sequence.pool.capacity=128
cairo.column.pool.capacity=2048
cairo.compact.map.load.factor=0.8
cairo.expression.pool.capacity=1024
cairo.fast.map.load.factor=0.3
cairo.sql.join.context.pool.capacity=32
cairo.lexer.pool.capacity=1024
cairo.sql.map.key.capacity=1024
cairo.sql.map.page.size=6m
cairo.model.pool.capacity=256
cairo.sql.sort.key.page.size=10m
cairo.sql.sort.light.value.page.size=3m
cairo.sql.hash.join.value.page.size=8m
cairo.sql.tree.page.size=16m
cairo.sql.hash.join.light.value.page.size=2m
cairo.sql.sort.value.page.size=4m
cairo.work.steal.timeout.nanos=1000000
cairo.parallel.indexing.enabled=false
cairo.sql.join.metadata.page.size=8k
line.udp.bind.to=10.2.1.33:9915
line.udp.commit.rate=100000
line.udp.join=224.1.1.1
line.udp.msg.buffer.size=4m
line.udp.msg.count=4000
line.udp.receive.buffer.size=512
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册