未验证 提交 b0688478 编写于 作者: A Alex Pelagenko 提交者: GitHub

fix(o3): fixing o3 crash from ilp usage (#1335)

上级 962a7514
...@@ -1553,9 +1553,13 @@ public class TableWriter implements Closeable { ...@@ -1553,9 +1553,13 @@ public class TableWriter implements Closeable {
return; return;
} }
if (hasO3()) { if (o3MasterRef > -1) {
masterRef--; if (hasO3()) {
setO3AppendPosition(getO3RowCount()); // O3 mode and there are some rows.
masterRef--;
setO3AppendPosition(getO3RowCount());
}
// We're in O3 mode but no rows added yet. Return.
return; return;
} }
...@@ -3000,15 +3004,16 @@ public class TableWriter implements Closeable { ...@@ -3000,15 +3004,16 @@ public class TableWriter implements Closeable {
private long o3MoveUncommitted(final int timestampIndex) { private long o3MoveUncommitted(final int timestampIndex) {
final long committedRowCount = txFile.getCommittedFixedRowCount() + txFile.getCommittedTransientRowCount(); final long committedRowCount = txFile.getCommittedFixedRowCount() + txFile.getCommittedTransientRowCount();
final long rowsAdded = txFile.getRowCount() - committedRowCount; final long rowsAdded = txFile.getRowCount() - committedRowCount;
final long committedTransientRowCount = txFile.getTransientRowCount() - Math.min(txFile.getTransientRowCount(), rowsAdded); long transientRowsAdded = Math.min(txFile.getTransientRowCount(), rowsAdded);
if (Math.min(txFile.getTransientRowCount(), rowsAdded) > 0) { final long committedTransientRowCount = txFile.getTransientRowCount() - transientRowsAdded;
if (transientRowsAdded > 0) {
LOG.debug() LOG.debug()
.$("o3 move uncommitted [table=").$(tableName) .$("o3 move uncommitted [table=").$(tableName)
.$(", transientRowsAdded=").$(Math.min(txFile.getTransientRowCount(), rowsAdded)) .$(", transientRowsAdded=").$(transientRowsAdded)
.I$(); .I$();
return o3ScheduleMoveUncommitted0( return o3ScheduleMoveUncommitted0(
timestampIndex, timestampIndex,
Math.min(txFile.getTransientRowCount(), rowsAdded), transientRowsAdded,
committedTransientRowCount committedTransientRowCount
); );
} }
...@@ -3382,8 +3387,18 @@ public class TableWriter implements Closeable { ...@@ -3382,8 +3387,18 @@ public class TableWriter implements Closeable {
size = o3RowCount << ColumnType.pow2SizeOf(columnType); size = o3RowCount << ColumnType.pow2SizeOf(columnType);
} else { } else {
// Var size column // Var size column
size = o3IndexMem.getLong(o3RowCount * 8); if (o3RowCount > 0) {
o3IndexMem.jumpTo(o3RowCount * 8); // Usually we would find var col size of row count as (index[count] - index[count-1])
// but the record index[count] may not exist yet
// so the data size has to be calculated as (index[count-1] + len(data[count-1]) + 4)
// where len(data[count-1]) can be read as the int from var col data at offset index[count-1]
long prevOffset = o3IndexMem.getLong((o3RowCount - 1) * 8);
size = prevOffset + 2L * o3DataMem.getInt(prevOffset) + 4L;
o3IndexMem.jumpTo(o3RowCount * 8);
} else {
size = 0;
o3IndexMem.jumpTo(0);
}
} }
o3DataMem.jumpTo(size); o3DataMem.jumpTo(size);
...@@ -3560,6 +3575,12 @@ public class TableWriter implements Closeable { ...@@ -3560,6 +3575,12 @@ public class TableWriter implements Closeable {
final long tgtDataSize = dataMem2.size(); final long tgtDataSize = dataMem2.size();
final long tgtIndxAddr = indexMem2.resize(valueCount * Long.BYTES); final long tgtIndxAddr = indexMem2.resize(valueCount * Long.BYTES);
final long tgtIndxSize = indexMem2.size(); final long tgtIndxSize = indexMem2.size();
assert srcDataAddr != 0;
assert srcIndxAddr != 0;
assert tgtDataAddr != 0;
assert tgtIndxAddr != 0;
// add max offset so that we do not have conditionals inside loop // add max offset so that we do not have conditionals inside loop
indexMem.putLong(valueCount * Long.BYTES, dataSize); indexMem.putLong(valueCount * Long.BYTES, dataSize);
final long offset = Vect.sortVarColumn( final long offset = Vect.sortVarColumn(
......
...@@ -53,6 +53,7 @@ import io.questdb.std.datetime.microtime.TimestampFormatUtils; ...@@ -53,6 +53,7 @@ import io.questdb.std.datetime.microtime.TimestampFormatUtils;
import io.questdb.std.str.Path; import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink; import io.questdb.std.str.StringSink;
import io.questdb.test.tools.TestUtils; import io.questdb.test.tools.TestUtils;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
...@@ -169,6 +170,11 @@ public class LineTcpServerTest extends AbstractCairoTest { ...@@ -169,6 +170,11 @@ public class LineTcpServerTest extends AbstractCairoTest {
private Path path; private Path path;
@After
public void cleanup() {
maxMeasurementSize = 50;
}
@Test @Test
public void testFieldsReducedNonPartitioned() throws Exception { public void testFieldsReducedNonPartitioned() throws Exception {
try (TableModel m = new TableModel(configuration, "weather", PartitionBy.NONE)) { try (TableModel m = new TableModel(configuration, "weather", PartitionBy.NONE)) {
...@@ -451,22 +457,17 @@ public class LineTcpServerTest extends AbstractCairoTest { ...@@ -451,22 +457,17 @@ public class LineTcpServerTest extends AbstractCairoTest {
@Test @Test
public void testWriter17Fields() throws Exception { public void testWriter17Fields() throws Exception {
int defaultMeasurementSize = maxMeasurementSize; maxMeasurementSize = 1024;
String lineData = "tableCRASH,tag_n_1=1,tag_n_2=2,tag_n_3=3,tag_n_4=4,tag_n_5=5,tag_n_6=6," + String lineData = "tableCRASH,tag_n_1=1,tag_n_2=2,tag_n_3=3,tag_n_4=4,tag_n_5=5,tag_n_6=6," +
"tag_n_7=7,tag_n_8=8,tag_n_9=9,tag_n_10=10,tag_n_11=11,tag_n_12=12,tag_n_13=13," + "tag_n_7=7,tag_n_8=8,tag_n_9=9,tag_n_10=10,tag_n_11=11,tag_n_12=12,tag_n_13=13," +
"tag_n_14=14,tag_n_15=15,tag_n_16=16,tag_n_17=17 value=42.4 1619509249714000000\n"; "tag_n_14=14,tag_n_15=15,tag_n_16=16,tag_n_17=17 value=42.4 1619509249714000000\n";
try { runInContext(() -> {
maxMeasurementSize = lineData.length(); send(lineData, "tableCRASH", true, false);
runInContext(() -> {
send(lineData, "tableCRASH", true, false);
String expected = "tag_n_1\ttag_n_2\ttag_n_3\ttag_n_4\ttag_n_5\ttag_n_6\ttag_n_7\ttag_n_8\ttag_n_9\ttag_n_10\ttag_n_11\ttag_n_12\ttag_n_13\ttag_n_14\ttag_n_15\ttag_n_16\ttag_n_17\tvalue\ttimestamp\n" + String expected = "tag_n_1\ttag_n_2\ttag_n_3\ttag_n_4\ttag_n_5\ttag_n_6\ttag_n_7\ttag_n_8\ttag_n_9\ttag_n_10\ttag_n_11\ttag_n_12\ttag_n_13\ttag_n_14\ttag_n_15\ttag_n_16\ttag_n_17\tvalue\ttimestamp\n" +
"1\t2\t3\t4\t5\t6\t7\t8\t9\t10\t11\t12\t13\t14\t15\t16\t17\t42.400000000000006\t2021-04-27T07:40:49.714000Z\n"; "1\t2\t3\t4\t5\t6\t7\t8\t9\t10\t11\t12\t13\t14\t15\t16\t17\t42.400000000000006\t2021-04-27T07:40:49.714000Z\n";
assertTable(expected, "tableCRASH"); assertTable(expected, "tableCRASH");
}); });
} finally {
maxMeasurementSize = defaultMeasurementSize;
}
} }
@Test @Test
...@@ -482,19 +483,13 @@ public class LineTcpServerTest extends AbstractCairoTest { ...@@ -482,19 +483,13 @@ public class LineTcpServerTest extends AbstractCairoTest {
CairoTestUtils.createTableWithVersion(m, ColumnType.VERSION); CairoTestUtils.createTableWithVersion(m, ColumnType.VERSION);
} }
int defaultMeasurementSize = maxMeasurementSize;
String lineData = "messages id=843530699759026177i,author=820703963477180437i,guild=820704412095479830i,channel=820704412095479833i,flags=6i\n"; String lineData = "messages id=843530699759026177i,author=820703963477180437i,guild=820704412095479830i,channel=820704412095479833i,flags=6i\n";
try { runInContext(() -> {
maxMeasurementSize = lineData.length(); send(lineData, "messages");
runInContext(() -> { String expected = "ts\tid\tauthor\tguild\tchannel\tflags\n" +
send(lineData, "messages"); "1970-01-01T00:00:00.000001Z\t843530699759026177\t820703963477180437\t820704412095479830\t820704412095479833\t6\n";
String expected = "ts\tid\tauthor\tguild\tchannel\tflags\n" + assertTable(expected, "messages");
"1970-01-01T00:00:00.000001Z\t843530699759026177\t820703963477180437\t820704412095479830\t820704412095479833\t6\n"; });
assertTable(expected, "messages");
});
} finally {
maxMeasurementSize = defaultMeasurementSize;
}
} }
@Test @Test
...@@ -641,6 +636,42 @@ public class LineTcpServerTest extends AbstractCairoTest { ...@@ -641,6 +636,42 @@ public class LineTcpServerTest extends AbstractCairoTest {
}); });
} }
@Test
public void testSymbolAddedInO3Mode() throws Exception {
maxMeasurementSize = 4096;
runInContext(() -> {
String lineData = "plug,room=6A watts=\"469\" 1631817902842\n" +
"plug,room=6A watts=\"3195\" 1631817296977\n" +
"plug,room=6A watts=\"3188\" 1631817599910\n" +
"plug,room=6A watts=\"3180\" 1631817902842\n" +
"plug,label=Power,room=6A watts=\"475\" 1631817478737\n";
send(lineData, "plug", true, false);
String expected = "room\twatts\ttimestamp\tlabel\n" +
"6A\t3195\t1970-01-01T00:27:11.817296Z\t\n" +
"6A\t475\t1970-01-01T00:27:11.817478Z\tPower\n" +
"6A\t3188\t1970-01-01T00:27:11.817599Z\t\n" +
"6A\t3180\t1970-01-01T00:27:11.817902Z\t\n" +
"6A\t469\t1970-01-01T00:27:11.817902Z\t\n";
assertTable(expected, "plug");
});
}
@Test
public void testSymbolAddedInO3ModeFirstRow() throws Exception {
maxMeasurementSize = 4096;
runInContext(() -> {
String lineData = "plug,room=6A watts=\"1\" 2631819999000\n" +
"plug,label=Power,room=6B watts=\"22\" 1631817902842\n";
send(lineData, "plug", true, false);
String expected = "room\twatts\ttimestamp\tlabel\n" +
"6B\t22\t1970-01-01T00:27:11.817902Z\tPower\n" +
"6A\t1\t1970-01-01T00:43:51.819999Z\t\n";
assertTable(expected, "plug");
});
}
private void assertTable(CharSequence expected, CharSequence tableName) { private void assertTable(CharSequence expected, CharSequence tableName) {
try (TableReader reader = engine.getReader(AllowAllCairoSecurityContext.INSTANCE, tableName)) { try (TableReader reader = engine.getReader(AllowAllCairoSecurityContext.INSTANCE, tableName)) {
assertCursorTwoPass(expected, reader.getCursor(), reader.getMetadata()); assertCursorTwoPass(expected, reader.getCursor(), reader.getMetadata());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册