From b0688478869edbc466d8af2a18764a318bcf7397 Mon Sep 17 00:00:00 2001 From: Alex Pelagenko <2159629+ideoma@users.noreply.github.com> Date: Sun, 19 Sep 2021 20:57:19 +0100 Subject: [PATCH] fix(o3): fixing o3 crash from ilp usage (#1335) --- .../java/io/questdb/cairo/TableWriter.java | 39 ++++++--- .../cutlass/line/tcp/LineTcpServerTest.java | 79 +++++++++++++------ 2 files changed, 85 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/io/questdb/cairo/TableWriter.java b/core/src/main/java/io/questdb/cairo/TableWriter.java index 628e0dec3..b4245600a 100644 --- a/core/src/main/java/io/questdb/cairo/TableWriter.java +++ b/core/src/main/java/io/questdb/cairo/TableWriter.java @@ -1553,9 +1553,13 @@ public class TableWriter implements Closeable { return; } - if (hasO3()) { - masterRef--; - setO3AppendPosition(getO3RowCount()); + if (o3MasterRef > -1) { + if (hasO3()) { + // O3 mode and there are some rows. + masterRef--; + setO3AppendPosition(getO3RowCount()); + } + // We're in O3 mode but no rows added yet. Return. return; } @@ -3000,15 +3004,16 @@ public class TableWriter implements Closeable { private long o3MoveUncommitted(final int timestampIndex) { final long committedRowCount = txFile.getCommittedFixedRowCount() + txFile.getCommittedTransientRowCount(); final long rowsAdded = txFile.getRowCount() - committedRowCount; - final long committedTransientRowCount = txFile.getTransientRowCount() - Math.min(txFile.getTransientRowCount(), rowsAdded); - if (Math.min(txFile.getTransientRowCount(), rowsAdded) > 0) { + long transientRowsAdded = Math.min(txFile.getTransientRowCount(), rowsAdded); + final long committedTransientRowCount = txFile.getTransientRowCount() - transientRowsAdded; + if (transientRowsAdded > 0) { LOG.debug() .$("o3 move uncommitted [table=").$(tableName) - .$(", transientRowsAdded=").$(Math.min(txFile.getTransientRowCount(), rowsAdded)) + .$(", transientRowsAdded=").$(transientRowsAdded) .I$(); return o3ScheduleMoveUncommitted0( timestampIndex, - Math.min(txFile.getTransientRowCount(), rowsAdded), + transientRowsAdded, committedTransientRowCount ); } @@ -3382,8 +3387,18 @@ public class TableWriter implements Closeable { size = o3RowCount << ColumnType.pow2SizeOf(columnType); } else { // Var size column - size = o3IndexMem.getLong(o3RowCount * 8); - o3IndexMem.jumpTo(o3RowCount * 8); + if (o3RowCount > 0) { + // Usually we would find var col size of row count as (index[count] - index[count-1]) + // but the record index[count] may not exist yet + // so the data size has to be calculated as (index[count-1] + len(data[count-1]) + 4) + // where len(data[count-1]) can be read as the int from var col data at offset index[count-1] + long prevOffset = o3IndexMem.getLong((o3RowCount - 1) * 8); + size = prevOffset + 2L * o3DataMem.getInt(prevOffset) + 4L; + o3IndexMem.jumpTo(o3RowCount * 8); + } else { + size = 0; + o3IndexMem.jumpTo(0); + } } o3DataMem.jumpTo(size); @@ -3560,6 +3575,12 @@ public class TableWriter implements Closeable { final long tgtDataSize = dataMem2.size(); final long tgtIndxAddr = indexMem2.resize(valueCount * Long.BYTES); final long tgtIndxSize = indexMem2.size(); + + assert srcDataAddr != 0; + assert srcIndxAddr != 0; + assert tgtDataAddr != 0; + assert tgtIndxAddr != 0; + // add max offset so that we do not have conditionals inside loop indexMem.putLong(valueCount * Long.BYTES, dataSize); final long offset = Vect.sortVarColumn( diff --git a/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpServerTest.java b/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpServerTest.java index 7ff870b74..1fce93e1c 100644 --- a/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpServerTest.java +++ b/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpServerTest.java @@ -53,6 +53,7 @@ import io.questdb.std.datetime.microtime.TimestampFormatUtils; import io.questdb.std.str.Path; import io.questdb.std.str.StringSink; import io.questdb.test.tools.TestUtils; +import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -169,6 +170,11 @@ public class LineTcpServerTest extends AbstractCairoTest { private Path path; + @After + public void cleanup() { + maxMeasurementSize = 50; + } + @Test public void testFieldsReducedNonPartitioned() throws Exception { try (TableModel m = new TableModel(configuration, "weather", PartitionBy.NONE)) { @@ -451,22 +457,17 @@ public class LineTcpServerTest extends AbstractCairoTest { @Test public void testWriter17Fields() throws Exception { - int defaultMeasurementSize = maxMeasurementSize; + maxMeasurementSize = 1024; String lineData = "tableCRASH,tag_n_1=1,tag_n_2=2,tag_n_3=3,tag_n_4=4,tag_n_5=5,tag_n_6=6," + "tag_n_7=7,tag_n_8=8,tag_n_9=9,tag_n_10=10,tag_n_11=11,tag_n_12=12,tag_n_13=13," + "tag_n_14=14,tag_n_15=15,tag_n_16=16,tag_n_17=17 value=42.4 1619509249714000000\n"; - try { - maxMeasurementSize = lineData.length(); - runInContext(() -> { - send(lineData, "tableCRASH", true, false); + runInContext(() -> { + send(lineData, "tableCRASH", true, false); - String expected = "tag_n_1\ttag_n_2\ttag_n_3\ttag_n_4\ttag_n_5\ttag_n_6\ttag_n_7\ttag_n_8\ttag_n_9\ttag_n_10\ttag_n_11\ttag_n_12\ttag_n_13\ttag_n_14\ttag_n_15\ttag_n_16\ttag_n_17\tvalue\ttimestamp\n" + - "1\t2\t3\t4\t5\t6\t7\t8\t9\t10\t11\t12\t13\t14\t15\t16\t17\t42.400000000000006\t2021-04-27T07:40:49.714000Z\n"; - assertTable(expected, "tableCRASH"); - }); - } finally { - maxMeasurementSize = defaultMeasurementSize; - } + String expected = "tag_n_1\ttag_n_2\ttag_n_3\ttag_n_4\ttag_n_5\ttag_n_6\ttag_n_7\ttag_n_8\ttag_n_9\ttag_n_10\ttag_n_11\ttag_n_12\ttag_n_13\ttag_n_14\ttag_n_15\ttag_n_16\ttag_n_17\tvalue\ttimestamp\n" + + "1\t2\t3\t4\t5\t6\t7\t8\t9\t10\t11\t12\t13\t14\t15\t16\t17\t42.400000000000006\t2021-04-27T07:40:49.714000Z\n"; + assertTable(expected, "tableCRASH"); + }); } @Test @@ -482,19 +483,13 @@ public class LineTcpServerTest extends AbstractCairoTest { CairoTestUtils.createTableWithVersion(m, ColumnType.VERSION); } - int defaultMeasurementSize = maxMeasurementSize; String lineData = "messages id=843530699759026177i,author=820703963477180437i,guild=820704412095479830i,channel=820704412095479833i,flags=6i\n"; - try { - maxMeasurementSize = lineData.length(); - runInContext(() -> { - send(lineData, "messages"); - String expected = "ts\tid\tauthor\tguild\tchannel\tflags\n" + - "1970-01-01T00:00:00.000001Z\t843530699759026177\t820703963477180437\t820704412095479830\t820704412095479833\t6\n"; - assertTable(expected, "messages"); - }); - } finally { - maxMeasurementSize = defaultMeasurementSize; - } + runInContext(() -> { + send(lineData, "messages"); + String expected = "ts\tid\tauthor\tguild\tchannel\tflags\n" + + "1970-01-01T00:00:00.000001Z\t843530699759026177\t820703963477180437\t820704412095479830\t820704412095479833\t6\n"; + assertTable(expected, "messages"); + }); } @Test @@ -641,6 +636,42 @@ public class LineTcpServerTest extends AbstractCairoTest { }); } + @Test + public void testSymbolAddedInO3Mode() throws Exception { + maxMeasurementSize = 4096; + runInContext(() -> { + String lineData = "plug,room=6A watts=\"469\" 1631817902842\n" + + "plug,room=6A watts=\"3195\" 1631817296977\n" + + "plug,room=6A watts=\"3188\" 1631817599910\n" + + "plug,room=6A watts=\"3180\" 1631817902842\n" + + "plug,label=Power,room=6A watts=\"475\" 1631817478737\n"; + send(lineData, "plug", true, false); + + String expected = "room\twatts\ttimestamp\tlabel\n" + + "6A\t3195\t1970-01-01T00:27:11.817296Z\t\n" + + "6A\t475\t1970-01-01T00:27:11.817478Z\tPower\n" + + "6A\t3188\t1970-01-01T00:27:11.817599Z\t\n" + + "6A\t3180\t1970-01-01T00:27:11.817902Z\t\n" + + "6A\t469\t1970-01-01T00:27:11.817902Z\t\n"; + assertTable(expected, "plug"); + }); + } + + @Test + public void testSymbolAddedInO3ModeFirstRow() throws Exception { + maxMeasurementSize = 4096; + runInContext(() -> { + String lineData = "plug,room=6A watts=\"1\" 2631819999000\n" + + "plug,label=Power,room=6B watts=\"22\" 1631817902842\n"; + send(lineData, "plug", true, false); + + String expected = "room\twatts\ttimestamp\tlabel\n" + + "6B\t22\t1970-01-01T00:27:11.817902Z\tPower\n" + + "6A\t1\t1970-01-01T00:43:51.819999Z\t\n"; + assertTable(expected, "plug"); + }); + } + private void assertTable(CharSequence expected, CharSequence tableName) { try (TableReader reader = engine.getReader(AllowAllCairoSecurityContext.INSTANCE, tableName)) { assertCursorTwoPass(expected, reader.getCursor(), reader.getMetadata()); -- GitLab