From 15f9acd6a7428d6db1bd96dafcd94399a375dda7 Mon Sep 17 00:00:00 2001 From: glasstiger <94906625+glasstiger@users.noreply.github.com> Date: Wed, 20 Apr 2022 13:24:16 +0100 Subject: [PATCH] feat(ilp): allow users to change default column type when ILP adds new column automatically (#2040) --- .../io/questdb/PropServerConfiguration.java | 34 ++++++ .../src/main/java/io/questdb/PropertyKey.java | 2 + .../cutlass/line/tcp/DefaultColumnTypes.java | 32 ++++- .../DefaultLineTcpReceiverConfiguration.java | 11 ++ .../line/tcp/LineTcpMeasurementEvent.java | 7 +- .../line/tcp/LineTcpMeasurementScheduler.java | 8 +- .../cutlass/line/tcp/LineTcpParser.java | 3 +- .../tcp/LineTcpReceiverConfiguration.java | 4 + .../line/tcp/TableStructureAdapter.java | 6 +- .../DefaultLineUdpReceiverConfiguration.java | 11 ++ .../cutlass/line/udp/LineUdpParserImpl.java | 24 +++- .../line/udp/LineUdpParserSupport.java | 10 +- .../udp/LineUdpReceiverConfiguration.java | 4 + .../questdb/PropServerConfigurationTest.java | 113 ++++++++++++++++++ .../line/tcp/BaseLineTcpContextTest.java | 15 +++ .../tcp/LineTcpAuthConnectionContextTest.java | 3 + .../tcp/LineTcpConnectionContextTest.java | 58 +++++++++ .../line/udp/LineUdpParserImplTest.java | 93 +++++++++++--- .../line/udp/LineUdpParserSupportTest.java | 11 ++ core/src/test/resources/server.conf | 2 + 20 files changed, 418 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/io/questdb/PropServerConfiguration.java b/core/src/main/java/io/questdb/PropServerConfiguration.java index 96604f882..198327221 100644 --- a/core/src/main/java/io/questdb/PropServerConfiguration.java +++ b/core/src/main/java/io/questdb/PropServerConfiguration.java @@ -372,6 +372,8 @@ public class PropServerConfiguration implements ServerConfiguration { private boolean stringToCharCastAllowed; private boolean symbolAsFieldSupported; private boolean isStringAsTagSupported; + private short floatDefaultColumnType; + private short integerDefaultColumnType; public PropServerConfiguration( String root, @@ -866,6 +868,18 @@ public class PropServerConfiguration implements ServerConfiguration { this.stringToCharCastAllowed = getBoolean(properties, env, PropertyKey.LINE_TCP_UNDOCUMENTED_STRING_TO_CHAR_CAST_ALLOWED, false); this.symbolAsFieldSupported = getBoolean(properties, env, PropertyKey.LINE_TCP_UNDOCUMENTED_SYMBOL_AS_FIELD_SUPPORTED, false); this.isStringAsTagSupported = getBoolean(properties, env, PropertyKey.LINE_TCP_UNDOCUMENTED_STRING_AS_TAG_SUPPORTED, false); + String floatDefaultColumnTypeName = getString(properties, env, PropertyKey.LINE_FLOAT_DEFAULT_COLUMN_TYPE, ColumnType.nameOf(ColumnType.DOUBLE)); + this.floatDefaultColumnType = ColumnType.tagOf(floatDefaultColumnTypeName); + if (floatDefaultColumnType != ColumnType.DOUBLE && floatDefaultColumnType != ColumnType.FLOAT) { + log.info().$("invalid default column type for float ").$(floatDefaultColumnTypeName).$("), will use DOUBLE").$(); + this.floatDefaultColumnType = ColumnType.DOUBLE; + } + String integerDefaultColumnTypeName = getString(properties, env, PropertyKey.LINE_INTEGER_DEFAULT_COLUMN_TYPE, ColumnType.nameOf(ColumnType.LONG)); + this.integerDefaultColumnType = ColumnType.tagOf(integerDefaultColumnTypeName); + if (integerDefaultColumnType != ColumnType.LONG && integerDefaultColumnType != ColumnType.INT && integerDefaultColumnType != ColumnType.SHORT && integerDefaultColumnType != ColumnType.BYTE) { + log.info().$("invalid default column type for integer ").$(integerDefaultColumnTypeName).$("), will use LONG").$(); + this.integerDefaultColumnType = ColumnType.LONG; + } } this.sharedWorkerCount = getInt(properties, env, PropertyKey.SHARED_WORKER_COUNT, Math.max(1, cpuAvailable / 2 - 1 - cpuUsed)); @@ -2314,6 +2328,16 @@ public class PropServerConfiguration implements ServerConfiguration { public int getDefaultPartitionBy() { return lineUdpDefaultPartitionBy; } + + @Override + public short getDefaultColumnTypeForFloat() { + return floatDefaultColumnType; + } + + @Override + public short getDefaultColumnTypeForInteger() { + return integerDefaultColumnType; + } } private class PropLineTcpReceiverIODispatcherConfiguration implements IODispatcherConfiguration { @@ -2583,6 +2607,16 @@ public class PropServerConfiguration implements ServerConfiguration { public boolean isStringAsTagSupported() { return isStringAsTagSupported; } + + @Override + public short getDefaultColumnTypeForFloat() { + return floatDefaultColumnType; + } + + @Override + public short getDefaultColumnTypeForInteger() { + return integerDefaultColumnType; + } } private class PropJsonQueryProcessorConfiguration implements JsonQueryProcessorConfiguration { diff --git a/core/src/main/java/io/questdb/PropertyKey.java b/core/src/main/java/io/questdb/PropertyKey.java index fd7c785b7..1bf48d2db 100644 --- a/core/src/main/java/io/questdb/PropertyKey.java +++ b/core/src/main/java/io/questdb/PropertyKey.java @@ -278,6 +278,8 @@ public enum PropertyKey { LINE_TCP_UNDOCUMENTED_STRING_TO_CHAR_CAST_ALLOWED("line.tcp.undocumented.string.to.char.cast.allowed"), LINE_TCP_UNDOCUMENTED_SYMBOL_AS_FIELD_SUPPORTED("line.tcp.undocumented.symbol.as.field.supported"), LINE_TCP_UNDOCUMENTED_STRING_AS_TAG_SUPPORTED("line.tcp.undocumented.string.as.tag.supported"), + LINE_FLOAT_DEFAULT_COLUMN_TYPE("line.float.default.column.type"), + LINE_INTEGER_DEFAULT_COLUMN_TYPE("line.integer.default.column.type"), LINE_TCP_NET_IO_QUEUE_CAPACITY("line.tcp.net.io.queue.capacity"), LINE_TCP_IO_AGGRESIVE_RECV("line.tcp.io.aggressive.recv"), METRICS_ENABLED("metrics.enabled"), diff --git a/core/src/main/java/io/questdb/cutlass/line/tcp/DefaultColumnTypes.java b/core/src/main/java/io/questdb/cutlass/line/tcp/DefaultColumnTypes.java index 5e57e3b71..1475dc240 100644 --- a/core/src/main/java/io/questdb/cutlass/line/tcp/DefaultColumnTypes.java +++ b/core/src/main/java/io/questdb/cutlass/line/tcp/DefaultColumnTypes.java @@ -27,15 +27,14 @@ package io.questdb.cutlass.line.tcp; import io.questdb.cairo.ColumnType; class DefaultColumnTypes { - static final int[] DEFAULT_COLUMN_TYPES = new int[LineTcpParser.N_ENTITY_TYPES]; + final int[] DEFAULT_COLUMN_TYPES = new int[LineTcpParser.N_ENTITY_TYPES]; + final int[] MAPPED_COLUMN_TYPES = new int[LineTcpParser.N_MAPPED_ENTITY_TYPES]; - static { + DefaultColumnTypes(LineTcpReceiverConfiguration configuration) { // if not set it defaults to ColumnType.UNDEFINED DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_TAG] = ColumnType.SYMBOL; - DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_FLOAT] = ColumnType.DOUBLE; - DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_DOUBLE] = ColumnType.DOUBLE; - DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_LONG] = ColumnType.LONG; - DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_INTEGER] = ColumnType.LONG; + DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_FLOAT] = configuration.getDefaultColumnTypeForFloat(); + DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_INTEGER] = configuration.getDefaultColumnTypeForInteger(); DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_STRING] = ColumnType.STRING; DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_SYMBOL] = ColumnType.SYMBOL; DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_BOOLEAN] = ColumnType.BOOLEAN; @@ -45,5 +44,26 @@ class DefaultColumnTypes { DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_GEOINT] = ColumnType.getGeoHashTypeWithBits(32); DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_GEOLONG] = ColumnType.getGeoHashTypeWithBits(60); DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_TIMESTAMP] = ColumnType.TIMESTAMP; + + // we could remove this mapping by sending the column type to the writer + // currently we are passing the ILP entity type instead + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_TAG] = ColumnType.SYMBOL; + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_FLOAT] = ColumnType.FLOAT; + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_DOUBLE] = ColumnType.DOUBLE; + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_BYTE] = ColumnType.BYTE; + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_SHORT] = ColumnType.SHORT; + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_INTEGER] = ColumnType.INT; + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_LONG] = ColumnType.LONG; + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_DATE] = ColumnType.DATE; + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_CHAR] = ColumnType.CHAR; + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_STRING] = ColumnType.STRING; + //MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_SYMBOL] = ColumnType.SYMBOL; + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_BOOLEAN] = ColumnType.BOOLEAN; + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_LONG256] = ColumnType.LONG256; + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_GEOBYTE] = ColumnType.getGeoHashTypeWithBits(8); + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_GEOSHORT] = ColumnType.getGeoHashTypeWithBits(16); + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_GEOINT] = ColumnType.getGeoHashTypeWithBits(32); + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_GEOLONG] = ColumnType.getGeoHashTypeWithBits(60); + MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_TIMESTAMP] = ColumnType.TIMESTAMP; } } diff --git a/core/src/main/java/io/questdb/cutlass/line/tcp/DefaultLineTcpReceiverConfiguration.java b/core/src/main/java/io/questdb/cutlass/line/tcp/DefaultLineTcpReceiverConfiguration.java index d8a0c6502..963c62837 100644 --- a/core/src/main/java/io/questdb/cutlass/line/tcp/DefaultLineTcpReceiverConfiguration.java +++ b/core/src/main/java/io/questdb/cutlass/line/tcp/DefaultLineTcpReceiverConfiguration.java @@ -26,6 +26,7 @@ package io.questdb.cutlass.line.tcp; import io.questdb.WorkerPoolAwareConfiguration; import io.questdb.cairo.CairoSecurityContext; +import io.questdb.cairo.ColumnType; import io.questdb.cairo.PartitionBy; import io.questdb.cairo.security.AllowAllCairoSecurityContext; import io.questdb.cutlass.line.LineProtoNanoTimestampAdapter; @@ -161,4 +162,14 @@ public class DefaultLineTcpReceiverConfiguration implements LineTcpReceiverConfi public boolean isStringAsTagSupported() { return false; } + + @Override + public short getDefaultColumnTypeForFloat() { + return ColumnType.DOUBLE; + } + + @Override + public short getDefaultColumnTypeForInteger() { + return ColumnType.LONG; + } } diff --git a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementEvent.java b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementEvent.java index 514d5a888..bdc7f8592 100644 --- a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementEvent.java +++ b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementEvent.java @@ -43,6 +43,7 @@ class LineTcpMeasurementEvent implements Closeable { private final MicrosecondClock clock; private final LineProtoTimestampAdapter timestampAdapter; private final LineTcpEventBuffer buffer; + private final DefaultColumnTypes defaultColumnTypes; private final boolean stringToCharCastAllowed; private final boolean symbolAsFieldSupported; private int writerWorkerId; @@ -54,12 +55,14 @@ class LineTcpMeasurementEvent implements Closeable { long bufSize, MicrosecondClock clock, LineProtoTimestampAdapter timestampAdapter, + DefaultColumnTypes defaultColumnTypes, boolean stringToCharCastAllowed, boolean symbolAsFieldSupported ) { this.buffer = new LineTcpEventBuffer(bufLo, bufSize); this.clock = clock; this.timestampAdapter = timestampAdapter; + this.defaultColumnTypes = defaultColumnTypes; this.stringToCharCastAllowed = stringToCharCastAllowed; this.symbolAsFieldSupported = symbolAsFieldSupported; } @@ -120,7 +123,7 @@ class LineTcpMeasurementEvent implements Closeable { // column is added row.cancel(); row = null; - final int colType = DefaultColumnTypes.DEFAULT_COLUMN_TYPES[entityType]; + final int colType = defaultColumnTypes.MAPPED_COLUMN_TYPES[entityType]; writer.addColumn(columnName, colType); // Seek to beginning of entities @@ -271,7 +274,7 @@ class LineTcpMeasurementEvent implements Closeable { CharSequence colName = localDetails.getColName(); if (TableUtils.isValidColumnName(colName)) { offset = buffer.addColumnName(offset, colName); - colType = DefaultColumnTypes.DEFAULT_COLUMN_TYPES[entityType]; + colType = defaultColumnTypes.DEFAULT_COLUMN_TYPES[entityType]; } else { throw invalidColNameError(colName); } diff --git a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementScheduler.java b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementScheduler.java index 8710422e8..570f74b19 100644 --- a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementScheduler.java +++ b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementScheduler.java @@ -77,6 +77,7 @@ class LineTcpMeasurementScheduler implements Closeable { CairoConfiguration cairoConfiguration = engine.getConfiguration(); this.configuration = lineConfiguration; MillisecondClock milliClock = cairoConfiguration.getMillisecondClock(); + DefaultColumnTypes defaultColumnTypes = new DefaultColumnTypes(lineConfiguration); int n = ioWorkerPool.getWorkerCount(); this.netIoJobs = new NetworkIOJob[n]; this.tableNameSinks = new StringSink[n]; @@ -110,8 +111,9 @@ class LineTcpMeasurementScheduler implements Closeable { addressSize, lineConfiguration.getMicrosecondClock(), lineConfiguration.getTimestampAdapter(), - lineConfiguration.isStringToCharCastAllowed(), - lineConfiguration.isSymbolAsFieldSupported()), + defaultColumnTypes, + lineConfiguration.isStringToCharCastAllowed(), + lineConfiguration.isSymbolAsFieldSupported()), getEventSlotSize(maxMeasurementSize), queueSize, MemoryTag.NATIVE_DEFAULT @@ -133,7 +135,7 @@ class LineTcpMeasurementScheduler implements Closeable { writerWorkerPool.assign(i, (Job) lineTcpWriterJob); writerWorkerPool.assign(i, (Closeable) lineTcpWriterJob); } - this.tableStructureAdapter = new TableStructureAdapter(cairoConfiguration, configuration.getDefaultPartitionBy()); + this.tableStructureAdapter = new TableStructureAdapter(cairoConfiguration, defaultColumnTypes, configuration.getDefaultPartitionBy()); writerIdleTimeout = lineConfiguration.getWriterIdleTimeout(); } diff --git a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpParser.java b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpParser.java index 37a85dd7f..48138b16e 100644 --- a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpParser.java +++ b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpParser.java @@ -50,13 +50,14 @@ public class LineTcpParser { public static final byte ENTITY_TYPE_GEOINT = 11; public static final byte ENTITY_TYPE_GEOLONG = 12; public static final byte ENTITY_TYPE_TIMESTAMP = 13; + public static final int N_ENTITY_TYPES = ENTITY_TYPE_TIMESTAMP + 1; public static final byte ENTITY_TYPE_LONG = 14; public static final byte ENTITY_TYPE_DOUBLE = 15; - public static final int N_ENTITY_TYPES = ENTITY_TYPE_DOUBLE + 1; public static final byte ENTITY_TYPE_SHORT = 16; public static final byte ENTITY_TYPE_BYTE = 17; public static final byte ENTITY_TYPE_DATE = 18; public static final byte ENTITY_TYPE_CHAR = 19; + public static final int N_MAPPED_ENTITY_TYPES = ENTITY_TYPE_CHAR + 1; static final byte ENTITY_TYPE_NONE = (byte) 0xff; // visible for testing private static final Log LOG = LogFactory.getLog(LineTcpParser.class); diff --git a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpReceiverConfiguration.java b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpReceiverConfiguration.java index 13ab37655..537139ec1 100644 --- a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpReceiverConfiguration.java +++ b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpReceiverConfiguration.java @@ -87,4 +87,8 @@ public interface LineTcpReceiverConfiguration { boolean isSymbolAsFieldSupported(); boolean isStringAsTagSupported(); + + short getDefaultColumnTypeForFloat(); + + short getDefaultColumnTypeForInteger(); } diff --git a/core/src/main/java/io/questdb/cutlass/line/tcp/TableStructureAdapter.java b/core/src/main/java/io/questdb/cutlass/line/tcp/TableStructureAdapter.java index 572efe160..3d9cfe929 100644 --- a/core/src/main/java/io/questdb/cutlass/line/tcp/TableStructureAdapter.java +++ b/core/src/main/java/io/questdb/cutlass/line/tcp/TableStructureAdapter.java @@ -40,12 +40,14 @@ class TableStructureAdapter implements TableStructure { private final LowerCaseCharSequenceHashSet entityNamesUtf16 = new LowerCaseCharSequenceHashSet(); private final ObjList entities = new ObjList<>(); private final CairoConfiguration cairoConfiguration; + private final DefaultColumnTypes defaultColumnTypes; private final int defaultPartitionBy; private CharSequence tableName; private int timestampIndex = -1; - public TableStructureAdapter(CairoConfiguration configuration, int defaultPartitionBy) { + public TableStructureAdapter(CairoConfiguration configuration, DefaultColumnTypes defaultColumnTypes, int defaultPartitionBy) { this.cairoConfiguration = configuration; + this.defaultColumnTypes = defaultColumnTypes; this.defaultPartitionBy = defaultPartitionBy; } @@ -73,7 +75,7 @@ class TableStructureAdapter implements TableStructure { if (columnIndex == getTimestampIndex()) { return ColumnType.TIMESTAMP; } - return DefaultColumnTypes.DEFAULT_COLUMN_TYPES[entities.get(columnIndex).getType()]; + return defaultColumnTypes.DEFAULT_COLUMN_TYPES[entities.get(columnIndex).getType()]; } @Override diff --git a/core/src/main/java/io/questdb/cutlass/line/udp/DefaultLineUdpReceiverConfiguration.java b/core/src/main/java/io/questdb/cutlass/line/udp/DefaultLineUdpReceiverConfiguration.java index 84e728e94..dc1960195 100644 --- a/core/src/main/java/io/questdb/cutlass/line/udp/DefaultLineUdpReceiverConfiguration.java +++ b/core/src/main/java/io/questdb/cutlass/line/udp/DefaultLineUdpReceiverConfiguration.java @@ -25,6 +25,7 @@ package io.questdb.cutlass.line.udp; import io.questdb.cairo.CairoSecurityContext; +import io.questdb.cairo.ColumnType; import io.questdb.cairo.CommitMode; import io.questdb.cairo.PartitionBy; import io.questdb.cairo.security.AllowAllCairoSecurityContext; @@ -115,4 +116,14 @@ public class DefaultLineUdpReceiverConfiguration implements LineUdpReceiverConfi public int getCommitMode() { return CommitMode.NOSYNC; } + + @Override + public short getDefaultColumnTypeForFloat() { + return ColumnType.DOUBLE; + } + + @Override + public short getDefaultColumnTypeForInteger() { + return ColumnType.LONG; + } } diff --git a/core/src/main/java/io/questdb/cutlass/line/udp/LineUdpParserImpl.java b/core/src/main/java/io/questdb/cutlass/line/udp/LineUdpParserImpl.java index 4b920983d..ec5c6a31f 100644 --- a/core/src/main/java/io/questdb/cutlass/line/udp/LineUdpParserImpl.java +++ b/core/src/main/java/io/questdb/cutlass/line/udp/LineUdpParserImpl.java @@ -68,6 +68,8 @@ public class LineUdpParserImpl implements LineUdpParser, Closeable { private final CairoSecurityContext cairoSecurityContext; private final LineProtoTimestampAdapter timestampAdapter; private final LineUdpReceiverConfiguration udpConfiguration; + private final short defaultFloatColumnType; + private final short defaultIntegerColumnType; // state // cache entry index is always a negative value private int cacheEntryIndex = 0; @@ -99,6 +101,9 @@ public class LineUdpParserImpl implements LineUdpParser, Closeable { this.udpConfiguration = udpConfiguration; this.cairoSecurityContext = udpConfiguration.getCairoSecurityContext(); this.timestampAdapter = udpConfiguration.getTimestampAdapter(); + + defaultFloatColumnType = udpConfiguration.getDefaultColumnTypeForFloat(); + defaultIntegerColumnType = udpConfiguration.getDefaultColumnTypeForInteger(); } @Override @@ -324,7 +329,7 @@ public class LineUdpParserImpl implements LineUdpParser, Closeable { } private void parseFieldValue(CachedCharSequence value, CharSequenceCache cache) { - int valueType = LineUdpParserSupport.getValueType(value); + int valueType = LineUdpParserSupport.getValueType(value, defaultFloatColumnType, defaultIntegerColumnType); if (valueType == ColumnType.UNDEFINED) { switchModeToSkipLine(); } else { @@ -334,7 +339,7 @@ public class LineUdpParserImpl implements LineUdpParser, Closeable { @SuppressWarnings("unused") private void parseFieldValueNewTable(CachedCharSequence value, CharSequenceCache cache) { - int valueType = LineUdpParserSupport.getValueType(value); + int valueType = LineUdpParserSupport.getValueType(value, defaultFloatColumnType, defaultIntegerColumnType); if (valueType == ColumnType.UNDEFINED || valueType == ColumnType.NULL) { // cannot create a col of type null switchModeToSkipLine(); } else { @@ -368,6 +373,18 @@ public class LineUdpParserImpl implements LineUdpParser, Closeable { || columnTypeTag == ColumnType.TIMESTAMP || columnTypeTag == ColumnType.DATE; break; + case ColumnType.INT: + valid = columnTypeTag == ColumnType.INT + || columnTypeTag == ColumnType.SHORT + || columnTypeTag == ColumnType.BYTE; + break; + case ColumnType.SHORT: + valid = columnTypeTag == ColumnType.SHORT + || columnTypeTag == ColumnType.BYTE; + break; + case ColumnType.BYTE: + valid = columnTypeTag == ColumnType.BYTE; + break; case ColumnType.BOOLEAN: valid = columnTypeTag == ColumnType.BOOLEAN; break; @@ -380,6 +397,9 @@ public class LineUdpParserImpl implements LineUdpParser, Closeable { case ColumnType.DOUBLE: valid = columnTypeTag == ColumnType.DOUBLE || columnTypeTag == ColumnType.FLOAT; break; + case ColumnType.FLOAT: + valid = columnTypeTag == ColumnType.FLOAT; + break; case ColumnType.SYMBOL: valid = columnTypeTag == ColumnType.SYMBOL; break; diff --git a/core/src/main/java/io/questdb/cutlass/line/udp/LineUdpParserSupport.java b/core/src/main/java/io/questdb/cutlass/line/udp/LineUdpParserSupport.java index 91aa6ca92..91cab9405 100644 --- a/core/src/main/java/io/questdb/cutlass/line/udp/LineUdpParserSupport.java +++ b/core/src/main/java/io/questdb/cutlass/line/udp/LineUdpParserSupport.java @@ -179,6 +179,10 @@ public class LineUdpParserSupport { } public static int getValueType(CharSequence value) { + return getValueType(value, ColumnType.DOUBLE, ColumnType.LONG); + } + + public static int getValueType(CharSequence value, short defaultFloatColumnType, short defaultIntegerColumnType) { // method called for inbound ilp messages on each value. // returning UNDEFINED makes the whole line be skipped. // 0 len values, return null type. @@ -193,7 +197,7 @@ public class LineUdpParserSupport { if (valueLen > 3 && value.charAt(0) == '0' && value.charAt(1) == 'x') { return ColumnType.LONG256; } - return valueLen == 1 ? ColumnType.SYMBOL : ColumnType.LONG; + return valueLen == 1 ? ColumnType.SYMBOL : defaultIntegerColumnType; case 't': if (valueLen > 1 && ((first >= '0' && first <= '9') || first == '-')) { return ColumnType.TIMESTAMP; @@ -223,10 +227,10 @@ public class LineUdpParserSupport { return ColumnType.STRING; default: if (last >= '0' && last <= '9' && ((first >= '0' && first <= '9') || first == '-' || first == '.')) { - return ColumnType.DOUBLE; + return defaultFloatColumnType; } if (SqlKeywords.isNanKeyword(value)) { - return ColumnType.DOUBLE; + return defaultFloatColumnType; } if (value.charAt(0) == '"') { return ColumnType.UNDEFINED; diff --git a/core/src/main/java/io/questdb/cutlass/line/udp/LineUdpReceiverConfiguration.java b/core/src/main/java/io/questdb/cutlass/line/udp/LineUdpReceiverConfiguration.java index d36fd24a5..56ebd3fba 100644 --- a/core/src/main/java/io/questdb/cutlass/line/udp/LineUdpReceiverConfiguration.java +++ b/core/src/main/java/io/questdb/cutlass/line/udp/LineUdpReceiverConfiguration.java @@ -61,4 +61,8 @@ public interface LineUdpReceiverConfiguration { LineProtoTimestampAdapter getTimestampAdapter(); int getDefaultPartitionBy(); + + short getDefaultColumnTypeForFloat(); + + short getDefaultColumnTypeForInteger(); } diff --git a/core/src/test/java/io/questdb/PropServerConfigurationTest.java b/core/src/test/java/io/questdb/PropServerConfigurationTest.java index d4500c6ff..806005bf2 100644 --- a/core/src/test/java/io/questdb/PropServerConfigurationTest.java +++ b/core/src/test/java/io/questdb/PropServerConfigurationTest.java @@ -25,6 +25,7 @@ package io.questdb; import io.questdb.cairo.CairoConfiguration; +import io.questdb.cairo.ColumnType; import io.questdb.cairo.CommitMode; import io.questdb.cairo.PartitionBy; import io.questdb.cairo.SqlJitMode; @@ -284,6 +285,8 @@ public class PropServerConfigurationTest { Assert.assertEquals(500_000, configuration.getCairoConfiguration().getWriterAsyncCommandBusyWaitTimeout()); Assert.assertEquals(30_000_000, configuration.getCairoConfiguration().getWriterAsyncCommandMaxTimeout()); Assert.assertEquals(1023, configuration.getCairoConfiguration().getWriterTickRowsCountMod()); + Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat()); + Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger()); Assert.assertTrue(configuration.getHttpServerConfiguration().getHttpContextConfiguration().getServerKeepAlive()); Assert.assertEquals("HTTP/1.1 ", configuration.getHttpServerConfiguration().getHttpContextConfiguration().getHttpVersion()); @@ -713,6 +716,8 @@ public class PropServerConfigurationTest { Assert.assertEquals(PartitionBy.MONTH, configuration.getLineTcpReceiverConfiguration().getDefaultPartitionBy()); Assert.assertEquals(5_000, configuration.getLineTcpReceiverConfiguration().getWriterIdleTimeout()); Assert.assertEquals(16, configuration.getCairoConfiguration().getPartitionPurgeListCapacity()); + Assert.assertEquals(ColumnType.FLOAT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat()); + Assert.assertEquals(ColumnType.INT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger()); Assert.assertTrue(configuration.getCairoConfiguration().getTelemetryConfiguration().getEnabled()); Assert.assertEquals(512, configuration.getCairoConfiguration().getTelemetryConfiguration().getQueueCapacity()); @@ -824,4 +829,112 @@ public class PropServerConfigurationTest { configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); Assert.assertEquals(SqlJitMode.JIT_MODE_DISABLED, configuration.getCairoConfiguration().getSqlJitMode()); } + + @Test + public void testDefaultAddColumnTypeForFloat() throws ServerConfigurationException, JsonException { + Properties properties = new Properties(); + + // default + PropServerConfiguration configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat()); + + // empty + properties.setProperty("line.float.default.column.type", ""); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat()); + + // double + properties.setProperty("line.float.default.column.type", "DOUBLE"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat()); + + // float + properties.setProperty("line.float.default.column.type", "FLOAT"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.FLOAT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat()); + + // lowercase + properties.setProperty("line.float.default.column.type", "double"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat()); + + // camel case + properties.setProperty("line.float.default.column.type", "Float"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.FLOAT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat()); + + // not allowed + properties.setProperty("line.float.default.column.type", "STRING"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat()); + + // not allowed + properties.setProperty("line.float.default.column.type", "SHORT"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat()); + + // non existent type + properties.setProperty("line.float.default.column.type", "FLAT"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat()); + } + + @Test + public void testDefaultAddColumnTypeForInteger() throws ServerConfigurationException, JsonException { + Properties properties = new Properties(); + + // default + PropServerConfiguration configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger()); + + // empty + properties.setProperty("line.integer.default.column.type", ""); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger()); + + // long + properties.setProperty("line.integer.default.column.type", "LONG"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger()); + + // int + properties.setProperty("line.integer.default.column.type", "INT"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.INT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger()); + + // short + properties.setProperty("line.integer.default.column.type", "SHORT"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.SHORT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger()); + + // byte + properties.setProperty("line.integer.default.column.type", "BYTE"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.BYTE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger()); + + // lowercase + properties.setProperty("line.integer.default.column.type", "int"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.INT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger()); + + // camel case + properties.setProperty("line.integer.default.column.type", "Short"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.SHORT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger()); + + // not allowed + properties.setProperty("line.integer.default.column.type", "SYMBOL"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger()); + + // not allowed + properties.setProperty("line.integer.default.column.type", "FLOAT"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger()); + + // non existent type + properties.setProperty("line.integer.default.column.type", "BITE"); + configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder()); + Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger()); + } } diff --git a/core/src/test/java/io/questdb/cutlass/line/tcp/BaseLineTcpContextTest.java b/core/src/test/java/io/questdb/cutlass/line/tcp/BaseLineTcpContextTest.java index 50c4c151c..0e870fad2 100644 --- a/core/src/test/java/io/questdb/cutlass/line/tcp/BaseLineTcpContextTest.java +++ b/core/src/test/java/io/questdb/cutlass/line/tcp/BaseLineTcpContextTest.java @@ -25,6 +25,7 @@ package io.questdb.cutlass.line.tcp; import io.questdb.cairo.AbstractCairoTest; +import io.questdb.cairo.ColumnType; import io.questdb.cairo.TableReader; import io.questdb.log.Log; import io.questdb.log.LogFactory; @@ -95,6 +96,8 @@ abstract class BaseLineTcpContextTest extends AbstractCairoTest { protected boolean disconnectOnError; protected boolean stringToCharCastAllowed; protected boolean symbolAsFieldSupported; + protected short floatDefaultColumnType; + protected short integerDefaultColumnType; @Before public void before() { @@ -104,6 +107,8 @@ abstract class BaseLineTcpContextTest extends AbstractCairoTest { disconnected = true; netMsgBufferSize.set(512); disconnectOnError = false; + floatDefaultColumnType = ColumnType.DOUBLE; + integerDefaultColumnType = ColumnType.LONG; lineTcpConfiguration = createNoAuthReceiverConfiguration(provideLineTcpNetworkFacade()); } @@ -193,6 +198,16 @@ abstract class BaseLineTcpContextTest extends AbstractCairoTest { return symbolAsFieldSupported; } + @Override + public short getDefaultColumnTypeForFloat() { + return floatDefaultColumnType; + } + + @Override + public short getDefaultColumnTypeForInteger() { + return integerDefaultColumnType; + } + @Override public MicrosecondClock getMicrosecondClock() { return new MicrosecondClockImpl() { diff --git a/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpAuthConnectionContextTest.java b/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpAuthConnectionContextTest.java index eee503643..d4434e39b 100644 --- a/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpAuthConnectionContextTest.java +++ b/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpAuthConnectionContextTest.java @@ -25,6 +25,7 @@ package io.questdb.cutlass.line.tcp; import io.questdb.cairo.CairoException; +import io.questdb.cairo.ColumnType; import io.questdb.std.Files; import io.questdb.std.Unsafe; import io.questdb.test.tools.TestUtils; @@ -59,6 +60,8 @@ public class LineTcpAuthConnectionContextTest extends BaseLineTcpContextTest { disconnected = true; netMsgBufferSize.set(1024); maxSendBytes = 1024; + floatDefaultColumnType = ColumnType.DOUBLE; + integerDefaultColumnType = ColumnType.LONG; lineTcpConfiguration = createReceiverConfiguration(true, new LineTcpNetworkFacade() { @Override public int send(long fd, long buffer, int bufferLen) { diff --git a/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpConnectionContextTest.java b/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpConnectionContextTest.java index 529a9241e..06d7cfda2 100644 --- a/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpConnectionContextTest.java +++ b/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpConnectionContextTest.java @@ -1080,6 +1080,64 @@ public class LineTcpConnectionContextTest extends BaseLineTcpContextTest { }); } + @Test + public void testAddIntegerColumnAsByte() throws Exception { + integerDefaultColumnType = ColumnType.BYTE; + testDefaultColumnType(ColumnType.BYTE, "21i", "21", "0"); + } + + @Test + public void testAddIntegerColumnAsShort() throws Exception { + integerDefaultColumnType = ColumnType.SHORT; + testDefaultColumnType(ColumnType.SHORT, "21i", "21", "0"); + } + + @Test + public void testAddIntegerColumnAsInt() throws Exception { + integerDefaultColumnType = ColumnType.INT; + testDefaultColumnType(ColumnType.INT, "21i", "21", "NaN"); + } + + @Test + public void testAddIntegerColumnAsLong() throws Exception { + integerDefaultColumnType = ColumnType.LONG; + testDefaultColumnType(ColumnType.LONG, "21i", "21", "NaN"); + } + + @Test + public void testAddFloatColumnAsDouble() throws Exception { + floatDefaultColumnType = ColumnType.DOUBLE; + testDefaultColumnType(ColumnType.DOUBLE, "24.3", "24.3", "NaN"); + } + + @Test + public void testAddFloatColumnAsFloat() throws Exception { + floatDefaultColumnType = ColumnType.FLOAT; + testDefaultColumnType(ColumnType.FLOAT, "24.3", "24.3000", "NaN"); + } + + private void testDefaultColumnType(short expectedType, String ilpValue, String tableValue, String emptyValue) throws Exception { + String table = "addDefColType"; + addTable(table); + runInContext(() -> { + recvBuffer = + table + ",location=us-midwest temperature=82 1465839830100400200\n" + + table + ",location=us-eastcoast temperature=81,newcol=" + ilpValue + " 1465839830101400200\n"; + do { + handleContextIO(); + Assert.assertFalse(disconnected); + } while (recvBuffer.length() > 0); + closeContext(); + String expected = "location\ttemperature\ttimestamp\tnewcol\n" + + "us-midwest\t82.0\t2016-06-13T17:43:50.100400Z\t" + emptyValue + "\n" + + "us-eastcoast\t81.0\t2016-06-13T17:43:50.101400Z\t" + tableValue + "\n"; + try (TableReader reader = new TableReader(configuration, table)) { + assertCursorTwoPass(expected, reader.getCursor(), reader.getMetadata()); + Assert.assertEquals(expectedType, ColumnType.tagOf(reader.getMetadata().getColumnType("newcol"))); + } + }); + } + @Test public void testDuplicateNewFieldAlternating() throws Exception { String table = "dupField"; diff --git a/core/src/test/java/io/questdb/cutlass/line/udp/LineUdpParserImplTest.java b/core/src/test/java/io/questdb/cutlass/line/udp/LineUdpParserImplTest.java index ce87995a6..f22c5792e 100644 --- a/core/src/test/java/io/questdb/cutlass/line/udp/LineUdpParserImplTest.java +++ b/core/src/test/java/io/questdb/cutlass/line/udp/LineUdpParserImplTest.java @@ -25,8 +25,6 @@ package io.questdb.cutlass.line.udp; import io.questdb.cairo.*; -import io.questdb.cairo.security.AllowAllCairoSecurityContext; -import io.questdb.cutlass.line.LineProtoNanoTimestampAdapter; import io.questdb.std.*; import io.questdb.std.datetime.microtime.MicrosecondClock; import io.questdb.std.datetime.microtime.TimestampFormatUtils; @@ -40,20 +38,83 @@ import org.junit.Test; public class LineUdpParserImplTest extends AbstractCairoTest { @Test - public void testAddColumn() throws Exception { + public void testAddColumnDefaultLong() throws Exception { + testAddColumnInteger(ColumnType.LONG, "NaN"); + } + + @Test + public void testAddColumnDefaultInteger() throws Exception { + testAddColumnInteger(ColumnType.INT, "NaN"); + } + + @Test + public void testAddColumnDefaultShort() throws Exception { + testAddColumnInteger(ColumnType.SHORT, "0"); + } + + @Test + public void testAddColumnDefaultByte() throws Exception { + testAddColumnInteger(ColumnType.BYTE, "0"); + } + + private void testAddColumnInteger(short colType, String nullValue) throws Exception { final String expected = "tag\ttag2\tfield\tf4\tfield2\tfx\ttimestamp\tf5\n" + - "abc\txyz\t10000\t9.034\tstr\ttrue\t1970-01-01T00:01:40.000000Z\tNaN\n" + - "woopsie\tdaisy\t2000\t3.0889100000000003\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\tNaN\n" + - "444\td555\t510\t1.4000000000000001\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\t55\n" + - "666\t777\t410\t1.1\tcomment\\ X\tfalse\t1970-01-01T00:01:40.000000Z\tNaN\n"; + "abc\txyz\t100\t9.034\tstr\ttrue\t1970-01-01T00:01:40.000000Z\t" + nullValue + "\n" + + "woopsie\tdaisy\t127\t3.0889100000000003\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\t" + nullValue + "\n" + + "444\td555\t110\t1.4000000000000001\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\t55\n" + + "666\t777\t40\t1.1\tcomment\\ X\tfalse\t1970-01-01T00:01:40.000000Z\t" + nullValue + "\n"; - final String lines = "tab,tag=abc,tag2=xyz field=10000i,f4=9.034,field2=\"str\",fx=true 100000000000\n" + - "tab,tag=woopsie,tag2=daisy field=2000i,f4=3.08891,field2=\"comment\",fx=true 100000000000\n" + - "tab,tag=444,tag2=d555 field=510i,f4=1.4,f5=55i,field2=\"comment\",fx=true 100000000000\n" + - "tab,tag=666,tag2=777 field=410i,f4=1.1,field2=\"comment\\ X\",fx=false 100000000000\n"; + final String lines = "tab,tag=abc,tag2=xyz field=100i,f4=9.034,field2=\"str\",fx=true 100000000000\n" + + "tab,tag=woopsie,tag2=daisy field=127i,f4=3.08891,field2=\"comment\",fx=true 100000000000\n" + + "tab,tag=444,tag2=d555 field=110i,f4=1.4,f5=55i,field2=\"comment\",fx=true 100000000000\n" + + "tab,tag=666,tag2=777 field=40i,f4=1.1,field2=\"comment\\ X\",fx=false 100000000000\n"; + assertThat(expected, lines, "tab", configuration, new DefaultLineUdpReceiverConfiguration() { + @Override + public short getDefaultColumnTypeForInteger() { + return colType; + } + }); - assertThat(expected, lines, "tab"); + try (TableReader reader = new TableReader(configuration, "tab")) { + Assert.assertEquals(colType, reader.getMetadata().getColumnType("f5")); + } + } + + @Test + public void testAddColumnDefaultFloat() throws Exception { + testAddColumnFloat(ColumnType.FLOAT, "tag\ttag2\tfield\tf4\tfield2\tfx\ttimestamp\tf5\n" + + "abc\txyz\t100\t9.0340\tstr\ttrue\t1970-01-01T00:01:40.000000Z\tNaN\n" + + "woopsie\tdaisy\t127\t3.0889\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\tNaN\n" + + "444\td555\t110\t1.4000\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\t55.0000\n" + + "666\t777\t40\t1.1000\tcomment\\ X\tfalse\t1970-01-01T00:01:40.000000Z\tNaN\n"); + } + + @Test + public void testAddColumnDefaultDouble() throws Exception { + testAddColumnFloat(ColumnType.DOUBLE, "tag\ttag2\tfield\tf4\tfield2\tfx\ttimestamp\tf5\n" + + "abc\txyz\t100\t9.034\tstr\ttrue\t1970-01-01T00:01:40.000000Z\tNaN\n" + + "woopsie\tdaisy\t127\t3.0889100000000003\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\tNaN\n" + + "444\td555\t110\t1.4000000000000001\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\t55.0\n" + + "666\t777\t40\t1.1\tcomment\\ X\tfalse\t1970-01-01T00:01:40.000000Z\tNaN\n"); + } + + private void testAddColumnFloat(short colType, String expected) throws Exception { + final String lines = "tab,tag=abc,tag2=xyz field=100i,f4=9.034,field2=\"str\",fx=true 100000000000\n" + + "tab,tag=woopsie,tag2=daisy field=127i,f4=3.08891,field2=\"comment\",fx=true 100000000000\n" + + "tab,tag=444,tag2=d555 field=110i,f4=1.4,f5=55,field2=\"comment\",fx=true 100000000000\n" + + "tab,tag=666,tag2=777 field=40i,f4=1.1,field2=\"comment\\ X\",fx=false 100000000000\n"; + + assertThat(expected, lines, "tab", configuration, new DefaultLineUdpReceiverConfiguration() { + @Override + public short getDefaultColumnTypeForFloat() { + return colType; + } + }); + + try (TableReader reader = new TableReader(configuration, "tab")) { + Assert.assertEquals(colType, reader.getMetadata().getColumnType("f5")); + } } @Test @@ -716,10 +777,10 @@ public class LineUdpParserImplTest extends AbstractCairoTest { } } - private void assertThat(String expected, String lines, CharSequence tableName, CairoConfiguration configuration) throws Exception { + private void assertThat(String expected, String lines, CharSequence tableName, CairoConfiguration configuration, LineUdpReceiverConfiguration udpConfiguration) throws Exception { TestUtils.assertMemoryLeak(() -> { try (CairoEngine engine = new CairoEngine(configuration)) { - try (LineUdpParserImpl parser = new LineUdpParserImpl(engine, new DefaultLineUdpReceiverConfiguration())) { + try (LineUdpParserImpl parser = new LineUdpParserImpl(engine, udpConfiguration)) { byte[] bytes = lines.getBytes(Files.UTF_8); int len = bytes.length; long mem = Unsafe.malloc(len, MemoryTag.NATIVE_DEFAULT); @@ -742,6 +803,10 @@ public class LineUdpParserImplTest extends AbstractCairoTest { }); } + private void assertThat(String expected, String lines, CharSequence tableName, CairoConfiguration configuration) throws Exception { + assertThat(expected, lines, tableName, configuration, new DefaultLineUdpReceiverConfiguration()); + } + private void assertThat(String expected, String lines, CharSequence tableName) throws Exception { assertThat(expected, lines, tableName, configuration); } diff --git a/core/src/test/java/io/questdb/cutlass/line/udp/LineUdpParserSupportTest.java b/core/src/test/java/io/questdb/cutlass/line/udp/LineUdpParserSupportTest.java index 8e3bc51a6..fd3c48c04 100644 --- a/core/src/test/java/io/questdb/cutlass/line/udp/LineUdpParserSupportTest.java +++ b/core/src/test/java/io/questdb/cutlass/line/udp/LineUdpParserSupportTest.java @@ -85,11 +85,22 @@ public class LineUdpParserSupportTest extends LineUdpInsertTest { Assert.assertEquals(ColumnType.LONG, LineUdpParserSupport.getValueType("123i")); Assert.assertEquals(ColumnType.LONG, LineUdpParserSupport.getValueType("1i")); + Assert.assertEquals(ColumnType.INT, LineUdpParserSupport.getValueType("123i", ColumnType.DOUBLE, ColumnType.INT)); + Assert.assertEquals(ColumnType.INT, LineUdpParserSupport.getValueType("1i", ColumnType.FLOAT, ColumnType.INT)); + Assert.assertEquals(ColumnType.SHORT, LineUdpParserSupport.getValueType("123i", ColumnType.DOUBLE, ColumnType.SHORT)); + Assert.assertEquals(ColumnType.SHORT, LineUdpParserSupport.getValueType("1i", ColumnType.DOUBLE, ColumnType.SHORT)); + Assert.assertEquals(ColumnType.BYTE, LineUdpParserSupport.getValueType("123i", ColumnType.FLOAT, ColumnType.BYTE)); + Assert.assertEquals(ColumnType.BYTE, LineUdpParserSupport.getValueType("1i", ColumnType.DOUBLE, ColumnType.BYTE)); Assert.assertEquals(ColumnType.DOUBLE, LineUdpParserSupport.getValueType("1.45")); Assert.assertEquals(ColumnType.DOUBLE, LineUdpParserSupport.getValueType("1e-13")); Assert.assertEquals(ColumnType.DOUBLE, LineUdpParserSupport.getValueType("1.0")); Assert.assertEquals(ColumnType.DOUBLE, LineUdpParserSupport.getValueType("1")); + Assert.assertEquals(ColumnType.FLOAT, LineUdpParserSupport.getValueType("1.45", ColumnType.FLOAT, ColumnType.LONG)); + Assert.assertEquals(ColumnType.FLOAT, LineUdpParserSupport.getValueType("1e-13", ColumnType.FLOAT, ColumnType.INT)); + Assert.assertEquals(ColumnType.FLOAT, LineUdpParserSupport.getValueType("1.0", ColumnType.FLOAT, ColumnType.BYTE)); + Assert.assertEquals(ColumnType.FLOAT, LineUdpParserSupport.getValueType("1", ColumnType.FLOAT, ColumnType.LONG)); + Assert.assertEquals(ColumnType.TIMESTAMP, LineUdpParserSupport.getValueType("123t")); Assert.assertEquals(ColumnType.UNDEFINED, LineUdpParserSupport.getValueType("aaa\"")); diff --git a/core/src/test/resources/server.conf b/core/src/test/resources/server.conf index f823556be..a59830dad 100644 --- a/core/src/test/resources/server.conf +++ b/core/src/test/resources/server.conf @@ -185,6 +185,8 @@ line.tcp.default.partition.by=YEAR line.tcp.min.idle.ms.before.writer.release=5000 line.default.partition.by=MONTH +line.float.default.column.type=FLOAT +line.integer.default.column.type=INT pg.binary.param.count.capacity=9 pg.select.cache.enabled=false -- GitLab