未验证 提交 5abad15a 编写于 作者: V Vlad Ilyushchenko 提交者: GitHub

fix(ilp): fixed TCP buffer underflow when space (' ') is escaped (#1143)

上级 e6a203c6
......@@ -46,7 +46,7 @@ public class LineTCPSenderMain {
// } else {
// sender.metric("weather2");
// }
sender.tag("location", "london").tag("by", "quest").field("temp", rnd.nextPositiveLong()).field("ok", rnd.nextPositiveInt()).$(Os.currentTimeMicros() * 1000);
sender.tag("location", "l ondon").tag("by", "quest").field("temp", rnd.nextPositiveLong()).field("ok", rnd.nextPositiveInt()).$(Os.currentTimeMicros() * 1000);
}
sender.flush();
}
......
......@@ -209,36 +209,12 @@ public class LineProtoSender extends AbstractCharSink implements Closeable {
public LineProtoSender tag(CharSequence tag, CharSequence value) {
if (hasMetric) {
put(',').putNameEscaped(tag).put('=').encodeUtf8(value);
return this;
}
throw CairoException.instance(0).put("metric expected");
}
public LineProtoSender tagEscaped(CharSequence tag, CharSequence value) {
if (hasMetric) {
put(',').putUtf8Escaped(tag).put('=').putUtf8Escaped(value);
put(',').encodeUtf8(tag).put('=').encodeUtf8(value);
return this;
}
throw CairoException.instance(0).put("metric expected");
}
private LineProtoSender putUtf8Escaped(CharSequence cs) {
for (int i = 0, n = cs.length(); i < n; i++) {
char c = cs.charAt(i);
switch (c) {
case ' ':
case ',':
case '=':
put('\\');
default:
putUtf8(c);
break;
}
}
return this;
}
private CharSink field(CharSequence name) {
if (hasMetric) {
if (noFields) {
......@@ -248,25 +224,22 @@ public class LineProtoSender extends AbstractCharSink implements Closeable {
put(',');
}
return putNameEscaped(name).put('=');
return encodeUtf8(name).put('=');
}
throw CairoException.instance(0).put("metric expected");
}
private LineProtoSender putNameEscaped(CharSequence name) {
for (int i = 0, n = name.length(); i < n; i++) {
char c = name.charAt(i);
switch (c) {
case ' ':
case ',':
case '=':
put('\\');
default:
put(c);
break;
}
@Override
protected void putUtf8Special(char c) {
switch (c) {
case ' ':
case ',':
case '=':
put('\\');
default:
put(c);
break;
}
return this;
}
private void send() {
......
......@@ -191,6 +191,8 @@ class LineTcpAuthConnectionContext extends LineTcpConnectionContext {
authenticated = true;
authState = AuthState.COMPLETE;
compactBuffer(recvBufStart + lineEnd + 1);
// we must reset start of measurement address
resetParser();
LOG.info().$('[').$(fd).$("] authentication success").$();
}
}
......
......@@ -45,17 +45,17 @@ class LineTcpConnectionContext implements IOContext, Mutable {
private final LineTcpMeasurementScheduler scheduler;
private final MillisecondClock milliClock;
private final DirectByteCharSequence byteCharSequence = new DirectByteCharSequence();
private final NewLineProtoParser protoParser = new NewLineProtoParser();
private final FloatingDirectCharSink charSink = new FloatingDirectCharSink();
protected long fd;
protected IODispatcher<LineTcpConnectionContext> dispatcher;
protected long recvBufStart;
protected long recvBufEnd;
protected long recvBufPos;
protected boolean peerDisconnected;
protected long recvBufStartOfMeasurement;
private long lastQueueFullLogMillis = 0;
private final NewLineProtoParser protoParser = new NewLineProtoParser();
private boolean goodMeasurement;
protected long recvBufStartOfMeasurement;
private final FloatingDirectCharSink charSink = new FloatingDirectCharSink();
LineTcpConnectionContext(LineTcpReceiverConfiguration configuration, LineTcpMeasurementScheduler scheduler) {
nf = configuration.getNetworkFacade();
......@@ -73,12 +73,6 @@ class LineTcpConnectionContext implements IOContext, Mutable {
resetParser();
}
private void resetParser() {
protoParser.of(recvBufStart);
goodMeasurement = true;
recvBufStartOfMeasurement = recvBufStart;
}
@Override
public void close() {
this.fd = -1;
......@@ -112,12 +106,28 @@ class LineTcpConnectionContext implements IOContext, Mutable {
return false;
}
protected final boolean compactBuffer(long recvBufNewStart) {
assert recvBufNewStart <= recvBufPos;
if (recvBufNewStart > recvBufStart) {
final int len = (int) (recvBufPos - recvBufNewStart);
/**
* Moves incompletely received measurement to start of the receive buffer. Also updates the state of the
* context and protocol parser such that all pointers that point to the incomplete measurement will remain
* valid. This allows protocol parser to resume execution from the point of where measurement ended abruptly
*
* @param recvBufStartOfMeasurement the address in receive buffer where incomplete measurement starts. Everything from
* this address to end of the receive buffer will be copied to the start of the
* receive buffer
* @return true if there was an incomplete measurement in the first place
*/
protected final boolean compactBuffer(long recvBufStartOfMeasurement) {
assert recvBufStartOfMeasurement <= recvBufPos;
if (recvBufStartOfMeasurement > recvBufStart) {
final long len = recvBufPos - recvBufStartOfMeasurement;
if (len > 0) {
Vect.memcpy(recvBufNewStart, recvBufStart, len);
Vect.memcpy(recvBufStartOfMeasurement, recvBufStart, len);
final long shl = recvBufStartOfMeasurement - recvBufStart;
protoParser.shl(shl);
this.recvBufStartOfMeasurement -= shl;
} else {
assert len == 0;
resetParser();
}
recvBufPos = recvBufStart + len;
return true;
......@@ -146,6 +156,13 @@ class LineTcpConnectionContext implements IOContext, Mutable {
return parseMeasurements(netIoJob);
}
LineTcpConnectionContext of(long clientFd, IODispatcher<LineTcpConnectionContext> dispatcher) {
this.fd = clientFd;
this.dispatcher = dispatcher;
clear();
return this;
}
protected final IOContextResult parseMeasurements(NetworkIOJob netIoJob) {
while (true) {
try {
......@@ -182,13 +199,11 @@ class LineTcpConnectionContext implements IOContext, Mutable {
}
case BUFFER_UNDERFLOW: {
if (recvBufPos == recvBufEnd) {
if (!compactBuffer(recvBufStartOfMeasurement)) {
doHandleDisconnectEvent();
return IOContextResult.NEEDS_DISCONNECT;
}
resetParser();
if (recvBufPos == recvBufEnd && !compactBuffer(recvBufStartOfMeasurement)) {
doHandleDisconnectEvent();
return IOContextResult.NEEDS_DISCONNECT;
}
if (!read()) {
if (peerDisconnected) {
return IOContextResult.NEEDS_DISCONNECT;
......@@ -205,13 +220,6 @@ class LineTcpConnectionContext implements IOContext, Mutable {
}
}
LineTcpConnectionContext of(long clientFd, IODispatcher<LineTcpConnectionContext> dispatcher) {
this.fd = clientFd;
this.dispatcher = dispatcher;
clear();
return this;
}
protected boolean read() {
int bufferRemaining = (int) (recvBufEnd - recvBufPos);
final int orig = bufferRemaining;
......@@ -228,6 +236,12 @@ class LineTcpConnectionContext implements IOContext, Mutable {
return !peerDisconnected;
}
protected void resetParser() {
protoParser.of(recvBufStart);
goodMeasurement = true;
recvBufStartOfMeasurement = recvBufStart;
}
enum IOContextResult {
NEEDS_READ, NEEDS_WRITE, QUEUE_FULL, NEEDS_DISCONNECT
}
......
......@@ -34,16 +34,6 @@ import io.questdb.std.str.DirectByteCharSequence;
public class NewLineProtoParser implements Closeable {
public static final long NULL_TIMESTAMP = Long.MIN_VALUE;
public enum ParseResult {
MEASUREMENT_COMPLETE, BUFFER_UNDERFLOW, ERROR
}
public enum ErrorCode {
EMPTY_LINE, NO_FIELDS, INCOMPLETE_TAG, INCOMPLETE_FIELD, INVALID_FIELD_SEPERATOR, INVALID_TIMESTAMP, INVALID_FIELD_VALUE
}
private static final byte ENTITY_TYPE_NONE = (byte) 0xff;
public static final byte ENTITY_TYPE_TAG = 0;
public static final byte ENTITY_TYPE_FLOAT = 1;
public static final byte ENTITY_TYPE_INTEGER = 2;
......@@ -52,10 +42,11 @@ public class NewLineProtoParser implements Closeable {
public static final byte ENTITY_TYPE_LONG256 = 5;
public static final byte ENTITY_TYPE_CACHED_TAG = 6;
public static final int N_ENTITY_TYPES = ENTITY_TYPE_CACHED_TAG + 1;
private static final byte ENTITY_TYPE_NONE = (byte) 0xff;
private final DirectByteCharSequence measurementName = new DirectByteCharSequence();
private final DirectByteCharSequence charSeq = new DirectByteCharSequence();
private final ObjList<ProtoEntity> entityCache = new ObjList<>();
private final EntityHandler entityEndOfLineHandler = this::expectEndOfLine;
private long bufAt;
private long entityLo;
private boolean tagsComplete;
......@@ -64,13 +55,44 @@ public class NewLineProtoParser implements Closeable {
private ProtoEntity currentEntity;
private ErrorCode errorCode;
private EntityHandler entityHandler;
private long timestamp;
private final EntityHandler entityTableHandler = this::expectTableName;
private final EntityHandler entityNameHandler = this::expectEntityName;
private final EntityHandler entityValueHandler = this::expectEntityValue;
private long timestamp;
private final EntityHandler entityTimestampHandler = this::expectTimestamp;
private final EntityHandler entityEndOfLineHandler = this::expectEndOfLine;
private final EntityHandler entityValueHandler = this::expectEntityValue;
private final EntityHandler entityNameHandler = this::expectEntityName;
@Override
public void close() {
}
public long getBufferAddress() {
return bufAt;
}
public ProtoEntity getEntity(int n) {
assert n < nEntities;
return entityCache.get(n);
}
public ErrorCode getErrorCode() {
return errorCode;
}
public DirectByteCharSequence getMeasurementName() {
return measurementName;
}
public long getTimestamp() {
return timestamp;
}
public int getnEntities() {
return nEntities;
}
public boolean hasTimestamp() {
return timestamp != NULL_TIMESTAMP;
}
public NewLineProtoParser of(long bufLo) {
this.bufAt = bufLo - 1;
......@@ -132,16 +154,14 @@ public class NewLineProtoParser implements Closeable {
return ParseResult.BUFFER_UNDERFLOW;
}
public void startNextMeasurement() {
bufAt++;
nEscapedChars = 0;
entityLo = bufAt;
errorCode = null;
tagsComplete = false;
nEntities = 0;
currentEntity = null;
entityHandler = entityTableHandler;
timestamp = NULL_TIMESTAMP;
public void shl(long shl) {
bufAt -= shl;
entityLo -= shl;
measurementName.shl(shl);
charSeq.shl(shl);
for (int i = 0; i < nEntities; i++) {
entityCache.getQuick(i).shl(shl);
}
}
public ParseResult skipMeasurement(long bufHi) {
......@@ -156,53 +176,21 @@ public class NewLineProtoParser implements Closeable {
return ParseResult.BUFFER_UNDERFLOW;
}
public long getBufferAddress() {
return bufAt;
}
public DirectByteCharSequence getMeasurementName() {
return measurementName;
}
public int getnEntities() {
return nEntities;
}
public ProtoEntity getEntity(int n) {
assert n < nEntities;
return entityCache.get(n);
}
public boolean hasTimestamp() {
return timestamp != NULL_TIMESTAMP;
}
public long getTimestamp() {
return timestamp;
}
public ErrorCode getErrorCode() {
return errorCode;
}
@Override
public void close() {
public void startNextMeasurement() {
bufAt++;
nEscapedChars = 0;
entityLo = bufAt;
errorCode = null;
tagsComplete = false;
nEntities = 0;
currentEntity = null;
entityHandler = entityTableHandler;
timestamp = NULL_TIMESTAMP;
}
private boolean expectTableName(byte endOfEntityByte) {
tagsComplete = endOfEntityByte == (byte) ' ';
if (endOfEntityByte == (byte) ',' || tagsComplete) {
measurementName.of(entityLo, bufAt);
entityHandler = entityNameHandler;
return true;
}
if (entityLo == bufAt) {
errorCode = ErrorCode.EMPTY_LINE;
} else {
errorCode = ErrorCode.NO_FIELDS;
}
return false;
private boolean expectEndOfLine(byte endOfEntityByte) {
assert endOfEntityByte == '\n';
return true;
}
private boolean expectEntityName(byte endOfEntityByte) {
......@@ -270,6 +258,22 @@ public class NewLineProtoParser implements Closeable {
return false;
}
private boolean expectTableName(byte endOfEntityByte) {
tagsComplete = endOfEntityByte == (byte) ' ';
if (endOfEntityByte == (byte) ',' || tagsComplete) {
measurementName.of(entityLo, bufAt);
entityHandler = entityNameHandler;
return true;
}
if (entityLo == bufAt) {
errorCode = ErrorCode.EMPTY_LINE;
} else {
errorCode = ErrorCode.NO_FIELDS;
}
return false;
}
private boolean expectTimestamp(byte endOfEntityByte) {
try {
if (endOfEntityByte == (byte) '\n') {
......@@ -285,9 +289,12 @@ public class NewLineProtoParser implements Closeable {
}
}
private boolean expectEndOfLine(byte endOfEntityByte) {
assert endOfEntityByte == '\n';
return true;
public enum ParseResult {
MEASUREMENT_COMPLETE, BUFFER_UNDERFLOW, ERROR
}
public enum ErrorCode {
EMPTY_LINE, NO_FIELDS, INCOMPLETE_TAG, INCOMPLETE_FIELD, INVALID_FIELD_SEPERATOR, INVALID_TIMESTAMP, INVALID_FIELD_VALUE
}
private interface EntityHandler {
......@@ -302,24 +309,37 @@ public class NewLineProtoParser implements Closeable {
private boolean booleanValue;
private double floatValue;
private void setName() {
name.of(entityLo, bufAt - nEscapedChars);
public boolean getBooleanValue() {
return booleanValue;
}
private boolean setValue() {
assert type == ENTITY_TYPE_NONE;
long bufHi = bufAt - nEscapedChars;
int valueLen = (int) (bufHi - entityLo);
if (valueLen <= 0) {
return false;
}
value.of(entityLo, bufHi);
if (tagsComplete) {
byte lastByte = value.byteAt(valueLen - 1);
return parse(lastByte, valueLen);
}
type = ENTITY_TYPE_TAG;
return true;
public double getFloatValue() {
return floatValue;
}
public long getIntegerValue() {
return integerValue;
}
public DirectByteCharSequence getName() {
return name;
}
public byte getType() {
return type;
}
public DirectByteCharSequence getValue() {
return value;
}
public void shl(long shl) {
name.shl(shl);
value.shl(shl);
}
private void clear() {
type = ENTITY_TYPE_NONE;
}
private boolean parse(byte lastByte, int valueLen) {
......@@ -379,32 +399,24 @@ public class NewLineProtoParser implements Closeable {
}
}
private void clear() {
type = ENTITY_TYPE_NONE;
}
public byte getType() {
return type;
}
public DirectByteCharSequence getName() {
return name;
}
public DirectByteCharSequence getValue() {
return value;
}
public long getIntegerValue() {
return integerValue;
}
public double getFloatValue() {
return floatValue;
private void setName() {
name.of(entityLo, bufAt - nEscapedChars);
}
public boolean getBooleanValue() {
return booleanValue;
private boolean setValue() {
assert type == ENTITY_TYPE_NONE;
long bufHi = bufAt - nEscapedChars;
int valueLen = (int) (bufHi - entityLo);
if (valueLen <= 0) {
return false;
}
value.of(entityLo, bufHi);
if (tagsComplete) {
byte lastByte = value.byteAt(valueLen - 1);
return parse(lastByte, valueLen);
}
type = ENTITY_TYPE_TAG;
return true;
}
}
}
......@@ -387,7 +387,7 @@ public class TextLexer implements Closeable, Mutable {
private void shift(long d) {
for (int i = 0; i < fieldIndex; i++) {
fields.getQuick(i).lshift(d);
fields.getQuick(i).shl(d);
}
this.fieldLo -= d;
this.fieldHi -= d;
......
......@@ -32,7 +32,7 @@ public class FixedTimeZoneRule implements TimeZoneRules {
}
@Override
public long getOffset(long millis, int year, boolean leap) {
public long getOffset(long utc, int year, boolean leap) {
return offset;
}
......
......@@ -25,7 +25,7 @@
package io.questdb.std.datetime;
public interface TimeZoneRules {
long getOffset(long utcOffet, int year, boolean leap);
long getOffset(long utc, int year, boolean leap);
long getOffset(long utcOffset);
}
......@@ -113,20 +113,20 @@ public class TimeZoneRulesMicros implements TimeZoneRules {
}
@Override
public long getOffset(long utcOffet, int year, boolean leap) {
public long getOffset(long utc, int year, boolean leap) {
if (standardOffset != Long.MIN_VALUE) {
return standardOffset;
}
if (ruleCount > 0 && utcOffet > cutoffTransition) {
return fromRules(utcOffet, year, leap);
if (ruleCount > 0 && utc > cutoffTransition) {
return fromRules(utc, year, leap);
}
if (utcOffet > cutoffTransition) {
if (utc > cutoffTransition) {
return lastWall;
}
return fromHistory(utcOffet);
return fromHistory(utc);
}
@Override
......@@ -163,7 +163,7 @@ public class TimeZoneRulesMicros implements TimeZoneRules {
}
}
private long fromRules(long micros, int year, boolean leap) {
private long fromRules(long utcMicros, int year, boolean leap) {
int offset = 0;
......@@ -207,17 +207,17 @@ public class TimeZoneRulesMicros implements TimeZoneRules {
long delta = offsetAfter - offset;
if (delta > 0) {
if (micros < date) {
if (utcMicros < date) {
return offset * Timestamps.SECOND_MICROS;
}
if (micros < date + delta) {
if (utcMicros < date + delta) {
return (offsetAfter + delta) * Timestamps.SECOND_MICROS;
} else {
offset = offsetAfter;
}
} else {
if (micros < date) {
if (utcMicros < date) {
return offset * Timestamps.SECOND_MICROS;
} else {
offset = offsetAfter;
......
......@@ -113,20 +113,20 @@ public class TimeZoneRulesMillis implements TimeZoneRules {
}
@Override
public long getOffset(long millis, int year, boolean leap) {
public long getOffset(long utc, int year, boolean leap) {
if (standardOffset != Long.MIN_VALUE) {
return standardOffset;
}
if (ruleCount > 0 && millis > cutoffTransition) {
return fromRules(millis, year, leap);
if (ruleCount > 0 && utc > cutoffTransition) {
return fromRules(utc, year, leap);
}
if (millis > cutoffTransition) {
if (utc > cutoffTransition) {
return lastWall;
}
return fromHistory(millis);
return fromHistory(utc);
}
@Override
......
......@@ -273,7 +273,7 @@ public abstract class AbstractCharSink implements CharSink {
}
}
private int putUtf8Internal(CharSequence cs, int hi, int i, char c) {
protected int putUtf8Internal(CharSequence cs, int hi, int i, char c) {
if (c < 2048) {
put((char) (192 | c >> 6)).put((char) (128 | c & 63));
} else if (Character.isSurrogate(c)) {
......
......@@ -64,7 +64,7 @@ public class DirectByteCharSequence extends AbstractCharSequence implements Muta
return (char) byteAt(index);
}
public void lshift(long delta) {
public void shl(long delta) {
this.lo -= delta;
this.hi -= delta;
}
......
......@@ -334,13 +334,13 @@ public class LineTcpAuthConnectionContextTest extends AbstractCairoTest {
boolean authSequenceCompleted = authenticate(AUTH_KEY_ID1, AUTH_PRIVATE_KEY1, true, false, true, false, null);
Assert.assertTrue(authSequenceCompleted);
Assert.assertFalse(disconnected);
recvBuffer = "weather,location=us-midwest temperature=82 1465839830100400200\n";
recvBuffer = "weather,location=us\\ midwest temperature=82 1465839830100400200\n";
handleContextIO();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
String expected = "location\ttemperature\ttimestamp\n" +
"us-midwest\t82.0\t2016-06-13T17:43:50.100400Z\n";
"us midwest\t82.0\t2016-06-13T17:43:50.100400Z\n";
assertTable(expected);
});
}
......@@ -351,13 +351,13 @@ public class LineTcpAuthConnectionContextTest extends AbstractCairoTest {
boolean authSequenceCompleted = authenticate(AUTH_KEY_ID1, AUTH_PRIVATE_KEY1, false, true, true, false, null);
Assert.assertTrue(authSequenceCompleted);
Assert.assertFalse(disconnected);
recvBuffer = "weather,location=us-midwest temperature=82 1465839830100400200\n";
recvBuffer = "weather,location=us\\ midwest temperature=82 1465839830100400200\n";
handleContextIO();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
String expected = "location\ttemperature\ttimestamp\n" +
"us-midwest\t82.0\t2016-06-13T17:43:50.100400Z\n";
"us midwest\t82.0\t2016-06-13T17:43:50.100400Z\n";
assertTable(expected);
});
}
......@@ -368,13 +368,13 @@ public class LineTcpAuthConnectionContextTest extends AbstractCairoTest {
boolean authSequenceCompleted = authenticate(AUTH_KEY_ID1, AUTH_PRIVATE_KEY1, true, true, true, false, null);
Assert.assertTrue(authSequenceCompleted);
Assert.assertFalse(disconnected);
recvBuffer = "weather,location=us-midwest temperature=82 1465839830100400200\n";
recvBuffer = "weather,location=us\\ midwest temperature=82 1465839830100400200\n";
handleContextIO();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
String expected = "location\ttemperature\ttimestamp\n" +
"us-midwest\t82.0\t2016-06-13T17:43:50.100400Z\n";
"us midwest\t82.0\t2016-06-13T17:43:50.100400Z\n";
assertTable(expected);
});
}
......@@ -394,13 +394,13 @@ public class LineTcpAuthConnectionContextTest extends AbstractCairoTest {
}
Assert.assertFalse(disconnected);
recvBuffer = "weather,location=us-midwest temperature=82 1465839830100400200\n";
recvBuffer = "weather,location=us\\ midwest temperature=82 1465839830100400200\n";
handleContextIO();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
String expected = "location\ttemperature\ttimestamp\n" +
"us-midwest\t82.0\t2016-06-13T17:43:50.100400Z\n";
"us midwest\t82.0\t2016-06-13T17:43:50.100400Z\n";
assertTable(expected);
});
}
......
......@@ -250,7 +250,7 @@ public class LineTcpServerTest extends AbstractCairoTest {
} catch (AssertionError e) {
int releasedCount = -tableIndex.get(tableName).getCount();
// Wait one more writer release before re-trying to compare
wait(tableIndex.get(tableName), releasedCount + 1, 20, minIdleMsBeforeWriterRelease);
wait(tableIndex.get(tableName), releasedCount + 1, minIdleMsBeforeWriterRelease);
assertTable(expectedSB, tableName);
}
} catch (Throwable err) {
......@@ -266,10 +266,10 @@ public class LineTcpServerTest extends AbstractCairoTest {
});
}
private void wait(SOUnboundedCountDownLatch latch, int value, long msTimeout, long iterations) {
private void wait(SOUnboundedCountDownLatch latch, int value, long iterations) {
while (-latch.getCount() < value && iterations-- > 0) {
try {
Thread.sleep(msTimeout);
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
return;
......@@ -559,7 +559,7 @@ public class LineTcpServerTest extends AbstractCairoTest {
this.authKeyId = authKeyId;
this.msgBufferSize = msgBufferSize;
assertMemoryLeak(() -> {
final String[] locations = {"london", "paris", "rome"};
final String[] locations = {"x london", "paris", "rome"};
final CharSequenceHashSet tables = new CharSequenceHashSet();
tables.add("weather1");
......
......@@ -226,16 +226,16 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
private void assertReceive(LineUdpReceiverConfiguration receiverCfg, ReceiverFactory factory) throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String expected = "colour\tshape\tsize\ttimestamp\n" +
"blue\tsquare\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tsquare\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tsquare\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tsquare\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tsquare\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tsquare\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tsquare\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tsquare\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tsquare\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tsquare\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n";
"blue\tx square\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tx square\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tx square\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tx square\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tx square\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tx square\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tx square\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tx square\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tx square\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n" +
"blue\tx square\t3.4000000000000004\t1970-01-01T00:01:40.000000Z\n";
try (CairoEngine engine = new CairoEngine(configuration)) {
......@@ -261,7 +261,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
try (LineProtoSender sender = new LineProtoSender(NetworkFacadeImpl.INSTANCE, 0, Net.parseIPv4("127.0.0.1"), receiverCfg.getPort(), 1400, 1)) {
for (int i = 0; i < 10; i++) {
sender.metric("tab").tag("colour", "blue").tag("shape", "square").field("size", 3.4).$(100000000000L);
sender.metric("tab").tag("colour", "blue").tag("shape", "x square").field("size", 3.4).$(100000000000L);
}
sender.flush();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册