未验证 提交 db08f1e2 编写于 作者: P Patrick Mackinlay 提交者: GitHub

chore(ilp): Handle trying to convert ILP strings into non STRING colu… (#1029)

上级 582af89c
......@@ -24,8 +24,24 @@
package io.questdb.cutlass.line.tcp;
import java.io.Closeable;
import java.util.Arrays;
import java.util.concurrent.locks.ReadWriteLock;
import org.jetbrains.annotations.NotNull;
import io.questdb.Telemetry;
import io.questdb.cairo.*;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.CairoSecurityContext;
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.EntryUnavailableException;
import io.questdb.cairo.TableReader;
import io.questdb.cairo.TableReaderMetadata;
import io.questdb.cairo.TableStructure;
import io.questdb.cairo.TableUtils;
import io.questdb.cairo.TableWriter;
import io.questdb.cairo.TableWriter.Row;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cairo.sql.SymbolTable;
......@@ -34,22 +50,30 @@ import io.questdb.cutlass.line.LineProtoTimestampAdapter;
import io.questdb.cutlass.line.tcp.NewLineProtoParser.ProtoEntity;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.*;
import io.questdb.mp.FanOut;
import io.questdb.mp.Job;
import io.questdb.mp.MPSequence;
import io.questdb.mp.RingQueue;
import io.questdb.mp.SCSequence;
import io.questdb.mp.Sequence;
import io.questdb.mp.WorkerPool;
import io.questdb.network.IODispatcher;
import io.questdb.network.IOOperation;
import io.questdb.network.IORequestProcessor;
import io.questdb.std.*;
import io.questdb.std.CharSequenceIntHashMap;
import io.questdb.std.CharSequenceObjHashMap;
import io.questdb.std.Chars;
import io.questdb.std.Misc;
import io.questdb.std.ObjIntHashMap;
import io.questdb.std.ObjList;
import io.questdb.std.Unsafe;
import io.questdb.std.Vect;
import io.questdb.std.datetime.microtime.MicrosecondClock;
import io.questdb.std.datetime.millitime.MillisecondClock;
import io.questdb.std.str.DirectCharSink;
import io.questdb.std.str.FloatingDirectCharSink;
import io.questdb.std.str.Path;
import io.questdb.tasks.TelemetryTask;
import org.jetbrains.annotations.NotNull;
import java.io.Closeable;
import java.util.Arrays;
import java.util.concurrent.locks.ReadWriteLock;
class LineTcpMeasurementScheduler implements Closeable {
private static final Log LOG = LogFactory.getLog(LineTcpMeasurementScheduler.class);
......@@ -695,8 +719,14 @@ class LineTcpMeasurementScheduler implements Closeable {
bufPos += Integer.BYTES;
long hi = bufPos + 2L * len;
job.floatingCharSink.asCharSequence(bufPos, hi);
row.putStr(colIndex, job.floatingCharSink);
bufPos = hi;
final int colType = writer.getMetadata().getColumnType(colIndex);
if (colType == ColumnType.STRING) {
row.putStr(colIndex, job.floatingCharSink);
} else {
throw CairoException.instance(0).put("line protocol STRING cannot be inserted into column type ").put(ColumnType.nameOf(colType)).put(") [entityType=")
.put(entityType).put(']');
}
break;
}
......
......@@ -566,6 +566,30 @@ public class LineTcpConnectionContextTest extends AbstractCairoTest {
});
}
@Test
public void testColumnConversion2() throws Exception {
runInContext(() -> {
try (
@SuppressWarnings("resource")
TableModel model = new TableModel(configuration, "t_ilp21",
PartitionBy.NONE).col("l", ColumnType.LONG)) {
CairoTestUtils.create(model);
}
microSecondTicks = 1465839830102800L;
recvBuffer = "t_ilp21 l=843530699759026177i\n" +
"t_ilp21 l=\"843530699759026178\"\n" +
"t_ilp21 l=843530699759026179i\n";
handleContextIO();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
String expected = "l\n" +
"843530699759026177\n" +
"843530699759026179\n";
assertTable(expected, "t_ilp21");
});
}
@Test
public void testColumnNameWithSlash1() throws Exception {
runInContext(() -> {
......
......@@ -49,6 +49,7 @@ import org.junit.Test;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.PrivateKey;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class LineTcpServerTest extends AbstractCairoTest {
......@@ -155,22 +156,22 @@ public class LineTcpServerTest extends AbstractCairoTest {
@Test
public void testGoodAuthenticated() throws Exception {
test(AUTH_KEY_ID1, AUTH_PRIVATE_KEY1, 768, 1_000);
test(AUTH_KEY_ID1, AUTH_PRIVATE_KEY1, 768, 1_000, false);
}
@Test(expected = NetworkError.class)
public void testInvalidSignature() throws Exception {
test(AUTH_KEY_ID1, AUTH_PRIVATE_KEY2, 768, 100);
test(AUTH_KEY_ID1, AUTH_PRIVATE_KEY2, 768, 6_000, true);
}
@Test(expected = NetworkError.class)
public void testInvalidUser() throws Exception {
test(AUTH_KEY_ID2, AUTH_PRIVATE_KEY2, 768, 100);
test(AUTH_KEY_ID2, AUTH_PRIVATE_KEY2, 768, 6_000, true);
}
@Test
public void testUnauthenticated() throws Exception {
test(null, null, 200, 1_000);
test(null, null, 200, 1_000, false);
}
@Test
......@@ -397,7 +398,8 @@ public class LineTcpServerTest extends AbstractCairoTest {
String authKeyId,
PrivateKey authPrivateKey,
int msgBufferSize,
final int nRows
final int nRows,
boolean expectDisconnect
) throws Exception {
this.authKeyId = authKeyId;
this.msgBufferSize = msgBufferSize;
......@@ -472,6 +474,10 @@ public class LineTcpServerTest extends AbstractCairoTest {
sb.append('\n');
sender.$(ts * 1000);
sender.flush();
if (expectDisconnect) {
// To prevent all data being buffered before the expected disconnect slow sending
Thread.sleep(100);
}
ts += rand.nextInt(1000);
}
......@@ -480,7 +486,11 @@ public class LineTcpServerTest extends AbstractCairoTest {
sender.close();
}
tablesCreated.await();
Assert.assertFalse(expectDisconnect);
boolean ready = tablesCreated.await(TimeUnit.MINUTES.toNanos(1));
if (!ready) {
throw new IllegalStateException("Timeout waiting for tables to be created");
}
int nRowsWritten;
do {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册