未验证 提交 15f9acd6 编写于 作者: G glasstiger 提交者: GitHub

feat(ilp): allow users to change default column type when ILP adds new column automatically (#2040)

上级 7b82ad61
......@@ -372,6 +372,8 @@ public class PropServerConfiguration implements ServerConfiguration {
private boolean stringToCharCastAllowed;
private boolean symbolAsFieldSupported;
private boolean isStringAsTagSupported;
private short floatDefaultColumnType;
private short integerDefaultColumnType;
public PropServerConfiguration(
String root,
......@@ -866,6 +868,18 @@ public class PropServerConfiguration implements ServerConfiguration {
this.stringToCharCastAllowed = getBoolean(properties, env, PropertyKey.LINE_TCP_UNDOCUMENTED_STRING_TO_CHAR_CAST_ALLOWED, false);
this.symbolAsFieldSupported = getBoolean(properties, env, PropertyKey.LINE_TCP_UNDOCUMENTED_SYMBOL_AS_FIELD_SUPPORTED, false);
this.isStringAsTagSupported = getBoolean(properties, env, PropertyKey.LINE_TCP_UNDOCUMENTED_STRING_AS_TAG_SUPPORTED, false);
String floatDefaultColumnTypeName = getString(properties, env, PropertyKey.LINE_FLOAT_DEFAULT_COLUMN_TYPE, ColumnType.nameOf(ColumnType.DOUBLE));
this.floatDefaultColumnType = ColumnType.tagOf(floatDefaultColumnTypeName);
if (floatDefaultColumnType != ColumnType.DOUBLE && floatDefaultColumnType != ColumnType.FLOAT) {
log.info().$("invalid default column type for float ").$(floatDefaultColumnTypeName).$("), will use DOUBLE").$();
this.floatDefaultColumnType = ColumnType.DOUBLE;
}
String integerDefaultColumnTypeName = getString(properties, env, PropertyKey.LINE_INTEGER_DEFAULT_COLUMN_TYPE, ColumnType.nameOf(ColumnType.LONG));
this.integerDefaultColumnType = ColumnType.tagOf(integerDefaultColumnTypeName);
if (integerDefaultColumnType != ColumnType.LONG && integerDefaultColumnType != ColumnType.INT && integerDefaultColumnType != ColumnType.SHORT && integerDefaultColumnType != ColumnType.BYTE) {
log.info().$("invalid default column type for integer ").$(integerDefaultColumnTypeName).$("), will use LONG").$();
this.integerDefaultColumnType = ColumnType.LONG;
}
}
this.sharedWorkerCount = getInt(properties, env, PropertyKey.SHARED_WORKER_COUNT, Math.max(1, cpuAvailable / 2 - 1 - cpuUsed));
......@@ -2314,6 +2328,16 @@ public class PropServerConfiguration implements ServerConfiguration {
public int getDefaultPartitionBy() {
return lineUdpDefaultPartitionBy;
}
@Override
public short getDefaultColumnTypeForFloat() {
return floatDefaultColumnType;
}
@Override
public short getDefaultColumnTypeForInteger() {
return integerDefaultColumnType;
}
}
private class PropLineTcpReceiverIODispatcherConfiguration implements IODispatcherConfiguration {
......@@ -2583,6 +2607,16 @@ public class PropServerConfiguration implements ServerConfiguration {
public boolean isStringAsTagSupported() {
return isStringAsTagSupported;
}
@Override
public short getDefaultColumnTypeForFloat() {
return floatDefaultColumnType;
}
@Override
public short getDefaultColumnTypeForInteger() {
return integerDefaultColumnType;
}
}
private class PropJsonQueryProcessorConfiguration implements JsonQueryProcessorConfiguration {
......
......@@ -278,6 +278,8 @@ public enum PropertyKey {
LINE_TCP_UNDOCUMENTED_STRING_TO_CHAR_CAST_ALLOWED("line.tcp.undocumented.string.to.char.cast.allowed"),
LINE_TCP_UNDOCUMENTED_SYMBOL_AS_FIELD_SUPPORTED("line.tcp.undocumented.symbol.as.field.supported"),
LINE_TCP_UNDOCUMENTED_STRING_AS_TAG_SUPPORTED("line.tcp.undocumented.string.as.tag.supported"),
LINE_FLOAT_DEFAULT_COLUMN_TYPE("line.float.default.column.type"),
LINE_INTEGER_DEFAULT_COLUMN_TYPE("line.integer.default.column.type"),
LINE_TCP_NET_IO_QUEUE_CAPACITY("line.tcp.net.io.queue.capacity"),
LINE_TCP_IO_AGGRESIVE_RECV("line.tcp.io.aggressive.recv"),
METRICS_ENABLED("metrics.enabled"),
......
......@@ -27,15 +27,14 @@ package io.questdb.cutlass.line.tcp;
import io.questdb.cairo.ColumnType;
class DefaultColumnTypes {
static final int[] DEFAULT_COLUMN_TYPES = new int[LineTcpParser.N_ENTITY_TYPES];
final int[] DEFAULT_COLUMN_TYPES = new int[LineTcpParser.N_ENTITY_TYPES];
final int[] MAPPED_COLUMN_TYPES = new int[LineTcpParser.N_MAPPED_ENTITY_TYPES];
static {
DefaultColumnTypes(LineTcpReceiverConfiguration configuration) {
// if not set it defaults to ColumnType.UNDEFINED
DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_TAG] = ColumnType.SYMBOL;
DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_FLOAT] = ColumnType.DOUBLE;
DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_DOUBLE] = ColumnType.DOUBLE;
DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_LONG] = ColumnType.LONG;
DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_INTEGER] = ColumnType.LONG;
DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_FLOAT] = configuration.getDefaultColumnTypeForFloat();
DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_INTEGER] = configuration.getDefaultColumnTypeForInteger();
DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_STRING] = ColumnType.STRING;
DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_SYMBOL] = ColumnType.SYMBOL;
DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_BOOLEAN] = ColumnType.BOOLEAN;
......@@ -45,5 +44,26 @@ class DefaultColumnTypes {
DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_GEOINT] = ColumnType.getGeoHashTypeWithBits(32);
DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_GEOLONG] = ColumnType.getGeoHashTypeWithBits(60);
DEFAULT_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_TIMESTAMP] = ColumnType.TIMESTAMP;
// we could remove this mapping by sending the column type to the writer
// currently we are passing the ILP entity type instead
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_TAG] = ColumnType.SYMBOL;
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_FLOAT] = ColumnType.FLOAT;
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_DOUBLE] = ColumnType.DOUBLE;
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_BYTE] = ColumnType.BYTE;
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_SHORT] = ColumnType.SHORT;
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_INTEGER] = ColumnType.INT;
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_LONG] = ColumnType.LONG;
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_DATE] = ColumnType.DATE;
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_CHAR] = ColumnType.CHAR;
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_STRING] = ColumnType.STRING;
//MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_SYMBOL] = ColumnType.SYMBOL;
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_BOOLEAN] = ColumnType.BOOLEAN;
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_LONG256] = ColumnType.LONG256;
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_GEOBYTE] = ColumnType.getGeoHashTypeWithBits(8);
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_GEOSHORT] = ColumnType.getGeoHashTypeWithBits(16);
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_GEOINT] = ColumnType.getGeoHashTypeWithBits(32);
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_GEOLONG] = ColumnType.getGeoHashTypeWithBits(60);
MAPPED_COLUMN_TYPES[LineTcpParser.ENTITY_TYPE_TIMESTAMP] = ColumnType.TIMESTAMP;
}
}
......@@ -26,6 +26,7 @@ package io.questdb.cutlass.line.tcp;
import io.questdb.WorkerPoolAwareConfiguration;
import io.questdb.cairo.CairoSecurityContext;
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.PartitionBy;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cutlass.line.LineProtoNanoTimestampAdapter;
......@@ -161,4 +162,14 @@ public class DefaultLineTcpReceiverConfiguration implements LineTcpReceiverConfi
public boolean isStringAsTagSupported() {
return false;
}
@Override
public short getDefaultColumnTypeForFloat() {
return ColumnType.DOUBLE;
}
@Override
public short getDefaultColumnTypeForInteger() {
return ColumnType.LONG;
}
}
......@@ -43,6 +43,7 @@ class LineTcpMeasurementEvent implements Closeable {
private final MicrosecondClock clock;
private final LineProtoTimestampAdapter timestampAdapter;
private final LineTcpEventBuffer buffer;
private final DefaultColumnTypes defaultColumnTypes;
private final boolean stringToCharCastAllowed;
private final boolean symbolAsFieldSupported;
private int writerWorkerId;
......@@ -54,12 +55,14 @@ class LineTcpMeasurementEvent implements Closeable {
long bufSize,
MicrosecondClock clock,
LineProtoTimestampAdapter timestampAdapter,
DefaultColumnTypes defaultColumnTypes,
boolean stringToCharCastAllowed,
boolean symbolAsFieldSupported
) {
this.buffer = new LineTcpEventBuffer(bufLo, bufSize);
this.clock = clock;
this.timestampAdapter = timestampAdapter;
this.defaultColumnTypes = defaultColumnTypes;
this.stringToCharCastAllowed = stringToCharCastAllowed;
this.symbolAsFieldSupported = symbolAsFieldSupported;
}
......@@ -120,7 +123,7 @@ class LineTcpMeasurementEvent implements Closeable {
// column is added
row.cancel();
row = null;
final int colType = DefaultColumnTypes.DEFAULT_COLUMN_TYPES[entityType];
final int colType = defaultColumnTypes.MAPPED_COLUMN_TYPES[entityType];
writer.addColumn(columnName, colType);
// Seek to beginning of entities
......@@ -271,7 +274,7 @@ class LineTcpMeasurementEvent implements Closeable {
CharSequence colName = localDetails.getColName();
if (TableUtils.isValidColumnName(colName)) {
offset = buffer.addColumnName(offset, colName);
colType = DefaultColumnTypes.DEFAULT_COLUMN_TYPES[entityType];
colType = defaultColumnTypes.DEFAULT_COLUMN_TYPES[entityType];
} else {
throw invalidColNameError(colName);
}
......
......@@ -77,6 +77,7 @@ class LineTcpMeasurementScheduler implements Closeable {
CairoConfiguration cairoConfiguration = engine.getConfiguration();
this.configuration = lineConfiguration;
MillisecondClock milliClock = cairoConfiguration.getMillisecondClock();
DefaultColumnTypes defaultColumnTypes = new DefaultColumnTypes(lineConfiguration);
int n = ioWorkerPool.getWorkerCount();
this.netIoJobs = new NetworkIOJob[n];
this.tableNameSinks = new StringSink[n];
......@@ -110,8 +111,9 @@ class LineTcpMeasurementScheduler implements Closeable {
addressSize,
lineConfiguration.getMicrosecondClock(),
lineConfiguration.getTimestampAdapter(),
lineConfiguration.isStringToCharCastAllowed(),
lineConfiguration.isSymbolAsFieldSupported()),
defaultColumnTypes,
lineConfiguration.isStringToCharCastAllowed(),
lineConfiguration.isSymbolAsFieldSupported()),
getEventSlotSize(maxMeasurementSize),
queueSize,
MemoryTag.NATIVE_DEFAULT
......@@ -133,7 +135,7 @@ class LineTcpMeasurementScheduler implements Closeable {
writerWorkerPool.assign(i, (Job) lineTcpWriterJob);
writerWorkerPool.assign(i, (Closeable) lineTcpWriterJob);
}
this.tableStructureAdapter = new TableStructureAdapter(cairoConfiguration, configuration.getDefaultPartitionBy());
this.tableStructureAdapter = new TableStructureAdapter(cairoConfiguration, defaultColumnTypes, configuration.getDefaultPartitionBy());
writerIdleTimeout = lineConfiguration.getWriterIdleTimeout();
}
......
......@@ -50,13 +50,14 @@ public class LineTcpParser {
public static final byte ENTITY_TYPE_GEOINT = 11;
public static final byte ENTITY_TYPE_GEOLONG = 12;
public static final byte ENTITY_TYPE_TIMESTAMP = 13;
public static final int N_ENTITY_TYPES = ENTITY_TYPE_TIMESTAMP + 1;
public static final byte ENTITY_TYPE_LONG = 14;
public static final byte ENTITY_TYPE_DOUBLE = 15;
public static final int N_ENTITY_TYPES = ENTITY_TYPE_DOUBLE + 1;
public static final byte ENTITY_TYPE_SHORT = 16;
public static final byte ENTITY_TYPE_BYTE = 17;
public static final byte ENTITY_TYPE_DATE = 18;
public static final byte ENTITY_TYPE_CHAR = 19;
public static final int N_MAPPED_ENTITY_TYPES = ENTITY_TYPE_CHAR + 1;
static final byte ENTITY_TYPE_NONE = (byte) 0xff; // visible for testing
private static final Log LOG = LogFactory.getLog(LineTcpParser.class);
......
......@@ -87,4 +87,8 @@ public interface LineTcpReceiverConfiguration {
boolean isSymbolAsFieldSupported();
boolean isStringAsTagSupported();
short getDefaultColumnTypeForFloat();
short getDefaultColumnTypeForInteger();
}
......@@ -40,12 +40,14 @@ class TableStructureAdapter implements TableStructure {
private final LowerCaseCharSequenceHashSet entityNamesUtf16 = new LowerCaseCharSequenceHashSet();
private final ObjList<LineTcpParser.ProtoEntity> entities = new ObjList<>();
private final CairoConfiguration cairoConfiguration;
private final DefaultColumnTypes defaultColumnTypes;
private final int defaultPartitionBy;
private CharSequence tableName;
private int timestampIndex = -1;
public TableStructureAdapter(CairoConfiguration configuration, int defaultPartitionBy) {
public TableStructureAdapter(CairoConfiguration configuration, DefaultColumnTypes defaultColumnTypes, int defaultPartitionBy) {
this.cairoConfiguration = configuration;
this.defaultColumnTypes = defaultColumnTypes;
this.defaultPartitionBy = defaultPartitionBy;
}
......@@ -73,7 +75,7 @@ class TableStructureAdapter implements TableStructure {
if (columnIndex == getTimestampIndex()) {
return ColumnType.TIMESTAMP;
}
return DefaultColumnTypes.DEFAULT_COLUMN_TYPES[entities.get(columnIndex).getType()];
return defaultColumnTypes.DEFAULT_COLUMN_TYPES[entities.get(columnIndex).getType()];
}
@Override
......
......@@ -25,6 +25,7 @@
package io.questdb.cutlass.line.udp;
import io.questdb.cairo.CairoSecurityContext;
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.CommitMode;
import io.questdb.cairo.PartitionBy;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
......@@ -115,4 +116,14 @@ public class DefaultLineUdpReceiverConfiguration implements LineUdpReceiverConfi
public int getCommitMode() {
return CommitMode.NOSYNC;
}
@Override
public short getDefaultColumnTypeForFloat() {
return ColumnType.DOUBLE;
}
@Override
public short getDefaultColumnTypeForInteger() {
return ColumnType.LONG;
}
}
......@@ -68,6 +68,8 @@ public class LineUdpParserImpl implements LineUdpParser, Closeable {
private final CairoSecurityContext cairoSecurityContext;
private final LineProtoTimestampAdapter timestampAdapter;
private final LineUdpReceiverConfiguration udpConfiguration;
private final short defaultFloatColumnType;
private final short defaultIntegerColumnType;
// state
// cache entry index is always a negative value
private int cacheEntryIndex = 0;
......@@ -99,6 +101,9 @@ public class LineUdpParserImpl implements LineUdpParser, Closeable {
this.udpConfiguration = udpConfiguration;
this.cairoSecurityContext = udpConfiguration.getCairoSecurityContext();
this.timestampAdapter = udpConfiguration.getTimestampAdapter();
defaultFloatColumnType = udpConfiguration.getDefaultColumnTypeForFloat();
defaultIntegerColumnType = udpConfiguration.getDefaultColumnTypeForInteger();
}
@Override
......@@ -324,7 +329,7 @@ public class LineUdpParserImpl implements LineUdpParser, Closeable {
}
private void parseFieldValue(CachedCharSequence value, CharSequenceCache cache) {
int valueType = LineUdpParserSupport.getValueType(value);
int valueType = LineUdpParserSupport.getValueType(value, defaultFloatColumnType, defaultIntegerColumnType);
if (valueType == ColumnType.UNDEFINED) {
switchModeToSkipLine();
} else {
......@@ -334,7 +339,7 @@ public class LineUdpParserImpl implements LineUdpParser, Closeable {
@SuppressWarnings("unused")
private void parseFieldValueNewTable(CachedCharSequence value, CharSequenceCache cache) {
int valueType = LineUdpParserSupport.getValueType(value);
int valueType = LineUdpParserSupport.getValueType(value, defaultFloatColumnType, defaultIntegerColumnType);
if (valueType == ColumnType.UNDEFINED || valueType == ColumnType.NULL) { // cannot create a col of type null
switchModeToSkipLine();
} else {
......@@ -368,6 +373,18 @@ public class LineUdpParserImpl implements LineUdpParser, Closeable {
|| columnTypeTag == ColumnType.TIMESTAMP
|| columnTypeTag == ColumnType.DATE;
break;
case ColumnType.INT:
valid = columnTypeTag == ColumnType.INT
|| columnTypeTag == ColumnType.SHORT
|| columnTypeTag == ColumnType.BYTE;
break;
case ColumnType.SHORT:
valid = columnTypeTag == ColumnType.SHORT
|| columnTypeTag == ColumnType.BYTE;
break;
case ColumnType.BYTE:
valid = columnTypeTag == ColumnType.BYTE;
break;
case ColumnType.BOOLEAN:
valid = columnTypeTag == ColumnType.BOOLEAN;
break;
......@@ -380,6 +397,9 @@ public class LineUdpParserImpl implements LineUdpParser, Closeable {
case ColumnType.DOUBLE:
valid = columnTypeTag == ColumnType.DOUBLE || columnTypeTag == ColumnType.FLOAT;
break;
case ColumnType.FLOAT:
valid = columnTypeTag == ColumnType.FLOAT;
break;
case ColumnType.SYMBOL:
valid = columnTypeTag == ColumnType.SYMBOL;
break;
......
......@@ -179,6 +179,10 @@ public class LineUdpParserSupport {
}
public static int getValueType(CharSequence value) {
return getValueType(value, ColumnType.DOUBLE, ColumnType.LONG);
}
public static int getValueType(CharSequence value, short defaultFloatColumnType, short defaultIntegerColumnType) {
// method called for inbound ilp messages on each value.
// returning UNDEFINED makes the whole line be skipped.
// 0 len values, return null type.
......@@ -193,7 +197,7 @@ public class LineUdpParserSupport {
if (valueLen > 3 && value.charAt(0) == '0' && value.charAt(1) == 'x') {
return ColumnType.LONG256;
}
return valueLen == 1 ? ColumnType.SYMBOL : ColumnType.LONG;
return valueLen == 1 ? ColumnType.SYMBOL : defaultIntegerColumnType;
case 't':
if (valueLen > 1 && ((first >= '0' && first <= '9') || first == '-')) {
return ColumnType.TIMESTAMP;
......@@ -223,10 +227,10 @@ public class LineUdpParserSupport {
return ColumnType.STRING;
default:
if (last >= '0' && last <= '9' && ((first >= '0' && first <= '9') || first == '-' || first == '.')) {
return ColumnType.DOUBLE;
return defaultFloatColumnType;
}
if (SqlKeywords.isNanKeyword(value)) {
return ColumnType.DOUBLE;
return defaultFloatColumnType;
}
if (value.charAt(0) == '"') {
return ColumnType.UNDEFINED;
......
......@@ -61,4 +61,8 @@ public interface LineUdpReceiverConfiguration {
LineProtoTimestampAdapter getTimestampAdapter();
int getDefaultPartitionBy();
short getDefaultColumnTypeForFloat();
short getDefaultColumnTypeForInteger();
}
......@@ -25,6 +25,7 @@
package io.questdb;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.CommitMode;
import io.questdb.cairo.PartitionBy;
import io.questdb.cairo.SqlJitMode;
......@@ -284,6 +285,8 @@ public class PropServerConfigurationTest {
Assert.assertEquals(500_000, configuration.getCairoConfiguration().getWriterAsyncCommandBusyWaitTimeout());
Assert.assertEquals(30_000_000, configuration.getCairoConfiguration().getWriterAsyncCommandMaxTimeout());
Assert.assertEquals(1023, configuration.getCairoConfiguration().getWriterTickRowsCountMod());
Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat());
Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger());
Assert.assertTrue(configuration.getHttpServerConfiguration().getHttpContextConfiguration().getServerKeepAlive());
Assert.assertEquals("HTTP/1.1 ", configuration.getHttpServerConfiguration().getHttpContextConfiguration().getHttpVersion());
......@@ -713,6 +716,8 @@ public class PropServerConfigurationTest {
Assert.assertEquals(PartitionBy.MONTH, configuration.getLineTcpReceiverConfiguration().getDefaultPartitionBy());
Assert.assertEquals(5_000, configuration.getLineTcpReceiverConfiguration().getWriterIdleTimeout());
Assert.assertEquals(16, configuration.getCairoConfiguration().getPartitionPurgeListCapacity());
Assert.assertEquals(ColumnType.FLOAT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat());
Assert.assertEquals(ColumnType.INT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger());
Assert.assertTrue(configuration.getCairoConfiguration().getTelemetryConfiguration().getEnabled());
Assert.assertEquals(512, configuration.getCairoConfiguration().getTelemetryConfiguration().getQueueCapacity());
......@@ -824,4 +829,112 @@ public class PropServerConfigurationTest {
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(SqlJitMode.JIT_MODE_DISABLED, configuration.getCairoConfiguration().getSqlJitMode());
}
@Test
public void testDefaultAddColumnTypeForFloat() throws ServerConfigurationException, JsonException {
Properties properties = new Properties();
// default
PropServerConfiguration configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat());
// empty
properties.setProperty("line.float.default.column.type", "");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat());
// double
properties.setProperty("line.float.default.column.type", "DOUBLE");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat());
// float
properties.setProperty("line.float.default.column.type", "FLOAT");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.FLOAT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat());
// lowercase
properties.setProperty("line.float.default.column.type", "double");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat());
// camel case
properties.setProperty("line.float.default.column.type", "Float");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.FLOAT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat());
// not allowed
properties.setProperty("line.float.default.column.type", "STRING");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat());
// not allowed
properties.setProperty("line.float.default.column.type", "SHORT");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat());
// non existent type
properties.setProperty("line.float.default.column.type", "FLAT");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.DOUBLE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForFloat());
}
@Test
public void testDefaultAddColumnTypeForInteger() throws ServerConfigurationException, JsonException {
Properties properties = new Properties();
// default
PropServerConfiguration configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger());
// empty
properties.setProperty("line.integer.default.column.type", "");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger());
// long
properties.setProperty("line.integer.default.column.type", "LONG");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger());
// int
properties.setProperty("line.integer.default.column.type", "INT");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.INT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger());
// short
properties.setProperty("line.integer.default.column.type", "SHORT");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.SHORT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger());
// byte
properties.setProperty("line.integer.default.column.type", "BYTE");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.BYTE, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger());
// lowercase
properties.setProperty("line.integer.default.column.type", "int");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.INT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger());
// camel case
properties.setProperty("line.integer.default.column.type", "Short");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.SHORT, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger());
// not allowed
properties.setProperty("line.integer.default.column.type", "SYMBOL");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger());
// not allowed
properties.setProperty("line.integer.default.column.type", "FLOAT");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger());
// non existent type
properties.setProperty("line.integer.default.column.type", "BITE");
configuration = new PropServerConfiguration(root, properties, null, LOG, new BuildInformationHolder());
Assert.assertEquals(ColumnType.LONG, configuration.getLineTcpReceiverConfiguration().getDefaultColumnTypeForInteger());
}
}
......@@ -25,6 +25,7 @@
package io.questdb.cutlass.line.tcp;
import io.questdb.cairo.AbstractCairoTest;
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.TableReader;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
......@@ -95,6 +96,8 @@ abstract class BaseLineTcpContextTest extends AbstractCairoTest {
protected boolean disconnectOnError;
protected boolean stringToCharCastAllowed;
protected boolean symbolAsFieldSupported;
protected short floatDefaultColumnType;
protected short integerDefaultColumnType;
@Before
public void before() {
......@@ -104,6 +107,8 @@ abstract class BaseLineTcpContextTest extends AbstractCairoTest {
disconnected = true;
netMsgBufferSize.set(512);
disconnectOnError = false;
floatDefaultColumnType = ColumnType.DOUBLE;
integerDefaultColumnType = ColumnType.LONG;
lineTcpConfiguration = createNoAuthReceiverConfiguration(provideLineTcpNetworkFacade());
}
......@@ -193,6 +198,16 @@ abstract class BaseLineTcpContextTest extends AbstractCairoTest {
return symbolAsFieldSupported;
}
@Override
public short getDefaultColumnTypeForFloat() {
return floatDefaultColumnType;
}
@Override
public short getDefaultColumnTypeForInteger() {
return integerDefaultColumnType;
}
@Override
public MicrosecondClock getMicrosecondClock() {
return new MicrosecondClockImpl() {
......
......@@ -25,6 +25,7 @@
package io.questdb.cutlass.line.tcp;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.ColumnType;
import io.questdb.std.Files;
import io.questdb.std.Unsafe;
import io.questdb.test.tools.TestUtils;
......@@ -59,6 +60,8 @@ public class LineTcpAuthConnectionContextTest extends BaseLineTcpContextTest {
disconnected = true;
netMsgBufferSize.set(1024);
maxSendBytes = 1024;
floatDefaultColumnType = ColumnType.DOUBLE;
integerDefaultColumnType = ColumnType.LONG;
lineTcpConfiguration = createReceiverConfiguration(true, new LineTcpNetworkFacade() {
@Override
public int send(long fd, long buffer, int bufferLen) {
......
......@@ -1080,6 +1080,64 @@ public class LineTcpConnectionContextTest extends BaseLineTcpContextTest {
});
}
@Test
public void testAddIntegerColumnAsByte() throws Exception {
integerDefaultColumnType = ColumnType.BYTE;
testDefaultColumnType(ColumnType.BYTE, "21i", "21", "0");
}
@Test
public void testAddIntegerColumnAsShort() throws Exception {
integerDefaultColumnType = ColumnType.SHORT;
testDefaultColumnType(ColumnType.SHORT, "21i", "21", "0");
}
@Test
public void testAddIntegerColumnAsInt() throws Exception {
integerDefaultColumnType = ColumnType.INT;
testDefaultColumnType(ColumnType.INT, "21i", "21", "NaN");
}
@Test
public void testAddIntegerColumnAsLong() throws Exception {
integerDefaultColumnType = ColumnType.LONG;
testDefaultColumnType(ColumnType.LONG, "21i", "21", "NaN");
}
@Test
public void testAddFloatColumnAsDouble() throws Exception {
floatDefaultColumnType = ColumnType.DOUBLE;
testDefaultColumnType(ColumnType.DOUBLE, "24.3", "24.3", "NaN");
}
@Test
public void testAddFloatColumnAsFloat() throws Exception {
floatDefaultColumnType = ColumnType.FLOAT;
testDefaultColumnType(ColumnType.FLOAT, "24.3", "24.3000", "NaN");
}
private void testDefaultColumnType(short expectedType, String ilpValue, String tableValue, String emptyValue) throws Exception {
String table = "addDefColType";
addTable(table);
runInContext(() -> {
recvBuffer =
table + ",location=us-midwest temperature=82 1465839830100400200\n" +
table + ",location=us-eastcoast temperature=81,newcol=" + ilpValue + " 1465839830101400200\n";
do {
handleContextIO();
Assert.assertFalse(disconnected);
} while (recvBuffer.length() > 0);
closeContext();
String expected = "location\ttemperature\ttimestamp\tnewcol\n" +
"us-midwest\t82.0\t2016-06-13T17:43:50.100400Z\t" + emptyValue + "\n" +
"us-eastcoast\t81.0\t2016-06-13T17:43:50.101400Z\t" + tableValue + "\n";
try (TableReader reader = new TableReader(configuration, table)) {
assertCursorTwoPass(expected, reader.getCursor(), reader.getMetadata());
Assert.assertEquals(expectedType, ColumnType.tagOf(reader.getMetadata().getColumnType("newcol")));
}
});
}
@Test
public void testDuplicateNewFieldAlternating() throws Exception {
String table = "dupField";
......
......@@ -25,8 +25,6 @@
package io.questdb.cutlass.line.udp;
import io.questdb.cairo.*;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cutlass.line.LineProtoNanoTimestampAdapter;
import io.questdb.std.*;
import io.questdb.std.datetime.microtime.MicrosecondClock;
import io.questdb.std.datetime.microtime.TimestampFormatUtils;
......@@ -40,20 +38,83 @@ import org.junit.Test;
public class LineUdpParserImplTest extends AbstractCairoTest {
@Test
public void testAddColumn() throws Exception {
public void testAddColumnDefaultLong() throws Exception {
testAddColumnInteger(ColumnType.LONG, "NaN");
}
@Test
public void testAddColumnDefaultInteger() throws Exception {
testAddColumnInteger(ColumnType.INT, "NaN");
}
@Test
public void testAddColumnDefaultShort() throws Exception {
testAddColumnInteger(ColumnType.SHORT, "0");
}
@Test
public void testAddColumnDefaultByte() throws Exception {
testAddColumnInteger(ColumnType.BYTE, "0");
}
private void testAddColumnInteger(short colType, String nullValue) throws Exception {
final String expected = "tag\ttag2\tfield\tf4\tfield2\tfx\ttimestamp\tf5\n" +
"abc\txyz\t10000\t9.034\tstr\ttrue\t1970-01-01T00:01:40.000000Z\tNaN\n" +
"woopsie\tdaisy\t2000\t3.0889100000000003\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\tNaN\n" +
"444\td555\t510\t1.4000000000000001\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\t55\n" +
"666\t777\t410\t1.1\tcomment\\ X\tfalse\t1970-01-01T00:01:40.000000Z\tNaN\n";
"abc\txyz\t100\t9.034\tstr\ttrue\t1970-01-01T00:01:40.000000Z\t" + nullValue + "\n" +
"woopsie\tdaisy\t127\t3.0889100000000003\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\t" + nullValue + "\n" +
"444\td555\t110\t1.4000000000000001\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\t55\n" +
"666\t777\t40\t1.1\tcomment\\ X\tfalse\t1970-01-01T00:01:40.000000Z\t" + nullValue + "\n";
final String lines = "tab,tag=abc,tag2=xyz field=10000i,f4=9.034,field2=\"str\",fx=true 100000000000\n" +
"tab,tag=woopsie,tag2=daisy field=2000i,f4=3.08891,field2=\"comment\",fx=true 100000000000\n" +
"tab,tag=444,tag2=d555 field=510i,f4=1.4,f5=55i,field2=\"comment\",fx=true 100000000000\n" +
"tab,tag=666,tag2=777 field=410i,f4=1.1,field2=\"comment\\ X\",fx=false 100000000000\n";
final String lines = "tab,tag=abc,tag2=xyz field=100i,f4=9.034,field2=\"str\",fx=true 100000000000\n" +
"tab,tag=woopsie,tag2=daisy field=127i,f4=3.08891,field2=\"comment\",fx=true 100000000000\n" +
"tab,tag=444,tag2=d555 field=110i,f4=1.4,f5=55i,field2=\"comment\",fx=true 100000000000\n" +
"tab,tag=666,tag2=777 field=40i,f4=1.1,field2=\"comment\\ X\",fx=false 100000000000\n";
assertThat(expected, lines, "tab", configuration, new DefaultLineUdpReceiverConfiguration() {
@Override
public short getDefaultColumnTypeForInteger() {
return colType;
}
});
assertThat(expected, lines, "tab");
try (TableReader reader = new TableReader(configuration, "tab")) {
Assert.assertEquals(colType, reader.getMetadata().getColumnType("f5"));
}
}
@Test
public void testAddColumnDefaultFloat() throws Exception {
testAddColumnFloat(ColumnType.FLOAT, "tag\ttag2\tfield\tf4\tfield2\tfx\ttimestamp\tf5\n" +
"abc\txyz\t100\t9.0340\tstr\ttrue\t1970-01-01T00:01:40.000000Z\tNaN\n" +
"woopsie\tdaisy\t127\t3.0889\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\tNaN\n" +
"444\td555\t110\t1.4000\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\t55.0000\n" +
"666\t777\t40\t1.1000\tcomment\\ X\tfalse\t1970-01-01T00:01:40.000000Z\tNaN\n");
}
@Test
public void testAddColumnDefaultDouble() throws Exception {
testAddColumnFloat(ColumnType.DOUBLE, "tag\ttag2\tfield\tf4\tfield2\tfx\ttimestamp\tf5\n" +
"abc\txyz\t100\t9.034\tstr\ttrue\t1970-01-01T00:01:40.000000Z\tNaN\n" +
"woopsie\tdaisy\t127\t3.0889100000000003\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\tNaN\n" +
"444\td555\t110\t1.4000000000000001\tcomment\ttrue\t1970-01-01T00:01:40.000000Z\t55.0\n" +
"666\t777\t40\t1.1\tcomment\\ X\tfalse\t1970-01-01T00:01:40.000000Z\tNaN\n");
}
private void testAddColumnFloat(short colType, String expected) throws Exception {
final String lines = "tab,tag=abc,tag2=xyz field=100i,f4=9.034,field2=\"str\",fx=true 100000000000\n" +
"tab,tag=woopsie,tag2=daisy field=127i,f4=3.08891,field2=\"comment\",fx=true 100000000000\n" +
"tab,tag=444,tag2=d555 field=110i,f4=1.4,f5=55,field2=\"comment\",fx=true 100000000000\n" +
"tab,tag=666,tag2=777 field=40i,f4=1.1,field2=\"comment\\ X\",fx=false 100000000000\n";
assertThat(expected, lines, "tab", configuration, new DefaultLineUdpReceiverConfiguration() {
@Override
public short getDefaultColumnTypeForFloat() {
return colType;
}
});
try (TableReader reader = new TableReader(configuration, "tab")) {
Assert.assertEquals(colType, reader.getMetadata().getColumnType("f5"));
}
}
@Test
......@@ -716,10 +777,10 @@ public class LineUdpParserImplTest extends AbstractCairoTest {
}
}
private void assertThat(String expected, String lines, CharSequence tableName, CairoConfiguration configuration) throws Exception {
private void assertThat(String expected, String lines, CharSequence tableName, CairoConfiguration configuration, LineUdpReceiverConfiguration udpConfiguration) throws Exception {
TestUtils.assertMemoryLeak(() -> {
try (CairoEngine engine = new CairoEngine(configuration)) {
try (LineUdpParserImpl parser = new LineUdpParserImpl(engine, new DefaultLineUdpReceiverConfiguration())) {
try (LineUdpParserImpl parser = new LineUdpParserImpl(engine, udpConfiguration)) {
byte[] bytes = lines.getBytes(Files.UTF_8);
int len = bytes.length;
long mem = Unsafe.malloc(len, MemoryTag.NATIVE_DEFAULT);
......@@ -742,6 +803,10 @@ public class LineUdpParserImplTest extends AbstractCairoTest {
});
}
private void assertThat(String expected, String lines, CharSequence tableName, CairoConfiguration configuration) throws Exception {
assertThat(expected, lines, tableName, configuration, new DefaultLineUdpReceiverConfiguration());
}
private void assertThat(String expected, String lines, CharSequence tableName) throws Exception {
assertThat(expected, lines, tableName, configuration);
}
......
......@@ -85,11 +85,22 @@ public class LineUdpParserSupportTest extends LineUdpInsertTest {
Assert.assertEquals(ColumnType.LONG, LineUdpParserSupport.getValueType("123i"));
Assert.assertEquals(ColumnType.LONG, LineUdpParserSupport.getValueType("1i"));
Assert.assertEquals(ColumnType.INT, LineUdpParserSupport.getValueType("123i", ColumnType.DOUBLE, ColumnType.INT));
Assert.assertEquals(ColumnType.INT, LineUdpParserSupport.getValueType("1i", ColumnType.FLOAT, ColumnType.INT));
Assert.assertEquals(ColumnType.SHORT, LineUdpParserSupport.getValueType("123i", ColumnType.DOUBLE, ColumnType.SHORT));
Assert.assertEquals(ColumnType.SHORT, LineUdpParserSupport.getValueType("1i", ColumnType.DOUBLE, ColumnType.SHORT));
Assert.assertEquals(ColumnType.BYTE, LineUdpParserSupport.getValueType("123i", ColumnType.FLOAT, ColumnType.BYTE));
Assert.assertEquals(ColumnType.BYTE, LineUdpParserSupport.getValueType("1i", ColumnType.DOUBLE, ColumnType.BYTE));
Assert.assertEquals(ColumnType.DOUBLE, LineUdpParserSupport.getValueType("1.45"));
Assert.assertEquals(ColumnType.DOUBLE, LineUdpParserSupport.getValueType("1e-13"));
Assert.assertEquals(ColumnType.DOUBLE, LineUdpParserSupport.getValueType("1.0"));
Assert.assertEquals(ColumnType.DOUBLE, LineUdpParserSupport.getValueType("1"));
Assert.assertEquals(ColumnType.FLOAT, LineUdpParserSupport.getValueType("1.45", ColumnType.FLOAT, ColumnType.LONG));
Assert.assertEquals(ColumnType.FLOAT, LineUdpParserSupport.getValueType("1e-13", ColumnType.FLOAT, ColumnType.INT));
Assert.assertEquals(ColumnType.FLOAT, LineUdpParserSupport.getValueType("1.0", ColumnType.FLOAT, ColumnType.BYTE));
Assert.assertEquals(ColumnType.FLOAT, LineUdpParserSupport.getValueType("1", ColumnType.FLOAT, ColumnType.LONG));
Assert.assertEquals(ColumnType.TIMESTAMP, LineUdpParserSupport.getValueType("123t"));
Assert.assertEquals(ColumnType.UNDEFINED, LineUdpParserSupport.getValueType("aaa\""));
......
......@@ -185,6 +185,8 @@ line.tcp.default.partition.by=YEAR
line.tcp.min.idle.ms.before.writer.release=5000
line.default.partition.by=MONTH
line.float.default.column.type=FLOAT
line.integer.default.column.type=INT
pg.binary.param.count.capacity=9
pg.select.cache.enabled=false
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册