未验证 提交 871477b5 编写于 作者: A Alex Pelagenko 提交者: GitHub

fix(ilp): ILP client to escaped quotes in strings fields (#1433)

上级 af2574dd
......@@ -1516,6 +1516,8 @@ public class TableWriter implements Closeable {
masterRef--;
o3MasterRef = -1;
rowFunction = switchPartitionFunction;
row.activeColumns = columns;
row.activeNullSetters = nullSetters;
}
return;
}
......
......@@ -110,12 +110,13 @@ public final class TxWriter extends TxReader implements Closeable {
}
public void cancelRow() {
if (transientRowCount == 0 && txPartitionCount > 1) {
if (transientRowCount == 1 && txPartitionCount > 1) {
// we have to undo creation of partition
txPartitionCount--;
fixedRowCount -= prevTransientRowCount;
transientRowCount = prevTransientRowCount;
transientRowCount = prevTransientRowCount + 1; // When row cancel finishes 1 is subtracted. Add 1 to compensate.
attachedPartitions.setPos(attachedPartitions.size() - LONGS_PER_TX_ATTACHED_PARTITION);
prevTransientRowCount = txMem.getLong(TX_OFFSET_TRANSIENT_ROW_COUNT);
}
maxTimestamp = prevMaxTimestamp;
......
......@@ -48,6 +48,7 @@ public class LineUdpSender extends AbstractCharSink implements Closeable {
private final long sockaddr;
protected final long fd;
protected final NetworkFacade nf;
private boolean quoted = false;
private long lo;
private long hi;
......@@ -138,7 +139,11 @@ public class LineUdpSender extends AbstractCharSink implements Closeable {
}
public LineUdpSender field(CharSequence name, CharSequence value) {
field(name).putQuoted(value);
field(name).put('"');
quoted = true;
encodeUtf8(value);
quoted = false;
put('"');
return this;
}
......@@ -195,6 +200,7 @@ public class LineUdpSender extends AbstractCharSink implements Closeable {
if (hasMetric) {
throw CairoException.instance(0).put("duplicate metric");
}
quoted = false;
hasMetric = true;
return put(metric);
}
......@@ -236,12 +242,21 @@ public class LineUdpSender extends AbstractCharSink implements Closeable {
case ' ':
case ',':
case '=':
case '"':
case '\\':
put('\\');
if (!quoted) {
put('\\');
}
default:
put(c);
break;
case '"':
if (quoted) {
put('\\');
}
put('\"');
break;
case '\\':
put('\\').put('\\');
break;
}
}
......
......@@ -143,10 +143,6 @@ public class LineTcpParser implements Closeable {
// take the byte
byte b = Unsafe.getUnsafe().getByte(bufAt);
hasNonAscii |= b < 0;
if (!hasNonAscii)
{
int i = 0;
}
boolean endOfLine = false;
boolean appendByte = false;
switch (b) {
......@@ -340,6 +336,12 @@ public class LineTcpParser implements Closeable {
if (endOfEntityByte == (byte) '\n') {
return true;
}
} else if (tagsComplete && (endOfEntityByte == '\n' || endOfEntityByte == '\r')) {
if (currentEntity != null && currentEntity.getType() == ENTITY_TYPE_TAG) {
// One token after last tag, and no fields
// This must be timestamp
return expectTimestamp(endOfEntityByte, bufHi);
}
}
if (tagsComplete) {
......
......@@ -30,6 +30,7 @@ import io.questdb.cairo.vm.Vm;
import io.questdb.cairo.vm.api.MemoryARW;
import io.questdb.cairo.vm.api.MemoryCMARW;
import io.questdb.cairo.vm.api.MemoryMAR;
import io.questdb.griffin.model.IntervalUtils;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.*;
......@@ -855,6 +856,38 @@ public class TableWriterTest extends AbstractCairoTest {
});
}
@Test
public void testCancelSecondRowNonPartitioned() throws Exception {
TestUtils.assertMemoryLeak(() -> {
int N = 10000;
create(FF, PartitionBy.NONE, N);
try (TableWriter writer = new TableWriter(configuration, PRODUCT)) {
long ts = TimestampFormatUtils.parseTimestamp("2013-03-04T00:00:00.000Z");
TableWriter.Row r = writer.newRow(ts);
r.putInt(0, 1234);
r.append();
r = writer.newRow(ts);
r.putInt(0, 1234);
r.cancel();
Assert.assertEquals(1, writer.size());
populateProducts(writer, new Rnd(), ts, N, 60 * 60000 * 1000L);
Assert.assertEquals(N + 1, writer.size());
writer.commit();
}
try (TableWriter writer = new TableWriter(configuration, PRODUCT)) {
Assert.assertEquals(N + 1, writer.size());
}
});
}
@Test
public void testCancelFirstRowNonPartitioned() throws Exception {
TestUtils.assertMemoryLeak(() -> {
......@@ -919,6 +952,36 @@ public class TableWriterTest extends AbstractCairoTest {
});
}
@Test
public void testCancelFirstRowSecondPartition() throws Exception {
TestUtils.assertMemoryLeak(() -> {
create(FF, PartitionBy.DAY, 4);
try (TableWriter writer = new TableWriter(configuration, PRODUCT)) {
writer.newRow(TimestampFormatUtils.parseTimestamp("2013-03-01T00:00:00.000Z")).append();
writer.newRow(TimestampFormatUtils.parseTimestamp("2013-03-01T00:00:00.000Z")).append();
TableWriter.Row r = writer.newRow(TimestampFormatUtils.parseTimestamp("2013-03-04T00:00:00.000Z"));
r.cancel();
writer.commit();
Assert.assertEquals(2, writer.size());
writer.newRow(TimestampFormatUtils.parseTimestamp("2013-03-01T00:00:00.000Z")).append();
writer.newRow(TimestampFormatUtils.parseTimestamp("2013-03-01T00:00:00.000Z")).append();
r = writer.newRow(TimestampFormatUtils.parseTimestamp("2013-03-05T00:00:00.000Z"));
r.cancel();
r.cancel();
Assert.assertEquals(4, writer.size());
}
// Last 2 rows are not committed, expect size to revert to 2
try (TableWriter writer = new TableWriter(configuration, PRODUCT)) {
Assert.assertEquals(2, writer.size());
}
});
}
@Test
public void testCancelFirstRowPartitioned2() throws Exception {
TestUtils.assertMemoryLeak(() -> {
......@@ -1909,6 +1972,54 @@ public class TableWriterTest extends AbstractCairoTest {
});
}
@Test
public void testO3AferRowCancel() throws Exception {
TestUtils.assertMemoryLeak(() -> {
try (TableModel model = new TableModel(configuration, "weather", PartitionBy.DAY)
.col("windspeed", ColumnType.DOUBLE)
.timestamp()) {
CairoTestUtils.create(model);
}
try (TableWriter writer = new TableWriter(configuration, "weather")) {
TableWriter.Row r;
r = writer.newRow(IntervalUtils.parseFloorPartialDate("2021-01-31"));
r.putDouble(0, 1.0);
r.append();
// Out of order
r = writer.newRow(IntervalUtils.parseFloorPartialDate("2021-01-30"));
r.putDouble(0, 1.0);
r.cancel();
// Back in order
r = writer.newRow(IntervalUtils.parseFloorPartialDate("2021-02-01"));
r.putDouble(0, 1.0);
r.append();
Assert.assertEquals(2, writer.size());
writer.commit();
}
long[] expectedTs = new long[] {
IntervalUtils.parseFloorPartialDate("2021-01-31"),
IntervalUtils.parseFloorPartialDate("2021-02-01")
};
try (TableReader reader = new TableReader(configuration, "weather")) {
int col = reader.getMetadata().getColumnIndex("timestamp");
RecordCursor cursor = reader.getCursor();
final Record r = cursor.getRecord();
int i = 0;
while (cursor.hasNext()) {
Assert.assertEquals("Row " + i, expectedTs[i++], r.getTimestamp(col));
}
Assert.assertEquals(expectedTs.length, i);
}
});
}
@Test
public void testOpenUnsupportedIndex() throws Exception {
TestUtils.assertMemoryLeak(() -> {
......
......@@ -33,7 +33,6 @@ import io.questdb.std.*;
import io.questdb.std.str.StringSink;
import io.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -41,7 +40,6 @@ import java.nio.charset.StandardCharsets;
public class LineTcpParser2Test extends LineUdpLexerTest {
private final LineTcpParser lineTcpParser = new LineTcpParser();
private ErrorCode lastErrorCode;
private boolean onErrorLine;
private long startOfLineAddr;
......@@ -50,6 +48,30 @@ public class LineTcpParser2Test extends LineUdpLexerTest {
Os.init();
}
@Override
public void testDanglingCommaOnTag() {
assertThat(
"measurement,tag=value field=x 10000\n",
"measurement,tag=value, field=x 10000\n"
);
}
@Override
public void testNoFieldValue2() {
assertThat(
"measurement,tag=x f= 10000\n",
"measurement,tag=x f= 10000\n"
);
}
@Override
public void testNoFieldValue3() {
assertThat(
"measurement,tag=x f= 10000\n",
"measurement,tag=x f=, 10000\n"
);
}
@Override
public void testNoTagValue1() {
assertThat(
......@@ -95,59 +117,74 @@ public class LineTcpParser2Test extends LineUdpLexerTest {
}
@Test
public void testWithQuotedStringsWithSpaces() {
public void testNoFields() {
// Single space char between last tag and timestamp
assertThat(
"measurement,tag=value,tag2=value field=10000i,field2=\"longstring\",fld3=\"short string\" 100000\n",
"measurement,tag=value,tag2=value field=10000i,field2=\"longstring\",fld3=\"short string\" 100000\n"
"measurement,tag=x 10000\n",
"measurement,tag=x 10000\n"
);
}
@Test
public void testWithQuotedStringsWithEscapedQuotes() {
// Single space char after last tag and invalid timestamp
assertThat(
"measurement,tag=value,tag2=value field=10000i,field2=\"str\" escaped\\ end\" 100000\n",
"measurement,tag=value,tag2=value field=10000i,field2=\"str\\\" escaped\\\\ end\" 100000\n"
"--ERROR=INVALID_TIMESTAMP--","measurement,tag=x 10000i\n"
);
// Double space char between last tag and timestamp
assertThat(
"measurement field2=\"double escaped \\ \" and quoted\" 100000\n",
"measurement field2=\"double escaped \\\\ \\\" and quoted\" 100000\n"
"measurement,tag=x 10000\n",
"measurement,tag=x 10000\n"
);
// Double space char between last tag and invalid timestamp
assertThat(
"measurement field2=\"double escaped \\\" and quoted2\" 100000\n",
"measurement field2=\"double escaped \\\\\\\" and quoted2\" 100000\n"
"--ERROR=INVALID_TIMESTAMP--","measurement,tag=x 10000i\n"
);
assertThat(
"measurement,tag=value,tag2=value field=10000i,field2=\"str=special,end\" 100000\n",
"measurement,tag=value,tag2=value field=10000i,field2=\"str=special,end\" 100000\n"
);
}
@Test
public void testNoFieldsAndNotTags() {
assertThat("--ERROR=INCOMPLETE_FIELD--","measurement 10000\n"); // One space char
assertThat("--ERROR=NO_FIELDS--","measurement 10000\n"); // Two space chars
}
@Test
public void testSupportsUnquotedStrings() {
assertThat(
"measurement,tag=value,tag2=value field=10000i,field2=\"str=special,end\",field3=34 100000\n",
"measurement,tag=value,tag2=value field=10000i,field2=\"str=special,end\",field3=34 100000\n"
"measurement,t\"ag=value with space,tag2=value field=10000i,field 2=strend 100000\n",
"measurement,t\\\"ag=value\\ with\\ space,tag2=value field=10000i,field\\ 2=strend 100000\n"
);
}
@Test
public void testWithQuotedStringsWithEscapedQuotesUnsuccessful() {
public void testSupportsUnquotedStringsWithQuoteInMiddle() {
assertThat(
"-- error --\n",
"measurement,tag=value,tag2=value field=10000i,field2=\"str=special,lineend\n"
"measurement,t\"ag=value with space,tag2=value field=10000i,field 2=str\"end 100000\n",
"measurement,t\\\"ag=value\\ with\\ space,tag2=value field=10000i,field\\ 2=str\"end 100000\n"
);
assertThat(
"measurement,t\"ag=value with space,tag2=value field=10000i,field 2=str\"end\" 100000\n",
"measurement,t\\\"ag=value\\ with\\ space,tag2=value field=10000i,field\\ 2=str\"end\" 100000\n"
);
}
@Test
public void testWithEscapedTagValues() {
public void testSupportsUtf8Chars() {
assertThat(
"measurement,tag=value with space,tag2=value field=10000i,field2=\"str=special,end\" 100000\n",
"measurement,tag=value\\ with\\ space,tag2=value field=10000i,field2=\"str=special,end\" 100000\n"
"लаблअца,символ=значение1 поле=значение2,поле2=\"значение3\" 123--non ascii--\n",
"लаблअца,символ=значение1 поле=значение2,поле2=\"значение3\" 123\n"
);
assertThat(
"measurement,tag=value\\with\\slash,tag2=value field=10000i,field2=\"str=special,end\\ \" 100000\n",
"measurement,tag=value\\\\with\\\\slash,tag2=value field=10000i,field2=\"str=special,end\\\\ \" 100000\n"
"लаблअца,символ=значение2 161--non ascii--\n",
"लаблअца,символ=значение2 161\n"
);
assertThat(
"table,tag=ok field=\"значение2 non ascii quoted\" 161--non ascii--\n",
"table,tag=ok field=\"значение2 non ascii quoted\" 161\n"
);
}
......@@ -165,67 +202,102 @@ public class LineTcpParser2Test extends LineUdpLexerTest {
}
@Test
public void testSupportsUnquotedStrings() {
public void testWithEscapedTagValues() {
assertThat(
"measurement,t\"ag=value with space,tag2=value field=10000i,field 2=strend 100000\n",
"measurement,t\\\"ag=value\\ with\\ space,tag2=value field=10000i,field\\ 2=strend 100000\n"
"measurement,tag=value with space,tag2=value field=10000i,field2=\"str=special,end\" 100000\n",
"measurement,tag=value\\ with\\ space,tag2=value field=10000i,field2=\"str=special,end\" 100000\n"
);
assertThat(
"measurement,tag=value\\with\\slash,tag2=value field=10000i,field2=\"str=special,end\\ \" 100000\n",
"measurement,tag=value\\\\with\\\\slash,tag2=value field=10000i,field2=\"str=special,end\\\\ \" 100000\n"
);
}
@Test
public void testSupportsUnquotedStringsWithQuoteInMiddle() {
public void testWithQuotedStringsWithEscapedQuotes() {
assertThat(
"measurement,t\"ag=value with space,tag2=value field=10000i,field 2=str\"end 100000\n",
"measurement,t\\\"ag=value\\ with\\ space,tag2=value field=10000i,field\\ 2=str\"end 100000\n"
"measurement,tag=value,tag2=value field=10000i,field2=\"str\" escaped\\ end\" 100000\n",
"measurement,tag=value,tag2=value field=10000i,field2=\"str\\\" escaped\\\\ end\" 100000\n"
);
assertThat(
"measurement,t\"ag=value with space,tag2=value field=10000i,field 2=str\"end\" 100000\n",
"measurement,t\\\"ag=value\\ with\\ space,tag2=value field=10000i,field\\ 2=str\"end\" 100000\n"
"measurement field2=\"double escaped \\ \" and quoted\" 100000\n",
"measurement field2=\"double escaped \\\\ \\\" and quoted\" 100000\n"
);
}
@Test
public void testSupportsUtf8Chars() {
assertThat(
"लаблअца,символ=значение1 поле=значение2,поле2=\"значение3\" 123--non ascii--\n",
"लаблअца,символ=значение1 поле=значение2,поле2=\"значение3\" 123\n"
"measurement field2=\"double escaped \\\" and quoted2\" 100000\n",
"measurement field2=\"double escaped \\\\\\\" and quoted2\" 100000\n"
);
assertThat(
"लаблअца,символ=значение2 161--non ascii--\n",
"लаблअца,символ=значение2 161\n"
"measurement,tag=value,tag2=value field=10000i,field2=\"str=special,end\" 100000\n",
"measurement,tag=value,tag2=value field=10000i,field2=\"str=special,end\" 100000\n"
);
assertThat(
"table,tag=ok field=\"значение2 non ascii quoted\" 161--non ascii--\n",
"table,tag=ok field=\"значение2 non ascii quoted\" 161\n"
"measurement,tag=value,tag2=value field=10000i,field2=\"str=special,end\",field3=34 100000\n",
"measurement,tag=value,tag2=value field=10000i,field2=\"str=special,end\",field3=34 100000\n"
);
}
@Override
public void testNoFieldValue2() {
@Test
public void testWithQuotedStringsWithEscapedQuotesUnsuccessful() {
assertThat(
"measurement,tag=x f= 10000\n",
"measurement,tag=x f= 10000\n"
"--ERROR=INVALID_FIELD_VALUE--",
"measurement,tag=value,tag2=value field=10000i,field2=\"str=special,lineend\n"
);
}
@Override
public void testNoFieldValue3() {
@Test
public void testWithQuotedStringsWithSpaces() {
assertThat(
"measurement,tag=x f= 10000\n",
"measurement,tag=x f=, 10000\n"
"measurement,tag=value,tag2=value field=10000i,field2=\"longstring\",fld3=\"short string\" 100000\n",
"measurement,tag=value,tag2=value field=10000i,field2=\"longstring\",fld3=\"short string\" 100000\n"
);
}
@Override
public void testDanglingCommaOnTag() {
assertThat(
"measurement,tag=value field=x 10000\n",
"measurement,tag=value, field=x 10000\n"
);
private void assembleLine() {
int nEntities = lineTcpParser.getnEntities();
Chars.utf8Decode(lineTcpParser.getMeasurementName().getLo(), lineTcpParser.getMeasurementName().getHi(), sink);
int n = 0;
boolean tagsComplete = false;
while (n < nEntities) {
ProtoEntity entity = lineTcpParser.getEntity(n++);
if (!tagsComplete && entity.getType() != LineTcpParser.ENTITY_TYPE_TAG) {
tagsComplete = true;
sink.put(' ');
} else {
sink.put(',');
}
Chars.utf8Decode(entity.getName().getLo(), entity.getName().getHi(), sink);
sink.put('=');
switch (entity.getType()) {
case LineTcpParser.ENTITY_TYPE_STRING:
sink.put('"');
Chars.utf8Decode(entity.getValue().getLo(), entity.getValue().getHi(), sink);
sink.put('"');
break;
case LineTcpParser.ENTITY_TYPE_INTEGER:
case LineTcpParser.ENTITY_TYPE_LONG256:
sink.put(entity.getValue()).put('i');
break;
default:
Chars.utf8Decode(entity.getValue().getLo(), entity.getValue().getHi(), sink);
break;
}
}
if (lineTcpParser.hasTimestamp()) {
sink.put(' ');
Numbers.append(sink, lineTcpParser.getTimestamp());
}
if (lineTcpParser.hasNonAsciiChars()) {
sink.put("--non ascii--");
}
sink.put('\n');
}
protected void assertThat(CharSequence expected, String lineStr) throws LineProtoException {
......@@ -248,7 +320,7 @@ public class LineTcpParser2Test extends LineUdpLexerTest {
try {
for (int i = start; i < len; i++) {
for(int nextBreak = 0; nextBreak < len - i; nextBreak++) {
for (int nextBreak = 0; nextBreak < len - i; nextBreak++) {
sink.clear();
resetParser(mem + fullLen);
parseMeasurement(memFull, mem, fullLen, i, 0);
......@@ -274,55 +346,6 @@ public class LineTcpParser2Test extends LineUdpLexerTest {
}
}
@Override
protected void assertError(CharSequence line, int state, int code, int position) throws LineProtoException {
byte[] bytes = line.toString().getBytes(StandardCharsets.UTF_8);
int len = bytes.length;
final boolean endWithEOL = bytes[len - 1] == '\n' || bytes[len - 1] == '\r';
if (!endWithEOL) {
len++;
}
long mem = Unsafe.malloc(len + 1, MemoryTag.NATIVE_DEFAULT);
try {
for (int i = 1; i < len; i++) {
for (int j = 0; j < bytes.length; j++) {
Unsafe.getUnsafe().putByte(mem + j, bytes[j]);
}
if (!endWithEOL) {
Unsafe.getUnsafe().putByte(mem + bytes.length, (byte) '\n');
}
sink.clear();
resetParser(mem);
parseMeasurement(mem + i);
Assert.assertTrue(parseMeasurement(mem + len));
Assert.assertNotNull(lastErrorCode);
}
} finally {
Unsafe.free(mem, len, MemoryTag.NATIVE_DEFAULT);
}
}
private void resetParser(long mem) {
lastErrorCode = null;
onErrorLine = false;
startOfLineAddr = mem;
lineTcpParser.of(mem);
}
private boolean parseMeasurement(long fullBuffer, long parseBuffer, long buffersLen, long parseLen, long prevParseLen) {
long shl = parseLen - prevParseLen;
// This will copy ILP data from fullBuffer to parseBuffer so that the data ends at the end of the buffer
long parseHi = parseBuffer + buffersLen;
Vect.memmove(parseHi - parseLen, parseHi - prevParseLen, prevParseLen);
Vect.memcpy(fullBuffer + prevParseLen, parseHi - shl, shl);
// bufHi always the same, data alwasy ends at the end of the buffer
// the only difference from iteration to iteration is where the data starts, which is set in shl
lineTcpParser.shl(shl);
return parseMeasurement(parseHi);
}
private boolean parseMeasurement(long bufHi) {
while (lineTcpParser.getBufferAddress() < bufHi) {
ParseResult rc;
......@@ -346,57 +369,36 @@ public class LineTcpParser2Test extends LineUdpLexerTest {
case ERROR:
Assert.assertFalse(onErrorLine);
onErrorLine = true;
lastErrorCode = lineTcpParser.getErrorCode();
StringSink tmpSink = new StringSink();
if (Chars.utf8Decode(startOfLineAddr, lineTcpParser.getBufferAddress(), tmpSink)) {
sink.put(tmpSink.toString());
}
sink.put("-- error --\n");
sink.put("--ERROR=");
sink.put(lineTcpParser.getErrorCode().toString());
sink.put("--");
break;
}
}
return true;
}
private void assembleLine() {
int nEntities = lineTcpParser.getnEntities();
Chars.utf8Decode(lineTcpParser.getMeasurementName().getLo(), lineTcpParser.getMeasurementName().getHi(), sink);
int n = 0;
boolean tagsComplete = false;
while (n < nEntities) {
ProtoEntity entity = lineTcpParser.getEntity(n++);
if (!tagsComplete && entity.getType() != LineTcpParser.ENTITY_TYPE_TAG) {
tagsComplete = true;
sink.put(' ');
} else {
sink.put(',');
}
Chars.utf8Decode(entity.getName().getLo(), entity.getName().getHi(), sink);
sink.put('=');
switch (entity.getType()) {
case LineTcpParser.ENTITY_TYPE_STRING:
sink.put('"');
Chars.utf8Decode(entity.getValue().getLo(), entity.getValue().getHi(), sink);
sink.put('"');
break;
case LineTcpParser.ENTITY_TYPE_INTEGER:
case LineTcpParser.ENTITY_TYPE_LONG256:
sink.put(entity.getValue()).put('i');
break;
default:
Chars.utf8Decode(entity.getValue().getLo(), entity.getValue().getHi(), sink);
break;
}
}
private boolean parseMeasurement(long fullBuffer, long parseBuffer, long buffersLen, long parseLen, long prevParseLen) {
long shl = parseLen - prevParseLen;
if (lineTcpParser.hasTimestamp()) {
sink.put(' ');
Numbers.append(sink, lineTcpParser.getTimestamp());
}
// This will copy ILP data from fullBuffer to parseBuffer so that the data ends at the end of the buffer
long parseHi = parseBuffer + buffersLen;
Vect.memmove(parseHi - parseLen, parseHi - prevParseLen, prevParseLen);
Vect.memcpy(fullBuffer + prevParseLen, parseHi - shl, shl);
if (lineTcpParser.hasNonAsciiChars()) {
sink.put("--non ascii--");
}
sink.put('\n');
// bufHi always the same, data alwasy ends at the end of the buffer
// the only difference from iteration to iteration is where the data starts, which is set in shl
lineTcpParser.shl(shl);
return parseMeasurement(parseHi);
}
private void resetParser(long mem) {
onErrorLine = false;
startOfLineAddr = mem;
lineTcpParser.of(mem);
}
}
......@@ -52,6 +52,7 @@ import io.questdb.network.NetworkError;
import io.questdb.std.*;
import io.questdb.std.datetime.microtime.MicrosecondClock;
import io.questdb.std.datetime.microtime.TimestampFormatUtils;
import io.questdb.std.datetime.microtime.Timestamps;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import io.questdb.test.tools.TestUtils;
......@@ -62,6 +63,7 @@ import org.junit.Test;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.PrivateKey;
import java.sql.Time;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
......@@ -503,6 +505,7 @@ public class LineTcpReceiverTest extends AbstractCairoTest {
null)) {
String expected =
"out\ttimestamp\tin\n" +
"1.0\t1989-12-31T23:26:40.000000Z\tNaN\n" +
"NaN\t1990-01-01T00:00:00.000000Z\t2.0\n" +
"NaN\t1990-01-01T02:13:20.000000Z\t3.0\n" +
"NaN\t1990-01-01T05:00:00.000000Z\t4.0\n";
......@@ -779,6 +782,125 @@ public class LineTcpReceiverTest extends AbstractCairoTest {
});
}
@Test
public void testWithTcpSender() throws Exception {
runInContext((receiver) -> {
send(receiver, "table", WAIT_ENGINE_TABLE_RELEASE, () -> {
try (LineTcpSender lineTcpSender = new LineTcpSender(Net.parseIPv4("127.0.0.1"), bindPort, msgBufferSize)) {
lineTcpSender
.metric("table")
.tag("tag1", "value 1")
.tag("tag=2", "значение 2")
.field("поле=3", "{\"ключ\": \"число\"}")
.$(0);
lineTcpSender
.metric("table")
.tag("tag1", "value 2")
.$(0);
lineTcpSender
.metric("table")
.tag("tag 2", "value=\2") // Invalid column name, last line is not saved
.$(Timestamps.DAY_MICROS * 1000L);
lineTcpSender.flush();
}
});
String expected = "tag1\ttag=2\tполе=3\ttimestamp\n" +
"value 1\tзначение 2\t{\"ключ\": \"число\"}\t1970-01-01T00:00:00.000000Z\n" +
"value 2\t\t\t1970-01-01T00:00:00.000000Z\n";
assertTable(expected, "table");
});
}
@Test
public void testNewPartitionRowCancelledTwice() throws Exception {
runInContext((receiver) -> {
send(receiver, "table", WAIT_ENGINE_TABLE_RELEASE, () -> {
try (LineTcpSender lineTcpSender = new LineTcpSender(Net.parseIPv4("127.0.0.1"), bindPort, msgBufferSize)) {
lineTcpSender
.metric("table")
.tag("tag1", "value 1")
.tag("tag=2", "значение 2")
.field("поле=3", "{\"ключ\": \"число\"}")
.$(0);
lineTcpSender
.metric("table")
.tag("tag1", "value 2")
.$(0);
lineTcpSender
.metric("table")
.tag("tag 2", "value=\2") // Invalid column name, last line is not saved
.$(Timestamps.DAY_MICROS * 1000L);
// Repeat
lineTcpSender
.metric("table")
.tag("tag 2", "value=\2") // Invalid column name, last line is not saved
.$(Timestamps.DAY_MICROS * 1000L);
lineTcpSender.flush();
}
});
String expected = "tag1\ttag=2\tполе=3\ttimestamp\n" +
"value 1\tзначение 2\t{\"ключ\": \"число\"}\t1970-01-01T00:00:00.000000Z\n" +
"value 2\t\t\t1970-01-01T00:00:00.000000Z\n";
assertTable(expected, "table");
});
}
@Test
public void testTcpSenderQuotedTagValue() throws Exception {
runInContext((receiver) -> {
send(receiver, "table", WAIT_ENGINE_TABLE_RELEASE, () -> {
try (LineTcpSender lineTcpSender = new LineTcpSender(Net.parseIPv4("127.0.0.1"), bindPort, msgBufferSize)) {
lineTcpSender
.metric("table")
.tag("tag1", "\"value 1\"")
.$(0);
lineTcpSender.flush();
}
});
String expected = "tag1\ttimestamp\n" +
"\"value 1\"\t1970-01-01T00:00:00.000000Z\n";
assertTable(expected, "table");
});
}
@Test
public void testFirstRowIsCancelled() throws Exception {
runInContext((receiver) -> {
send(receiver, "table", WAIT_ENGINE_TABLE_RELEASE, () -> {
try (LineTcpSender lineTcpSender = new LineTcpSender(Net.parseIPv4("127.0.0.1"), bindPort, msgBufferSize)) {
lineTcpSender
.metric("table")
.tag("tag 2", "value=\2") // Invalid column name, line is not saved
.$(0);
lineTcpSender
.metric("table")
.tag("tag1", "value 1")
.tag("tag=2", "значение 2")
.field("поле=3", "{\"ключ\": \"число\"}")
.$(0);
lineTcpSender
.metric("table")
.tag("tag1", "value 2")
.$(0);
lineTcpSender
.metric("table")
.tag("tag=2", "value=\\2")
.$(Timestamps.DAY_MICROS * 1000L);
lineTcpSender.flush();
}
});
String expected = "tag1\ttag=2\tполе=3\ttimestamp\n" +
"value 1\tзначение 2\t{\"ключ\": \"число\"}\t1970-01-01T00:00:00.000000Z\n" +
"value 2\t\t\t1970-01-01T00:00:00.000000Z\n" +
"\tvalue=\\2\t\t1970-01-02T00:00:00.000000Z\n";
assertTable(expected, "table");
});
}
private void assertTable(CharSequence expected, CharSequence tableName) {
try (TableReader reader = engine.getReader(AllowAllCairoSecurityContext.INSTANCE, tableName)) {
assertCursorTwoPass(expected, reader.getCursor(), reader.getMetadata());
......@@ -816,14 +938,22 @@ public class LineTcpReceiverTest extends AbstractCairoTest {
}
private void send(LineTcpReceiver receiver, String lineData, String tableName, int wait) {
send(receiver, lineData, tableName, wait, true);
send(receiver, tableName, wait, () -> {
sendToSocket(lineData, true);
});
}
private void send(LineTcpReceiver receiver, String lineData, String tableName, int wait, boolean nolinger) {
send(receiver, tableName, wait, () -> {
sendToSocket(lineData, nolinger);
});
}
public static final int WAIT_NO_WAIT = 0;
public static final int WAIT_ENGINE_TABLE_RELEASE = 1;
public static final int WAIT_ILP_TABLE_RELEASE = 2;
private void send(LineTcpReceiver receiver, String lineData, String tableName, int wait, boolean noLinger) {
private void send(LineTcpReceiver receiver, String tableName, int wait, Runnable sendToSocket) {
SOCountDownLatch releaseLatch = new SOCountDownLatch(1);
switch (wait) {
case WAIT_ENGINE_TABLE_RELEASE:
......@@ -845,26 +975,7 @@ public class LineTcpReceiverTest extends AbstractCairoTest {
}
try {
int ipv4address = Net.parseIPv4("127.0.0.1");
long sockaddr = Net.sockaddr(ipv4address, bindPort);
long fd = Net.socketTcp(true);
try {
TestUtils.assertConnect(fd, sockaddr, noLinger);
byte[] lineDataBytes = lineData.getBytes(StandardCharsets.UTF_8);
long bufaddr = Unsafe.malloc(lineDataBytes.length, MemoryTag.NATIVE_DEFAULT);
try {
for (int n = 0; n < lineDataBytes.length; n++) {
Unsafe.getUnsafe().putByte(bufaddr + n, lineDataBytes[n]);
}
int rc = Net.send(fd, bufaddr, lineDataBytes.length);
Assert.assertEquals(lineDataBytes.length, rc);
} finally {
Unsafe.free(bufaddr, lineDataBytes.length, MemoryTag.NATIVE_DEFAULT);
}
} finally {
Net.close(fd);
Net.freeSockAddr(sockaddr);
}
sendToSocket.run();
if (wait != WAIT_NO_WAIT) {
releaseLatch.await();
}
......@@ -880,6 +991,29 @@ public class LineTcpReceiverTest extends AbstractCairoTest {
}
}
private void sendToSocket(String lineData, boolean noLinger) {
int ipv4address = Net.parseIPv4("127.0.0.1");
long sockaddr = Net.sockaddr(ipv4address, bindPort);
long fd = Net.socketTcp(true);
try {
TestUtils.assertConnect(fd, sockaddr, noLinger);
byte[] lineDataBytes = lineData.getBytes(StandardCharsets.UTF_8);
long bufaddr = Unsafe.malloc(lineDataBytes.length, MemoryTag.NATIVE_DEFAULT);
try {
for (int n = 0; n < lineDataBytes.length; n++) {
Unsafe.getUnsafe().putByte(bufaddr + n, lineDataBytes[n]);
}
int rc = Net.send(fd, bufaddr, lineDataBytes.length);
Assert.assertEquals(lineDataBytes.length, rc);
} finally {
Unsafe.free(bufaddr, lineDataBytes.length, MemoryTag.NATIVE_DEFAULT);
}
} finally {
Net.close(fd);
Net.freeSockAddr(sockaddr);
}
}
private void send(LineTcpReceiver receiver, String lineData, String tableName) {
send(receiver, lineData, tableName, WAIT_ENGINE_TABLE_RELEASE);
}
......
......@@ -170,11 +170,6 @@ public class LineUdpLexerTest {
assertError("measurement 10000", LineUdpParser.EVT_FIELD_NAME, LineUdpParser.ERROR_EXPECTED, 12);
}
@Test
public void testNoFields4() {
assertError("measurement,tag=x 10000", LineUdpParser.EVT_FIELD_NAME, LineUdpParser.ERROR_EXPECTED, 23);
}
@Test
public void testNoMeasure1() {
assertError("tag=value field=x 10000\n", LineUdpParser.EVT_MEASUREMENT, LineUdpParser.ERROR_EXPECTED, 3);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册