提交 8e5770ed 编写于 作者: V Vlad Ilyushchenko

GRIFFIN: refactored text loader not to depend on existence of JSON...

GRIFFIN: refactored text loader not to depend on existence of JSON configuration at runtime. Such configuration is loaded in-memory as a separate step and text loader relies on parsed model at runtime.
上级 bac02ddf
......@@ -153,6 +153,7 @@ public class PropServerConfiguration implements ServerConfiguration {
private int lineUdpPort;
private int jsonQueryFloatScale;
private int jsonQueryDoubleScale;
private int jsonQueryCopyBufferSize;
private int jsonQueryConnectionCheckFrequency;
private boolean httpFrozenClock;
private int sqlAnalyticColumnPoolCapacity;
......@@ -234,6 +235,7 @@ public class PropServerConfiguration implements ServerConfiguration {
this.jsonQueryConnectionCheckFrequency = getInt(properties, "http.json.query.connection.check.frequency", 1_000_000);
this.jsonQueryDoubleScale = getInt(properties, "http.json.query.double.scale", 10);
this.jsonQueryFloatScale = getInt(properties, "http.json.query.float.scale", 10);
this.jsonQueryCopyBufferSize = getIntSize(properties, "http.json.query.copy.buffer.size", 2 * 1024 * 1024);
parseBindTo(properties, "http.bind.to", "0.0.0.0:9000", (a, p) -> {
bindIPv4Address = a;
......@@ -1013,13 +1015,13 @@ public class PropServerConfiguration implements ServerConfiguration {
private class PropJsonQueryProcessorConfiguration implements JsonQueryProcessorConfiguration {
@Override
public CharSequence getKeepAliveHeader() {
return keepAliveHeader;
public MillisecondClock getClock() {
return httpFrozenClock ? StationaryMillisClock.INSTANCE : MillisecondClockImpl.INSTANCE;
}
@Override
public int getFloatScale() {
return jsonQueryFloatScale;
public int getConnectionCheckFrequency() {
return jsonQueryConnectionCheckFrequency;
}
@Override
......@@ -1028,8 +1030,13 @@ public class PropServerConfiguration implements ServerConfiguration {
}
@Override
public int getConnectionCheckFrequency() {
return jsonQueryConnectionCheckFrequency;
public int getFloatScale() {
return jsonQueryFloatScale;
}
@Override
public CharSequence getKeepAliveHeader() {
return keepAliveHeader;
}
@Override
......@@ -1038,8 +1045,13 @@ public class PropServerConfiguration implements ServerConfiguration {
}
@Override
public MillisecondClock getClock() {
return httpFrozenClock ? StationaryMillisClock.INSTANCE : MillisecondClockImpl.INSTANCE;
public int getCopyBufferSize() {
return jsonQueryCopyBufferSize;
}
@Override
public FilesFacade getFilesFacade() {
return FilesFacadeImpl.INSTANCE;
}
}
......
......@@ -66,15 +66,16 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration {
return null;
}
};
private final TextImportProcessorConfiguration textImportProcessorConfiguration = new DefaultTextImportProcessorConfiguration();
private final JsonQueryProcessorConfiguration jsonQueryProcessorConfiguration = new JsonQueryProcessorConfiguration() {
@Override
public CharSequence getKeepAliveHeader() {
return "Keep-Alive: timeout=5, max=10000\r\n";
public MillisecondClock getClock() {
return DefaultHttpServerConfiguration.this.getClock();
}
@Override
public int getFloatScale() {
return 10;
public int getConnectionCheckFrequency() {
return 1_000_000;
}
@Override
......@@ -83,21 +84,30 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration {
}
@Override
public int getConnectionCheckFrequency() {
return 1_000_000;
public int getFloatScale() {
return 10;
}
@Override
public MillisecondClock getClock() {
return DefaultHttpServerConfiguration.this.getClock();
public CharSequence getKeepAliveHeader() {
return "Keep-Alive: timeout=5, max=10000\r\n";
}
@Override
public TextConfiguration getTextConfiguration() {
return textImportProcessorConfiguration.getTextConfiguration();
}
@Override
public int getCopyBufferSize() {
return 2 * 1024 * 1024;
}
@Override
public FilesFacade getFilesFacade() {
return FilesFacadeImpl.INSTANCE;
}
};
private final TextImportProcessorConfiguration textImportProcessorConfiguration = new DefaultTextImportProcessorConfiguration();
public DefaultHttpServerConfiguration() {
String defaultFilePath = this.getClass().getResource("/site/conf/mime.types").getFile();
......@@ -176,6 +186,11 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration {
return 2;
}
@Override
public boolean workerHaltOnError() {
return false;
}
@Override
public int[] getWorkerAffinity() {
return new int[]{-1, -1};
......@@ -200,9 +215,4 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration {
public boolean allowDeflateBeforeSend() {
return false;
}
@Override
public boolean workerHaltOnError() {
return false;
}
}
......@@ -25,6 +25,9 @@ package io.questdb.cutlass.http;
import io.questdb.cairo.CairoEngine;
import io.questdb.cutlass.http.processors.*;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.json.JsonLexer;
import io.questdb.cutlass.text.types.InputFormatConfiguration;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.Job;
......@@ -36,6 +39,8 @@ import io.questdb.network.IODispatchers;
import io.questdb.network.IORequestProcessor;
import io.questdb.std.ThreadLocal;
import io.questdb.std.*;
import io.questdb.std.time.DateFormatFactory;
import io.questdb.std.time.DateLocaleFactory;
import org.jetbrains.annotations.Nullable;
import java.io.Closeable;
......@@ -96,8 +101,24 @@ public class HttpServer implements Closeable {
WorkerPool workerPool,
Log workerPoolLog,
CairoEngine cairoEngine
) {
) throws JsonException {
if (configuration.isEnabled()) {
final DateFormatFactory dateFormatFactory = new DateFormatFactory();
final io.questdb.std.microtime.DateFormatFactory timestampFormatFactory = new io.questdb.std.microtime.DateFormatFactory();
final InputFormatConfiguration inputFormatConfiguration = new InputFormatConfiguration(
dateFormatFactory,
DateLocaleFactory.INSTANCE,
timestampFormatFactory,
io.questdb.std.microtime.DateLocaleFactory.INSTANCE
);
try (JsonLexer jsonLexer = new JsonLexer(configuration.getTextImportProcessorConfiguration().getTextConfiguration().getJsonCacheSize(), configuration.getTextImportProcessorConfiguration().getTextConfiguration().getJsonCacheLimit())) {
inputFormatConfiguration.parseConfiguration(
jsonLexer,
configuration.getTextImportProcessorConfiguration().getTextConfiguration().getAdapterSetConfigurationFileName()
);
}
final WorkerPool localPool;
if (configuration.getWorkerCount() > 0) {
localPool = new WorkerPool(new WorkerPoolConfiguration() {
......@@ -129,7 +150,11 @@ public class HttpServer implements Closeable {
@Override
public HttpRequestProcessor newInstance() {
return new JsonQueryProcessor(configuration.getJsonQueryProcessorConfiguration(), cairoEngine);
return new JsonQueryProcessor(
configuration.getJsonQueryProcessorConfiguration(),
cairoEngine,
inputFormatConfiguration
);
}
});
......@@ -141,7 +166,11 @@ public class HttpServer implements Closeable {
@Override
public HttpRequestProcessor newInstance() {
return new TextImportProcessor(configuration.getTextImportProcessorConfiguration(), cairoEngine);
return new TextImportProcessor(
configuration.getTextImportProcessorConfiguration(),
cairoEngine,
inputFormatConfiguration
);
}
});
......
......@@ -36,6 +36,7 @@ import io.questdb.cutlass.http.HttpRequestProcessor;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.text.Atomicity;
import io.questdb.cutlass.text.TextLoader;
import io.questdb.cutlass.text.types.InputFormatConfiguration;
import io.questdb.griffin.*;
import io.questdb.griffin.model.CopyModel;
import io.questdb.log.Log;
......@@ -45,8 +46,6 @@ import io.questdb.network.*;
import io.questdb.std.*;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.Path;
import io.questdb.std.time.DateFormatFactory;
import io.questdb.std.time.DateLocaleFactory;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicLong;
......@@ -65,28 +64,24 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
private final ObjList<StateResumeAction> resumeActions = new ObjList<>();
private final TextLoader textLoader;
private final Path path = new Path();
private final FilesFacade ff;
public JsonQueryProcessor(JsonQueryProcessorConfiguration configuration, CairoEngine engine) {
public JsonQueryProcessor(
JsonQueryProcessorConfiguration configuration,
CairoEngine engine,
InputFormatConfiguration inputFormatConfiguration
) {
// todo: add scheduler
this.configuration = configuration;
this.ff = configuration.getFilesFacade();
this.compiler = new SqlCompiler(engine);
this.floatScale = configuration.getFloatScale();
this.doubleScale = configuration.getDoubleScale();
try {
this.textLoader = new TextLoader(
configuration.getTextConfiguration(),
engine,
DateLocaleFactory.INSTANCE,
new DateFormatFactory(),
io.questdb.std.microtime.DateLocaleFactory.INSTANCE,
new io.questdb.std.microtime.DateFormatFactory()
);
} catch (JsonException e) {
// todo: we must not do this
throw new RuntimeException("damn");
}
this.textLoader = new TextLoader(
configuration.getTextConfiguration(),
engine,
inputFormatConfiguration
);
this.valueWriters.extendAndSet(ColumnType.BOOLEAN, this::putBooleanValue);
this.valueWriters.extendAndSet(ColumnType.BYTE, this::putByteValue);
this.valueWriters.extendAndSet(ColumnType.DOUBLE, this::putDoubleValue);
......@@ -257,25 +252,28 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
try {
textLoader.clear();
textLoader.setState(TextLoader.ANALYZE_STRUCTURE);
// todo: configure the following
// - when happens when data row errors out, max errors may be?
// - we should be able to skip X rows from top, dodgy headers etc.
textLoader.configureDestination(model.getTableName().token, true, false, Atomicity.SKIP_ROW);
int len = 4 * 1024 * 1024;
int len = configuration.getCopyBufferSize();
long buf = Unsafe.malloc(len);
try {
path.of(GenericLexer.unquote(model.getFileName().token)).$();
long fd = Files.openRO(path);
long fd = ff.openRO(path);
if (fd == -1) {
throw SqlException.$(model.getFileName().position, "could not open file [errno=").put(Os.errno()).put(']');
}
long fileLen = Files.length(fd);
long n = (int) Files.read(fd, buf, len, 0);
long fileLen = ff.length(fd);
long n = ff.read(fd, buf, len, 0);
if (n > 0) {
textLoader.parse(buf, buf + n, executionContext.getCairoSecurityContext());
textLoader.setState(TextLoader.LOAD_DATA);
int read;
while (n < fileLen) {
read = (int) Files.read(fd, buf, len, n);
read = (int) ff.read(fd, buf, len, n);
if (read < 1) {
throw SqlException.$(model.getFileName().position, "could not read file [errno=").put(Os.errno()).put(']');
throw SqlException.$(model.getFileName().position, "could not read file [errno=").put(ff.errno()).put(']');
}
textLoader.parse(buf, buf + read, executionContext.getCairoSecurityContext());
n += read;
......@@ -286,7 +284,7 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
Unsafe.free(buf, len);
}
} catch (JsonException e) {
e.printStackTrace();
// we do not expect JSON exception here
} finally {
LOG.info().$("copied").$();
}
......@@ -414,14 +412,6 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
return LOG.error().$('[').$(state.fd).$("] ");
}
long getCacheHits() {
return cacheHits.longValue();
}
long getCacheMisses() {
return cacheMisses.longValue();
}
protected void header(
HttpChunkedResponseSocket socket,
int status
......
......@@ -24,6 +24,7 @@
package io.questdb.cutlass.http.processors;
import io.questdb.cutlass.text.TextConfiguration;
import io.questdb.std.FilesFacade;
import io.questdb.std.time.MillisecondClock;
public interface JsonQueryProcessorConfiguration {
......@@ -39,4 +40,8 @@ public interface JsonQueryProcessorConfiguration {
CharSequence getKeepAliveHeader();
TextConfiguration getTextConfiguration();
int getCopyBufferSize();
FilesFacade getFilesFacade();
}
......@@ -31,6 +31,7 @@ import io.questdb.cutlass.http.*;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.text.Atomicity;
import io.questdb.cutlass.text.TextLoader;
import io.questdb.cutlass.text.types.InputFormatConfiguration;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.network.*;
......@@ -62,15 +63,19 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
private static final LocalValue<TextImportProcessorState> LV = new LocalValue<>();
private final TextImportProcessorConfiguration configuration;
private final CairoEngine engine;
private final InputFormatConfiguration inputFormatConfiguration;
private HttpConnectionContext transientContext;
private IODispatcher<HttpConnectionContext> transientDispatcher;
private TextImportProcessorState transientState;
public TextImportProcessor(
TextImportProcessorConfiguration configuration,
CairoEngine cairoEngine
CairoEngine cairoEngine,
InputFormatConfiguration inputFormatConfiguration
) {
this.configuration = configuration;
this.engine = cairoEngine;
this.inputFormatConfiguration = inputFormatConfiguration;
}
@Override
......@@ -170,13 +175,13 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
this.transientDispatcher = dispatcher;
this.transientState = LV.get(context);
if (this.transientState == null) {
try {
LOG.debug().$("new text state").$();
LV.set(context, this.transientState = new TextImportProcessorState(configuration.getTextConfiguration(), engine));
} catch (JsonException e) {
// todo: handle gracefully
e.printStackTrace();
}
LOG.debug().$("new text state").$();
LV.set(context, this.transientState = new TextImportProcessorState(
configuration.getTextConfiguration(),
engine,
inputFormatConfiguration
)
);
}
}
......
......@@ -24,13 +24,11 @@
package io.questdb.cutlass.http.processors;
import io.questdb.cairo.CairoEngine;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.text.TextConfiguration;
import io.questdb.cutlass.text.TextLoader;
import io.questdb.cutlass.text.types.InputFormatConfiguration;
import io.questdb.std.Misc;
import io.questdb.std.Mutable;
import io.questdb.std.time.DateFormatFactory;
import io.questdb.std.time.DateLocaleFactory;
import java.io.Closeable;
......@@ -48,18 +46,8 @@ class TextImportProcessorState implements Mutable, Closeable {
int state;
boolean json = false;
TextImportProcessorState(
TextConfiguration configuration,
CairoEngine engine
) throws JsonException {
this.textLoader = new TextLoader(
configuration,
engine,
DateLocaleFactory.INSTANCE,
new DateFormatFactory(),
io.questdb.std.microtime.DateLocaleFactory.INSTANCE,
new io.questdb.std.microtime.DateFormatFactory()
);
TextImportProcessorState(TextConfiguration configuration, CairoEngine engine, InputFormatConfiguration inputFormatConfiguration) {
this.textLoader = new TextLoader(configuration, engine, inputFormatConfiguration);
}
@Override
......
......@@ -28,6 +28,7 @@ import io.questdb.cairo.CairoSecurityContext;
import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.json.JsonLexer;
import io.questdb.cutlass.text.types.InputFormatConfiguration;
import io.questdb.cutlass.text.types.TypeManager;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
......@@ -37,8 +38,6 @@ import io.questdb.std.Mutable;
import io.questdb.std.ObjList;
import io.questdb.std.str.DirectCharSink;
import io.questdb.std.str.Path;
import io.questdb.std.time.DateFormatFactory;
import io.questdb.std.time.DateLocaleFactory;
import java.io.Closeable;
......@@ -62,33 +61,18 @@ public class TextLoader implements Closeable, Mutable {
private byte columnDelimiter = -1;
/**
* @throws JsonException when default configuration cannot be loaded from classpath
*
*/
public TextLoader(
TextConfiguration textConfiguration,
CairoEngine engine,
DateLocaleFactory dateLocaleFactory,
DateFormatFactory dateFormatFactory,
io.questdb.std.microtime.DateLocaleFactory timestampLocaleFactory,
io.questdb.std.microtime.DateFormatFactory timestampFormatFactory
) throws JsonException {
public TextLoader(TextConfiguration textConfiguration, CairoEngine engine, InputFormatConfiguration inputFormatConfiguration) {
this.utf8Sink = new DirectCharSink(textConfiguration.getUtf8SinkSize());
jsonLexer = new JsonLexer(
textConfiguration.getJsonCacheSize(),
textConfiguration.getJsonCacheLimit()
);
this.typeManager = new TypeManager(textConfiguration, utf8Sink, jsonLexer);
this.typeManager = new TypeManager(textConfiguration, utf8Sink, inputFormatConfiguration);
textLexer = new TextLexer(textConfiguration, typeManager);
textWriter = new CairoTextWriter(engine, path, textConfiguration, typeManager);
textMetadataParser = new TextMetadataParser(
textConfiguration,
dateLocaleFactory,
dateFormatFactory,
timestampLocaleFactory,
timestampFormatFactory,
typeManager
);
textMetadataParser = new TextMetadataParser(textConfiguration, typeManager);
textAnalysisMaxLines = textConfiguration.getTextAnalysisMaxLines();
textDelimiterScanner = new TextDelimiterScanner(textConfiguration);
parseMethods.extendAndSet(LOAD_JSON_METADATA, this::parseJsonMetadata);
......@@ -149,14 +133,14 @@ public class TextLoader implements Closeable, Mutable {
return textWriter.getMetadata();
}
public int getPartitionBy() {
return textWriter.getPartitionBy();
}
public long getParsedLineCount() {
return textLexer.getLineCount();
}
public int getPartitionBy() {
return textWriter.getPartitionBy();
}
public CharSequence getTableName() {
return textWriter.getTableName();
}
......@@ -181,10 +165,35 @@ public class TextLoader implements Closeable, Mutable {
parseMethods.getQuick(state).parse(lo, hi, cairoSecurityContext);
}
public void setState(int state) {
LOG.debug().$("state change [old=").$(this.state).$(", new=").$(state).$(']').$();
this.state = state;
jsonLexer.clear();
}
public void wrapUp() throws JsonException {
switch (state) {
case LOAD_JSON_METADATA:
jsonLexer.parseLast();
break;
case ANALYZE_STRUCTURE:
case LOAD_DATA:
textLexer.parseLast();
textWriter.commit();
break;
default:
break;
}
}
private void parseData(long lo, long hi, CairoSecurityContext cairoSecurityContext) {
textLexer.parse(lo, hi, Integer.MAX_VALUE, textWriter);
}
private void parseJsonMetadata(long lo, long hi, CairoSecurityContext cairoSecurityContext) throws JsonException {
jsonLexer.parse(lo, hi, textMetadataParser);
}
private void parseStructure(long lo, long hi, CairoSecurityContext cairoSecurityContext) {
if (columnDelimiter > 0) {
textLexer.of(columnDelimiter);
......@@ -203,31 +212,6 @@ public class TextLoader implements Closeable, Mutable {
textLexer.parse(lo, hi, Integer.MAX_VALUE, textWriter);
}
private void parseJsonMetadata(long lo, long hi, CairoSecurityContext cairoSecurityContext) throws JsonException {
jsonLexer.parse(lo, hi, textMetadataParser);
}
public void setState(int state) {
LOG.debug().$("state change [old=").$(this.state).$(", new=").$(state).$(']').$();
this.state = state;
jsonLexer.clear();
}
public void wrapUp() throws JsonException {
switch (state) {
case LOAD_JSON_METADATA:
jsonLexer.parseLast();
break;
case ANALYZE_STRUCTURE:
case LOAD_DATA:
textLexer.parseLast();
textWriter.commit();
break;
default:
break;
}
}
@FunctionalInterface
private interface ParserMethod {
void parse(long lo, long hi, CairoSecurityContext cairoSecurityContext) throws JsonException;
......
......@@ -50,14 +50,6 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable {
private static final int P_PATTERN = 3;
private static final int P_LOCALE = 4;
private static final CharSequenceIntHashMap propertyNameMap = new CharSequenceIntHashMap();
static {
propertyNameMap.put("name", P_NAME);
propertyNameMap.put("type", P_TYPE);
propertyNameMap.put("pattern", P_PATTERN);
propertyNameMap.put("locale", P_LOCALE);
}
private final DateLocaleFactory dateLocaleFactory;
private final io.questdb.std.microtime.DateLocaleFactory timestampLocaleFactory;
private final ObjectPool<FloatingCharSequence> csPool;
......@@ -78,21 +70,14 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable {
private CharSequence tableName;
private int localePosition;
public TextMetadataParser(
TextConfiguration textConfiguration,
DateLocaleFactory dateLocaleFactory,
DateFormatFactory dateFormatFactory,
io.questdb.std.microtime.DateLocaleFactory timestampLocaleFactory,
io.questdb.std.microtime.DateFormatFactory timestampFormatFactory,
TypeManager typeManager
) {
public TextMetadataParser(TextConfiguration textConfiguration, TypeManager typeManager) {
this.columnNames = new ObjList<>();
this.columnTypes = new ObjList<>();
this.csPool = new ObjectPool<>(FloatingCharSequence::new, textConfiguration.getMetadataStringPoolCapacity());
this.dateLocaleFactory = dateLocaleFactory;
this.dateFormatFactory = dateFormatFactory;
this.timestampLocaleFactory = timestampLocaleFactory;
this.timestampFormatFactory = timestampFormatFactory;
this.dateLocaleFactory = typeManager.getInputFormatConfiguration().getDateLocaleFactory();
this.dateFormatFactory = typeManager.getInputFormatConfiguration().getDateFormatFactory();
this.timestampLocaleFactory = typeManager.getInputFormatConfiguration().getTimestampLocaleFactory();
this.timestampFormatFactory = typeManager.getInputFormatConfiguration().getTimestampFormatFactory();
this.typeManager = typeManager;
}
......@@ -278,4 +263,11 @@ public class TextMetadataParser implements JsonParser, Mutable, Closeable {
return this;
}
}
static {
propertyNameMap.put("name", P_NAME);
propertyNameMap.put("type", P_TYPE);
propertyNameMap.put("pattern", P_PATTERN);
propertyNameMap.put("locale", P_LOCALE);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package io.questdb.cutlass.text.types;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.json.JsonLexer;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.Chars;
import io.questdb.std.ObjList;
import io.questdb.std.Unsafe;
import io.questdb.std.time.DateFormat;
import io.questdb.std.time.DateFormatFactory;
import io.questdb.std.time.DateLocale;
import io.questdb.std.time.DateLocaleFactory;
import java.io.IOException;
import java.io.InputStream;
public class InputFormatConfiguration {
private final static Log LOG = LogFactory.getLog(InputFormatConfiguration.class);
private static final int STATE_EXPECT_TOP = 0;
private static final int STATE_EXPECT_FIRST_LEVEL_NAME = 1;
private static final int STATE_EXPECT_DATE_FORMAT_ARRAY = 2;
private static final int STATE_EXPECT_TIMESTAMP_FORMAT_ARRAY = 3;
private static final int STATE_EXPECT_DATE_FORMAT_VALUE = 4;
private static final int STATE_EXPECT_DATE_LOCALE_VALUE = 5;
private static final int STATE_EXPECT_TIMESTAMP_FORMAT_VALUE = 6;
private static final int STATE_EXPECT_TIMESTAMP_LOCALE_VALUE = 7;
private static final int STATE_EXPECT_DATE_FORMAT_ENTRY = 8;
private static final int STATE_EXPECT_TIMESTAMP_FORMAT_ENTRY = 9;
private final ObjList<DateFormat> dateFormats = new ObjList<>();
private final ObjList<DateLocale> dateLocales = new ObjList<>();
private final ObjList<io.questdb.std.microtime.DateFormat> timestampFormats = new ObjList<>();
private final ObjList<io.questdb.std.microtime.DateLocale> timestampLocales = new ObjList<>();
private final DateFormatFactory dateFormatFactory;
private final DateLocaleFactory dateLocaleFactory;
private final io.questdb.std.microtime.DateFormatFactory timestampFormatFactory;
private final io.questdb.std.microtime.DateLocaleFactory timestampLocaleFactory;
private int jsonState = STATE_EXPECT_TOP; // expect start of object
private DateFormat jsonDateFormat;
private DateLocale jsonDateLocale;
private io.questdb.std.microtime.DateFormat jsonTimestampFormat;
private io.questdb.std.microtime.DateLocale jsonTimestampLocale;
public InputFormatConfiguration(
DateFormatFactory dateFormatFactory,
DateLocaleFactory dateLocaleFactory,
io.questdb.std.microtime.DateFormatFactory timestampFormatFactory,
io.questdb.std.microtime.DateLocaleFactory timestampLocaleFactory
) {
this.dateFormatFactory = dateFormatFactory;
this.dateLocaleFactory = dateLocaleFactory;
this.timestampFormatFactory = timestampFormatFactory;
this.timestampLocaleFactory = timestampLocaleFactory;
}
public void clear() {
dateFormats.clear();
dateLocales.clear();
timestampFormats.clear();
timestampLocales.clear();
jsonState = STATE_EXPECT_TOP;
jsonDateFormat = null;
jsonDateLocale = null;
jsonTimestampFormat = null;
jsonTimestampLocale = null;
}
public DateFormatFactory getDateFormatFactory() {
return dateFormatFactory;
}
public ObjList<DateFormat> getDateFormats() {
return dateFormats;
}
public DateLocaleFactory getDateLocaleFactory() {
return dateLocaleFactory;
}
public ObjList<DateLocale> getDateLocales() {
return dateLocales;
}
public io.questdb.std.microtime.DateFormatFactory getTimestampFormatFactory() {
return timestampFormatFactory;
}
public ObjList<io.questdb.std.microtime.DateFormat> getTimestampFormats() {
return timestampFormats;
}
public io.questdb.std.microtime.DateLocaleFactory getTimestampLocaleFactory() {
return timestampLocaleFactory;
}
public ObjList<io.questdb.std.microtime.DateLocale> getTimestampLocales() {
return timestampLocales;
}
public void parseConfiguration(JsonLexer jsonLexer, String adapterSetConfigurationFileName) throws JsonException {
this.clear();
jsonLexer.clear();
LOG.info().$("loading [from=").$(adapterSetConfigurationFileName).$(']').$();
try (InputStream stream = this.getClass().getResourceAsStream(adapterSetConfigurationFileName)) {
if (stream == null) {
throw JsonException.$(0, "could not find [resource=").put(adapterSetConfigurationFileName).put(']');
}
// here is where using direct memory is very disadvantageous
// we will copy buffer twice to parse json, but luckily contents should be small
// and we should be parsing this only once on startup
byte[] heapBuffer = new byte[4096];
long memBuffer = Unsafe.malloc(heapBuffer.length);
try {
int len;
while ((len = stream.read(heapBuffer)) > 0) {
// copy to mem buffer
for (int i = 0; i < len; i++) {
Unsafe.getUnsafe().putByte(memBuffer + i, heapBuffer[i]);
}
jsonLexer.parse(memBuffer, memBuffer + len, this::onJsonEvent);
}
jsonLexer.clear();
} finally {
Unsafe.free(memBuffer, heapBuffer.length);
}
} catch (IOException e) {
throw JsonException.$(0, "could not read [resource=").put(adapterSetConfigurationFileName).put(']');
}
}
private void onJsonEvent(int code, CharSequence tag, int position) throws JsonException {
switch (code) {
case JsonLexer.EVT_OBJ_START:
switch (jsonState) {
case STATE_EXPECT_TOP:
// this is top level object
// lets dive in
jsonState = STATE_EXPECT_FIRST_LEVEL_NAME;
break;
case STATE_EXPECT_DATE_FORMAT_VALUE:
case STATE_EXPECT_TIMESTAMP_FORMAT_VALUE:
throw JsonException.$(position, "format value expected (obj)");
case STATE_EXPECT_DATE_LOCALE_VALUE:
case STATE_EXPECT_TIMESTAMP_LOCALE_VALUE:
throw JsonException.$(position, "locale value expected (obj)");
case STATE_EXPECT_DATE_FORMAT_ENTRY:
jsonDateFormat = null;
jsonDateLocale = null;
break;
case STATE_EXPECT_TIMESTAMP_FORMAT_ENTRY:
jsonTimestampFormat = null;
jsonTimestampLocale = null;
break;
default:
throw JsonException.$(position, "array expected (obj)");
}
break;
case JsonLexer.EVT_OBJ_END:
switch (jsonState) {
case STATE_EXPECT_DATE_FORMAT_ENTRY: // we just closed a date object
if (jsonDateFormat == null) {
throw JsonException.$(position, "date format is missing");
}
dateFormats.add(jsonDateFormat);
dateLocales.add(jsonDateLocale == null ? DateLocaleFactory.INSTANCE.getDefaultDateLocale() : jsonDateLocale);
break;
case STATE_EXPECT_TIMESTAMP_FORMAT_ENTRY:
if (jsonTimestampFormat == null) {
throw JsonException.$(position, "timestamp format is missing");
}
timestampFormats.add(jsonTimestampFormat);
timestampLocales.add(jsonTimestampLocale == null ? io.questdb.std.microtime.DateLocaleFactory.INSTANCE.getDefaultDateLocale() : jsonTimestampLocale);
break;
default:
// the only time we get here would be when
// main object is closed.
// other end_of_object cannot get there unless we
// allow to enter these objects in the first place
break;
}
break;
case JsonLexer.EVT_ARRAY_END:
jsonState = STATE_EXPECT_FIRST_LEVEL_NAME;
break;
case JsonLexer.EVT_NAME:
switch (jsonState) {
case STATE_EXPECT_FIRST_LEVEL_NAME:
if (Chars.equals(tag, "date")) {
jsonState = STATE_EXPECT_DATE_FORMAT_ARRAY; // expect array with date formats
} else if (Chars.equals(tag, "timestamp")) {
jsonState = STATE_EXPECT_TIMESTAMP_FORMAT_ARRAY; // expect array with timestamp formats
} else {
// unknown tag name?
throw JsonException.$(position, "'date' and/or 'timestamp' expected");
}
break;
case STATE_EXPECT_DATE_FORMAT_ENTRY:
processEntry(tag, position, STATE_EXPECT_DATE_FORMAT_VALUE, STATE_EXPECT_DATE_LOCALE_VALUE);
break;
default:
processEntry(tag, position, STATE_EXPECT_TIMESTAMP_FORMAT_VALUE, STATE_EXPECT_TIMESTAMP_LOCALE_VALUE);
break;
}
break;
case JsonLexer.EVT_VALUE:
switch (jsonState) {
case STATE_EXPECT_DATE_FORMAT_VALUE:
// date format
assert jsonDateFormat == null;
if (Chars.equals("null", tag)) {
throw JsonException.$(position, "null format");
}
jsonDateFormat = dateFormatFactory.get(tag);
jsonState = STATE_EXPECT_DATE_FORMAT_ENTRY;
break;
case STATE_EXPECT_DATE_LOCALE_VALUE: // date locale
assert jsonDateLocale == null;
jsonDateLocale = dateLocaleFactory.getDateLocale(tag);
if (jsonDateLocale == null) {
throw JsonException.$(position, "invalid [locale=").put(tag).put(']');
}
jsonState = STATE_EXPECT_DATE_FORMAT_ENTRY;
break;
case STATE_EXPECT_TIMESTAMP_FORMAT_VALUE: // timestamp format
assert jsonTimestampFormat == null;
if (Chars.equals("null", tag)) {
throw JsonException.$(position, "null format");
}
jsonTimestampFormat = timestampFormatFactory.get(tag);
jsonState = STATE_EXPECT_TIMESTAMP_FORMAT_ENTRY;
break;
case STATE_EXPECT_TIMESTAMP_LOCALE_VALUE:
assert jsonTimestampLocale == null;
jsonTimestampLocale = timestampLocaleFactory.getDateLocale(tag);
if (jsonTimestampLocale == null) {
throw JsonException.$(position, "invalid [locale=").put(tag).put(']');
}
jsonState = STATE_EXPECT_TIMESTAMP_FORMAT_ENTRY;
break;
default:
// we are picking up values from attributes we don't expect
throw JsonException.$(position, "array expected (value)");
}
break;
case JsonLexer.EVT_ARRAY_START:
switch (jsonState) {
case STATE_EXPECT_DATE_FORMAT_ARRAY: // we are working on dates
jsonState = STATE_EXPECT_DATE_FORMAT_ENTRY;
break;
case STATE_EXPECT_TIMESTAMP_FORMAT_ARRAY: // we are working on timestamps
jsonState = STATE_EXPECT_TIMESTAMP_FORMAT_ENTRY;
break;
case STATE_EXPECT_DATE_FORMAT_VALUE:
case STATE_EXPECT_TIMESTAMP_FORMAT_VALUE:
throw JsonException.$(position, "format value expected (array)");
default:
throw JsonException.$(position, "locale value expected (array)");
}
break;
default:
break;
}
}
private void processEntry(CharSequence tag, int position, int stateExpectFormatValue, int stateExpectLocaleValue) throws JsonException {
if (Chars.equals(tag, "format")) {
jsonState = stateExpectFormatValue; // expect date format
} else if (Chars.equals(tag, "locale")) {
jsonState = stateExpectLocaleValue;
} else {
// unknown tag name?
throw JsonException.$(position, "unknown [tag=").put(tag).put(']');
}
}
}
......@@ -25,64 +25,45 @@ package io.questdb.cutlass.text.types;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.ColumnType;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.json.JsonLexer;
import io.questdb.cutlass.text.TextConfiguration;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.*;
import io.questdb.std.Mutable;
import io.questdb.std.ObjList;
import io.questdb.std.ObjectPool;
import io.questdb.std.str.DirectCharSink;
import io.questdb.std.time.DateFormat;
import io.questdb.std.time.DateFormatFactory;
import io.questdb.std.time.DateLocale;
import io.questdb.std.time.DateLocaleFactory;
import java.io.IOException;
import java.io.InputStream;
public class TypeManager implements Mutable {
private static final Log LOG = LogFactory.getLog(TypeManager.class);
private static final int STATE_EXPECT_TOP = 0;
private static final int STATE_EXPECT_FIRST_LEVEL_NAME = 1;
private static final int STATE_EXPECT_DATE_FORMAT_ARRAY = 2;
private static final int STATE_EXPECT_TIMESTAMP_FORMAT_ARRAY = 3;
private static final int STATE_EXPECT_DATE_FORMAT_VALUE = 4;
private static final int STATE_EXPECT_DATE_LOCALE_VALUE = 5;
private static final int STATE_EXPECT_TIMESTAMP_FORMAT_VALUE = 6;
private static final int STATE_EXPECT_TIMESTAMP_LOCALE_VALUE = 7;
private static final int STATE_EXPECT_DATE_FORMAT_ENTRY = 8;
private static final int STATE_EXPECT_TIMESTAMP_FORMAT_ENTRY = 9;
private final ObjList<TypeAdapter> probes = new ObjList<>();
private final int probeCount;
private final StringAdapter stringAdapter;
private final DirectCharSink utf8Sink;
private final ObjectPool<DateAdapter> dateAdapterPool;
private final ObjectPool<TimestampAdapter> timestampAdapterPool;
private final SymbolAdapter symbolAdapter;
private final JsonLexer jsonLexer;
private final DateFormatFactory dateFormatFactory;
private final DateLocaleFactory dateLocaleFactory;
private final io.questdb.std.microtime.DateFormatFactory timestampFormatFactory;
private final io.questdb.std.microtime.DateLocaleFactory timestampLocaleFactory;
private int jsonState = STATE_EXPECT_TOP; // expect start of object
private DateFormat jsonDateFormat;
private DateLocale jsonDateLocale;
private io.questdb.std.microtime.DateFormat jsonTimestampFormat;
private io.questdb.std.microtime.DateLocale jsonTimestampLocale;
private final InputFormatConfiguration inputFormatConfiguration;
public TypeManager(TextConfiguration configuration, DirectCharSink utf8Sink, JsonLexer jsonLexer) throws JsonException {
this.utf8Sink = utf8Sink;
public TypeManager(
TextConfiguration configuration,
DirectCharSink utf8Sink,
InputFormatConfiguration inputFormatConfiguration
) {
this.dateAdapterPool = new ObjectPool<>(() -> new DateAdapter(utf8Sink), configuration.getDateAdapterPoolCapacity());
this.timestampAdapterPool = new ObjectPool<>(() -> new TimestampAdapter(utf8Sink), configuration.getTimestampAdapterPoolCapacity());
this.inputFormatConfiguration = inputFormatConfiguration;
this.stringAdapter = new StringAdapter(utf8Sink);
this.symbolAdapter = new SymbolAdapter(utf8Sink);
this.jsonLexer = jsonLexer;
addDefaultProbes();
this.dateFormatFactory = new DateFormatFactory();
this.dateLocaleFactory = DateLocaleFactory.INSTANCE;
this.timestampFormatFactory = new io.questdb.std.microtime.DateFormatFactory();
this.timestampLocaleFactory = io.questdb.std.microtime.DateLocaleFactory.INSTANCE;
parseConfiguration(configuration.getAdapterSetConfigurationFileName());
final ObjList<DateFormat> dateFormats = inputFormatConfiguration.getDateFormats();
final ObjList<DateLocale> dateLocales = inputFormatConfiguration.getDateLocales();
for (int i = 0, n = dateFormats.size(); i < n; i++) {
probes.add(new DateAdapter(utf8Sink).of(dateFormats.getQuick(i), dateLocales.getQuick(i)));
}
final ObjList<io.questdb.std.microtime.DateFormat> timestampFormats = inputFormatConfiguration.getTimestampFormats();
final ObjList<io.questdb.std.microtime.DateLocale> timestampLocales = inputFormatConfiguration.getTimestampLocales();
for (int i = 0, n = timestampFormats.size(); i < n; i++) {
probes.add(new TimestampAdapter(utf8Sink).of(timestampFormats.getQuick(i), timestampLocales.getQuick(i)));
}
this.probeCount = probes.size();
}
......@@ -92,6 +73,10 @@ public class TypeManager implements Mutable {
timestampAdapterPool.clear();
}
public InputFormatConfiguration getInputFormatConfiguration() {
return inputFormatConfiguration;
}
public TypeAdapter getProbe(int index) {
return probes.getQuick(index);
}
......@@ -149,189 +134,4 @@ public class TypeManager implements Mutable {
ObjList<TypeAdapter> getAllAdapters() {
return probes;
}
private void onJsonEvent(int code, CharSequence tag, int position) throws JsonException {
switch (code) {
case JsonLexer.EVT_OBJ_START:
switch (jsonState) {
case STATE_EXPECT_TOP:
// this is top level object
// lets dive in
jsonState = STATE_EXPECT_FIRST_LEVEL_NAME;
break;
case STATE_EXPECT_DATE_FORMAT_VALUE:
case STATE_EXPECT_TIMESTAMP_FORMAT_VALUE:
throw JsonException.$(position, "format value expected (obj)");
case STATE_EXPECT_DATE_LOCALE_VALUE:
case STATE_EXPECT_TIMESTAMP_LOCALE_VALUE:
throw JsonException.$(position, "locale value expected (obj)");
case STATE_EXPECT_DATE_FORMAT_ENTRY:
jsonDateFormat = null;
jsonDateLocale = null;
break;
case STATE_EXPECT_TIMESTAMP_FORMAT_ENTRY:
jsonTimestampFormat = null;
jsonTimestampLocale = null;
break;
default:
throw JsonException.$(position, "array expected (obj)");
}
break;
case JsonLexer.EVT_OBJ_END:
switch (jsonState) {
case STATE_EXPECT_DATE_FORMAT_ENTRY: // we just closed a date object
if (jsonDateFormat == null) {
throw JsonException.$(position, "date format is missing");
}
probes.add(
new DateAdapter(utf8Sink)
.of(
jsonDateFormat,
jsonDateLocale == null ? DateLocaleFactory.INSTANCE.getDefaultDateLocale() : jsonDateLocale
)
);
break;
case STATE_EXPECT_TIMESTAMP_FORMAT_ENTRY:
if (jsonTimestampFormat == null) {
throw JsonException.$(position, "timestamp format is missing");
}
probes.add(
new TimestampAdapter(utf8Sink)
.of(
jsonTimestampFormat,
jsonTimestampLocale == null ? io.questdb.std.microtime.DateLocaleFactory.INSTANCE.getDefaultDateLocale() : jsonTimestampLocale
)
);
break;
default:
// the only time we get here would be when
// main object is closed.
// other end_of_object cannot get there unless we
// allow to enter these objects in the first place
break;
}
break;
case JsonLexer.EVT_ARRAY_END:
jsonState = STATE_EXPECT_FIRST_LEVEL_NAME;
break;
case JsonLexer.EVT_NAME:
switch (jsonState) {
case STATE_EXPECT_FIRST_LEVEL_NAME:
if (Chars.equals(tag, "date")) {
jsonState = STATE_EXPECT_DATE_FORMAT_ARRAY; // expect array with date formats
} else if (Chars.equals(tag, "timestamp")) {
jsonState = STATE_EXPECT_TIMESTAMP_FORMAT_ARRAY; // expect array with timestamp formats
} else {
// unknown tag name?
throw JsonException.$(position, "'date' and/or 'timestamp' expected");
}
break;
case STATE_EXPECT_DATE_FORMAT_ENTRY:
processEntry(tag, position, STATE_EXPECT_DATE_FORMAT_VALUE, STATE_EXPECT_DATE_LOCALE_VALUE);
break;
default:
processEntry(tag, position, STATE_EXPECT_TIMESTAMP_FORMAT_VALUE, STATE_EXPECT_TIMESTAMP_LOCALE_VALUE);
break;
}
break;
case JsonLexer.EVT_VALUE:
switch (jsonState) {
case STATE_EXPECT_DATE_FORMAT_VALUE:
// date format
assert jsonDateFormat == null;
if (Chars.equals("null", tag)) {
throw JsonException.$(position, "null format");
}
jsonDateFormat = dateFormatFactory.get(tag);
jsonState = STATE_EXPECT_DATE_FORMAT_ENTRY;
break;
case STATE_EXPECT_DATE_LOCALE_VALUE: // date locale
assert jsonDateLocale == null;
jsonDateLocale = dateLocaleFactory.getDateLocale(tag);
if (jsonDateLocale == null) {
throw JsonException.$(position, "invalid [locale=").put(tag).put(']');
}
jsonState = STATE_EXPECT_DATE_FORMAT_ENTRY;
break;
case STATE_EXPECT_TIMESTAMP_FORMAT_VALUE: // timestamp format
assert jsonTimestampFormat == null;
if (Chars.equals("null", tag)) {
throw JsonException.$(position, "null format");
}
jsonTimestampFormat = timestampFormatFactory.get(tag);
jsonState = STATE_EXPECT_TIMESTAMP_FORMAT_ENTRY;
break;
case STATE_EXPECT_TIMESTAMP_LOCALE_VALUE:
assert jsonTimestampLocale == null;
jsonTimestampLocale = timestampLocaleFactory.getDateLocale(tag);
if (jsonTimestampLocale == null) {
throw JsonException.$(position, "invalid [locale=").put(tag).put(']');
}
jsonState = STATE_EXPECT_TIMESTAMP_FORMAT_ENTRY;
break;
default:
// we are picking up values from attributes we don't expect
throw JsonException.$(position, "array expected (value)");
}
break;
case JsonLexer.EVT_ARRAY_START:
switch (jsonState) {
case STATE_EXPECT_DATE_FORMAT_ARRAY: // we are working on dates
jsonState = STATE_EXPECT_DATE_FORMAT_ENTRY;
break;
case STATE_EXPECT_TIMESTAMP_FORMAT_ARRAY: // we are working on timestamps
jsonState = STATE_EXPECT_TIMESTAMP_FORMAT_ENTRY;
break;
case STATE_EXPECT_DATE_FORMAT_VALUE:
case STATE_EXPECT_TIMESTAMP_FORMAT_VALUE:
throw JsonException.$(position, "format value expected (array)");
default:
throw JsonException.$(position, "locale value expected (array)");
}
break;
default:
break;
}
}
private void parseConfiguration(String adapterSetConfigurationFileName) throws JsonException {
LOG.info().$("loading [from=").$(adapterSetConfigurationFileName).$(']').$();
try (InputStream stream = this.getClass().getResourceAsStream(adapterSetConfigurationFileName)) {
if (stream == null) {
throw JsonException.$(0, "could not find [resource=").put(adapterSetConfigurationFileName).put(']');
}
// here is where using direct memory is very disadvantageous
// we will copy buffer twice to parse json, but luckily contents should be small
// and we should be parsing this only once on startup
byte[] heapBuffer = new byte[4096];
long memBuffer = Unsafe.malloc(heapBuffer.length);
try {
int len;
while ((len = stream.read(heapBuffer)) > 0) {
// copy to mem buffer
for (int i = 0; i < len; i++) {
Unsafe.getUnsafe().putByte(memBuffer + i, heapBuffer[i]);
}
jsonLexer.parse(memBuffer, memBuffer + len, this::onJsonEvent);
}
jsonLexer.clear();
} finally {
Unsafe.free(memBuffer, heapBuffer.length);
}
} catch (IOException e) {
throw JsonException.$(0, "could not read [resource=").put(adapterSetConfigurationFileName).put(']');
}
}
private void processEntry(CharSequence tag, int position, int stateExpectFormatValue, int stateExpectLocaleValue) throws JsonException {
if (Chars.equals(tag, "format")) {
jsonState = stateExpectFormatValue; // expect date format
} else if (Chars.equals(tag, "locale")) {
jsonState = stateExpectLocaleValue;
} else {
// unknown tag name?
throw JsonException.$(position, "unknown [tag=").put(tag).put(']');
}
}
}
\ No newline at end of file
......@@ -26,6 +26,10 @@ package io.questdb.std.time;
import io.questdb.std.NumericException;
import io.questdb.std.str.CharSink;
/**
* Instances of DateFormat do not have state. They are thread-safe. In that multuple threads can use
* same DateFormat instance without disk of data corruption.
*/
public interface DateFormat {
void format(long datetime, DateLocale locale, CharSequence timeZoneName, CharSink sink);
......
......@@ -117,6 +117,7 @@ public class PropServerConfigurationTest {
Assert.assertEquals(1_000_000, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getConnectionCheckFrequency());
Assert.assertEquals(10, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getDoubleScale());
Assert.assertEquals(10, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getFloatScale());
Assert.assertEquals(2097152, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getCopyBufferSize());
Assert.assertEquals(5, configuration.getCairoConfiguration().getCreateAsSelectRetryCount());
Assert.assertEquals("fast", configuration.getCairoConfiguration().getDefaultMapType());
......@@ -301,6 +302,8 @@ public class PropServerConfigurationTest {
Assert.assertEquals(2_000, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getConnectionCheckFrequency());
Assert.assertEquals(6, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getDoubleScale());
Assert.assertEquals(4, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getFloatScale());
Assert.assertEquals(4194304, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getCopyBufferSize());
Assert.assertSame(FilesFacadeImpl.INSTANCE, configuration.getHttpServerConfiguration().getJsonQueryProcessorConfiguration().getFilesFacade());
Assert.assertEquals(12, configuration.getCairoConfiguration().getCreateAsSelectRetryCount());
Assert.assertEquals("compact", configuration.getCairoConfiguration().getDefaultMapType());
......
......@@ -29,6 +29,8 @@ import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.json.JsonLexer;
import io.questdb.cutlass.text.types.InputFormatConfiguration;
import io.questdb.griffin.AbstractGriffinTest;
import io.questdb.griffin.SqlException;
import io.questdb.std.Files;
......@@ -49,11 +51,20 @@ import java.nio.charset.StandardCharsets;
public class TextLoaderTest extends AbstractGriffinTest {
private static final ByteManipulator ENTITY_MANIPULATOR = (index, len, b) -> b;
private static final InputFormatConfiguration inputFormatConfiguration = new InputFormatConfiguration(
new DateFormatFactory(),
DateLocaleFactory.INSTANCE,
new io.questdb.std.microtime.DateFormatFactory(),
io.questdb.std.microtime.DateLocaleFactory.INSTANCE
);
private static final JsonLexer jsonLexer = new JsonLexer(1024, 1024);
@AfterClass
public static void tearDownClass() {
compiler.close();
engine.close();
jsonLexer.close();
}
@After
......@@ -815,23 +826,27 @@ public class TextLoaderTest extends AbstractGriffinTest {
"CMP2,2,4770,2.85092033445835,2015-02-08T19:15:09.000Z,2015-02-08 19:15:09,02/08/2015,253,TRUE,33766814\n" +
"CMP1,5,4938,4.42754498450086,2015-02-09T19:15:09.000Z,2015-02-09 19:15:09,02/09/2015,7817,FALSE,61983099\n";
try (TextLoader loader = new TextLoader(
new DefaultTextConfiguration() {
@Override
public int getRollBufferLimit() {
return 128;
}
final TextConfiguration textConfiguration = new DefaultTextConfiguration() {
@Override
public int getRollBufferLimit() {
return 128;
}
@Override
public int getRollBufferSize() {
return 32;
}
},
@Override
public int getRollBufferSize() {
return 32;
}
};
inputFormatConfiguration.parseConfiguration(
jsonLexer,
textConfiguration.getAdapterSetConfigurationFileName()
);
try (TextLoader loader = new TextLoader(
textConfiguration,
engine,
DateLocaleFactory.INSTANCE,
new DateFormatFactory(),
io.questdb.std.microtime.DateLocaleFactory.INSTANCE,
new io.questdb.std.microtime.DateFormatFactory()
inputFormatConfiguration
)) {
configureLoaderDefaults(loader, (byte) ',');
playText(
......@@ -1013,23 +1028,27 @@ public class TextLoaderTest extends AbstractGriffinTest {
"CMP2,2,4770,2.85092033445835,2015-02-08T19:15:09.000Z,2015-02-08 19:15:09,02/08/2015,253,TRUE,33766814\n" +
"CMP1,5,4938,4.42754498450086,2015-02-09T19:15:09.000Z,2015-02-09 19:15:09,02/09/2015,7817,FALSE,61983099\n";
try (TextLoader loader = new TextLoader(
new DefaultTextConfiguration() {
@Override
public int getRollBufferLimit() {
return 128;
}
TextConfiguration textConfiguration = new DefaultTextConfiguration() {
@Override
public int getRollBufferLimit() {
return 128;
}
@Override
public int getRollBufferSize() {
return 32;
}
},
@Override
public int getRollBufferSize() {
return 32;
}
};
inputFormatConfiguration.parseConfiguration(
jsonLexer,
textConfiguration.getAdapterSetConfigurationFileName()
);
try (TextLoader loader = new TextLoader(
textConfiguration,
engine,
DateLocaleFactory.INSTANCE,
new DateFormatFactory(),
io.questdb.std.microtime.DateLocaleFactory.INSTANCE,
new io.questdb.std.microtime.DateFormatFactory()
inputFormatConfiguration
)) {
configureLoaderDefaults(loader, (byte) ',');
playText(
......@@ -2171,13 +2190,15 @@ public class TextLoaderTest extends AbstractGriffinTest {
private void assertNoLeak(TextConfiguration textConfiguration, TestCode code) throws Exception {
TestUtils.assertMemoryLeak(() -> {
inputFormatConfiguration.parseConfiguration(
jsonLexer,
textConfiguration.getAdapterSetConfigurationFileName()
);
try (TextLoader loader = new TextLoader(
textConfiguration,
engine,
DateLocaleFactory.INSTANCE,
new DateFormatFactory(),
io.questdb.std.microtime.DateLocaleFactory.INSTANCE,
new io.questdb.std.microtime.DateFormatFactory()
inputFormatConfiguration
)) {
code.run(loader);
}
......
......@@ -25,6 +25,7 @@ package io.questdb.cutlass.text;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.json.JsonLexer;
import io.questdb.cutlass.text.types.InputFormatConfiguration;
import io.questdb.cutlass.text.types.TypeManager;
import io.questdb.std.Unsafe;
import io.questdb.std.str.DirectCharSink;
......@@ -38,19 +39,26 @@ public class TextMetadataParserTest {
private static TextMetadataParser textMetadataParser;
private static TypeManager typeManager;
private static DirectCharSink utf8Sink;
private static JsonLexer jsonLexer;
@BeforeClass
public static void setUpClass() throws JsonException {
utf8Sink = new DirectCharSink(1024);
jsonLexer = new JsonLexer(1024, 1024);
typeManager = new TypeManager(new DefaultTextConfiguration(), utf8Sink, jsonLexer);
textMetadataParser = new TextMetadataParser(
new DefaultTextConfiguration(),
DateLocaleFactory.INSTANCE,
InputFormatConfiguration inputFormatConfiguration = new InputFormatConfiguration(
new DateFormatFactory(),
io.questdb.std.microtime.DateLocaleFactory.INSTANCE,
DateLocaleFactory.INSTANCE,
new io.questdb.std.microtime.DateFormatFactory(),
io.questdb.std.microtime.DateLocaleFactory.INSTANCE
);
try (JsonLexer jsonLexer = new JsonLexer(1024, 1024)) {
inputFormatConfiguration.parseConfiguration(jsonLexer, new DefaultTextConfiguration().getAdapterSetConfigurationFileName());
}
typeManager = new TypeManager(
new DefaultTextConfiguration(),
utf8Sink,
inputFormatConfiguration
);
textMetadataParser = new TextMetadataParser(
new DefaultTextConfiguration(),
typeManager
);
}
......@@ -58,7 +66,6 @@ public class TextMetadataParserTest {
@AfterClass
public static void tearDown() {
LEXER.close();
jsonLexer.close();
utf8Sink.close();
textMetadataParser.close();
}
......
......@@ -28,8 +28,11 @@ import io.questdb.cairo.ColumnType;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.json.JsonLexer;
import io.questdb.cutlass.text.DefaultTextConfiguration;
import io.questdb.cutlass.text.TextConfiguration;
import io.questdb.std.Misc;
import io.questdb.std.str.DirectCharSink;
import io.questdb.std.time.DateFormatFactory;
import io.questdb.std.time.DateLocaleFactory;
import io.questdb.test.tools.TestUtils;
import org.junit.*;
......@@ -110,6 +113,21 @@ public class TypeManagerTest {
Assert.assertEquals("[CHAR,INT,LONG,DOUBLE,BOOLEAN,LONG256]", typeManager.getAllAdapters().toString());
}
@Test
public void testIllegalMethodParameterBinary() throws JsonException {
testIllegalParameterForGetTypeAdapter(ColumnType.BINARY);
}
@Test
public void testIllegalMethodParameterDate() throws JsonException {
testIllegalParameterForGetTypeAdapter(ColumnType.DATE);
}
@Test
public void testIllegalMethodParameterTimestamp() throws JsonException {
testIllegalParameterForGetTypeAdapter(ColumnType.TIMESTAMP);
}
@Test
public void testResourceNotFound() {
assertFailure("/textloader/types/not_found.json", 0, "could not find [resource=/textloader/types/not_found.json]");
......@@ -170,31 +188,6 @@ public class TypeManagerTest {
assertFailure("/textloader/types/unknown_top_level_prop.json", 309, "'date' and/or 'timestamp' expected");
}
@Test
public void testIllegalMethodParameterBinary() throws JsonException {
testIllegalParameterForGetTypeAdapter(ColumnType.BINARY);
}
@Test
public void testIllegalMethodParameterDate() throws JsonException {
testIllegalParameterForGetTypeAdapter(ColumnType.DATE);
}
@Test
public void testIllegalMethodParameterTimestamp() throws JsonException {
testIllegalParameterForGetTypeAdapter(ColumnType.TIMESTAMP);
}
private void testIllegalParameterForGetTypeAdapter(int columnType) throws JsonException {
TypeManager typeManager = new TypeManager(new DefaultTextConfiguration(), utf8Sink, jsonLexer);
try {
typeManager.getTypeAdapter(columnType);
Assert.fail();
} catch (CairoException e) {
TestUtils.assertContains(e.getMessage(), "no adapter for type");
}
}
private void assertFailure(String resourceName, int position, CharSequence text) {
try {
createTypeManager(resourceName);
......@@ -206,11 +199,47 @@ public class TypeManagerTest {
}
private TypeManager createTypeManager(String fileResource) throws JsonException {
return new TypeManager(new DefaultTextConfiguration() {
@Override
public String getAdapterSetConfigurationFileName() {
return fileResource;
}
}, utf8Sink, jsonLexer);
InputFormatConfiguration inputFormatConfiguration = new InputFormatConfiguration(
new DateFormatFactory(),
DateLocaleFactory.INSTANCE,
new io.questdb.std.microtime.DateFormatFactory(),
io.questdb.std.microtime.DateLocaleFactory.INSTANCE
);
inputFormatConfiguration.parseConfiguration(jsonLexer, fileResource);
return new TypeManager(
new DefaultTextConfiguration() {
@Override
public String getAdapterSetConfigurationFileName() {
return fileResource;
}
},
utf8Sink,
inputFormatConfiguration
);
}
private void testIllegalParameterForGetTypeAdapter(int columnType) throws JsonException {
TextConfiguration textConfiguration = new DefaultTextConfiguration();
InputFormatConfiguration inputFormatConfiguration = new InputFormatConfiguration(
new DateFormatFactory(),
DateLocaleFactory.INSTANCE,
new io.questdb.std.microtime.DateFormatFactory(),
io.questdb.std.microtime.DateLocaleFactory.INSTANCE
);
inputFormatConfiguration.parseConfiguration(jsonLexer, textConfiguration.getAdapterSetConfigurationFileName());
TypeManager typeManager = new TypeManager(
textConfiguration,
utf8Sink,
inputFormatConfiguration
);
try {
typeManager.getTypeAdapter(columnType);
Assert.fail();
} catch (CairoException e) {
TestUtils.assertContains(e.getMessage(), "no adapter for type");
}
}
}
\ No newline at end of file
......@@ -36,6 +36,7 @@ http.bind.to=10.5.8.30:9900
http.json.query.connection.check.frequency=2000
http.json.query.double.scale=6
http.json.query.float.scale=4
http.json.query.copy.buffer.size=4m
cairo.create.as.select.retry.count=12
cairo.default.map.type=compact
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册