From 65b9fba70e5b9d775c7813579f0b0e5925b73a0e Mon Sep 17 00:00:00 2001 From: Alex Pelagenko <2159629+ideoma@users.noreply.github.com> Date: Wed, 30 Mar 2022 18:11:34 +0100 Subject: [PATCH] fix(cairo): handle Table writer commit error in ILP (#1989) --- .../LineTCPSenderMainFileLimitSimulation.java | 164 ++++++++++++++++++ .../questdb/cairo/CommitFailedException.java | 39 +++++ .../line/tcp/LineTcpMeasurementEvent.java | 10 +- .../line/tcp/LineTcpMeasurementScheduler.java | 12 +- .../cutlass/line/tcp/LineTcpWriterJob.java | 65 +++---- .../cutlass/line/tcp/TableUpdateDetails.java | 35 +++- .../line/tcp/BaseLineTcpContextTest.java | 2 + .../tcp/LineTcpConnectionContextTest.java | 39 +++++ 8 files changed, 320 insertions(+), 46 deletions(-) create mode 100644 benchmarks/src/main/java/org/questdb/LineTCPSenderMainFileLimitSimulation.java create mode 100644 core/src/main/java/io/questdb/cairo/CommitFailedException.java diff --git a/benchmarks/src/main/java/org/questdb/LineTCPSenderMainFileLimitSimulation.java b/benchmarks/src/main/java/org/questdb/LineTCPSenderMainFileLimitSimulation.java new file mode 100644 index 000000000..aca236032 --- /dev/null +++ b/benchmarks/src/main/java/org/questdb/LineTCPSenderMainFileLimitSimulation.java @@ -0,0 +1,164 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package org.questdb; + +import io.questdb.cutlass.line.LineTcpSender; +import io.questdb.griffin.model.IntervalUtils; +import io.questdb.network.Net; +import io.questdb.std.NumericException; +import io.questdb.std.Os; +import io.questdb.std.Rnd; +import io.questdb.std.datetime.microtime.Timestamps; +import io.questdb.std.str.StringSink; + +import java.util.concurrent.locks.LockSupport; + +/* +CREATE TABLE 'request_logs' ( + rid STRING, + auui SYMBOL capacity 256 CACHE index capacity 262144, + puui SYMBOL capacity 256 CACHE index capacity 262144, + buuid SYMBOL capacity 256 CACHE, + atuuid SYMBOL capacity 256 CACHE index capacity 262144, + creg STRING, node_uid SYMBOL capacity 256 CACHE index capacity 262144, + nnet SYMBOL capacity 256 CACHE, + nv STRING, + nr SYMBOL capacity 256 CACHE, + code1 SYMBOL capacity 256 CACHE, + name1 STRING, + id5 STRING, + il FLOAT, + iat FLOAT, + id3 STRING, + rpcm SYMBOL capacity 256 CACHE index capacity 262144, + id4 STRING, + origin STRING, + ts TIMESTAMP, + lo6 LONG, + rst6 SHORT, + t5 SYMBOL capacity 256 CACHE, + lo5 LONG, + b1 STRING, + b2 STRING, + mob BOOLEAN +) +timestamp (ts) PARTITION BY HOUR; + +alter TABLE 'request_logs' set PARAM maxUncommittedRows = 20000; + */ +public class LineTCPSenderMainFileLimitSimulation { + private static final StringSink sink = new StringSink(); + private static final String[] auui = new String[50]; + private static final String[] puui = new String[70]; + private static final String[] atuuid =
new String[101]; + private static final String[] node_uid = new String[46]; + private static final String[] rpcm = new String[41]; + private static final String[] buuid = new String[71]; + private static final String[] nnet = new String[70]; + private static final String[] nr = new String[70]; + private static final String[] code1 = new String[70]; + private static final String[] t5 = new String[70]; + + public static void main(String[] args) throws NumericException { + Rnd rnd = new Rnd(); + generateStrings(rnd, auui, 16); + generateStrings(rnd, puui, 16); + generateStrings(rnd, atuuid, 16); + generateStrings(rnd, node_uid, 16); + generateStrings(rnd, rpcm, 8); + generateStrings(rnd, buuid, 8); + generateStrings(rnd, nnet, 8); + generateStrings(rnd, nr, 8); + generateStrings(rnd, code1, 8); + generateStrings(rnd, t5, 8); + String hostid4v4 = "127.0.0.1"; + int port = 9009; + int bufferCapacity = 8 * 1024; + + try (LineTcpSender sender = new LineTcpSender(Net.parseIPv4(hostid4v4), port, bufferCapacity)) { +// fillDates(rnd, sender); + + long ts = Os.currentTimeNanos(); + while (true) { + long shift = rnd.nextLong(Timestamps.HOUR_MICROS * 1000L / 3); + if (rnd.nextBoolean()) { + shift = 0; + } + sendLine(rnd, sender, ts - shift); + + LockSupport.parkNanos(10); + ts += 1000_000L; + } + } + } + + private static void sendLine(Rnd rnd, LineTcpSender sender, long ts) { + sender.metric("request_logs") + .tag("auui", auui[rnd.nextInt(auui.length)]) + .tag("puui", puui[rnd.nextInt(puui.length)]) + .tag("atuuid", atuuid[rnd.nextInt(atuuid.length)]) + .tag("node_uid", node_uid[rnd.nextInt(node_uid.length)]) + .tag("rpcm", rpcm[rnd.nextInt(rpcm.length)]) + .tag("buuid", buuid[rnd.nextInt(buuid.length)]) + .tag("nnet", nnet[rnd.nextInt(nnet.length)]) + .tag("nr", nr[rnd.nextInt(nr.length)]) + .tag("code1", code1[rnd.nextInt(code1.length)]) + .tag("t5", t5[rnd.nextInt(t5.length)]) + .field("rid", rnd.nextString(16)) + .field("nv", rnd.nextString(8)) + .field("name1",
rnd.nextString(rnd.nextPositiveInt() % 15)) + .field("id5", rnd.nextString(rnd.nextPositiveInt() % 12)) + .field("il", rnd.nextFloat()) + .field("iat", rnd.nextFloat()) + .field("id3", rnd.nextString(rnd.nextPositiveInt() % 12)) + .field("id4", rnd.nextString(15)) + .field("origin", rnd.nextString(rnd.nextPositiveInt() % 10)) + .field("lo6", rnd.nextLong()) + .field("rst6", rnd.nextShort()) + .field("lo5", rnd.nextLong()) + .field("b1", rnd.nextString(rnd.nextPositiveInt() % 25)) + .field("b2", rnd.nextString(rnd.nextPositiveInt() % 15)) + .field("mob", rnd.nextBoolean()) + .$(ts); + } + + private static void fillDates(Rnd rnd, LineTcpSender sender) throws NumericException { + long period = Timestamps.MINUTE_MICROS * 1000L * 10; + long ts = IntervalUtils.parseFloorPartialDate("2022-02-25") * 1000L; + long endTs = IntervalUtils.parseFloorPartialDate("2022-03-26T20") * 1000L; + + while (ts < endTs) { + sendLine(rnd, sender, ts); + ts += period + rnd.nextLong(Timestamps.MINUTE_MICROS * 1000L); + } + sender.flush(); + } + + private static void generateStrings(Rnd rnd, String[] auui, int length) { + for (int i = 0; i < auui.length; i++) { + auui[i] = rnd.nextString(length); + } + } +} diff --git a/core/src/main/java/io/questdb/cairo/CommitFailedException.java b/core/src/main/java/io/questdb/cairo/CommitFailedException.java new file mode 100644 index 000000000..412f3af5b --- /dev/null +++ b/core/src/main/java/io/questdb/cairo/CommitFailedException.java @@ -0,0 +1,39 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.cairo; + +import io.questdb.std.ThreadLocal; + +public class CommitFailedException extends Exception { + private static final ThreadLocal<CommitFailedException> tlException = new ThreadLocal<>(CommitFailedException::new); + private Throwable reason; + public static CommitFailedException instance(Throwable reason) { + CommitFailedException ex = tlException.get(); + assert (ex = new CommitFailedException()) != null; + ex.reason = reason; // initCause() may be called only once per instance, so it cannot re-arm a reused exception + return ex; + } + @Override public Throwable getCause() { return reason; } +} \ No newline at end of file diff --git a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementEvent.java b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementEvent.java index cd3c194e6..514d5a888 100644 --- a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementEvent.java +++ b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementEvent.java @@ -24,10 +24,7 @@ package io.questdb.cutlass.line.tcp; -import io.questdb.cairo.CairoException; -import io.questdb.cairo.ColumnType; -import io.questdb.cairo.TableUtils; -import io.questdb.cairo.TableWriter; +import io.questdb.cairo.*; import io.questdb.cutlass.line.LineProtoTimestampAdapter; import io.questdb.log.Log; import io.questdb.log.LogFactory; @@ -85,7 +82,7 @@ class LineTcpMeasurementEvent implements Closeable { tableUpdateDetails.releaseWriter(commitOnWriterClose); } - void append() { + void append() throws CommitFailedException { TableWriter.Row row = null; try { TableWriter writer = tableUpdateDetails.getWriter(); @@ -208,10 +205,13 @@
class LineTcpMeasurementEvent implements Closeable { } row.append(); tableUpdateDetails.commitIfMaxUncommittedRowsCountReached(); + } catch (CommitFailedException commitFailedException) { + throw commitFailedException; } catch (Throwable th) { LOG.error() .$("could not write line protocol measurement [tableName=").$(tableUpdateDetails.getTableNameUtf16()) .$(", message=").$(th.getMessage()) + .$(th) .I$(); if (row != null) { row.cancel(); diff --git a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementScheduler.java b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementScheduler.java index c59539df9..8710422e8 100644 --- a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementScheduler.java +++ b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpMeasurementScheduler.java @@ -198,16 +198,16 @@ class LineTcpMeasurementScheduler implements Closeable { return false; } - public boolean processWriterReleaseEvent(LineTcpMeasurementEvent event, int workerId) { + public void processWriterReleaseEvent(LineTcpMeasurementEvent event, int workerId) { tableUpdateDetailsLock.readLock().lock(); try { final TableUpdateDetails tab = event.getTableUpdateDetails(); if (tab.getWriterThreadId() != workerId) { - return true; + return; } - if (tableUpdateDetailsUtf16.keyIndex(tab.getTableNameUtf16()) < 0) { + if (!event.getTableUpdateDetails().isWriterInError() && tableUpdateDetailsUtf16.keyIndex(tab.getTableNameUtf16()) < 0) { // Table must have been re-assigned to an IO thread - return true; + return; } LOG.info() .$("releasing writer, its been idle since ").$ts(tab.getLastMeasurementMillis() * 1_000) @@ -218,7 +218,6 @@ class LineTcpMeasurementScheduler implements Closeable { } finally { tableUpdateDetailsLock.readLock().unlock(); } - return true; } private static long getEventSlotSize(int maxMeasurementSize) { @@ -334,6 +333,9 @@ class LineTcpMeasurementScheduler implements Closeable { long seq = getNextPublisherEventSequence(writerThreadId); 
if (seq > -1) { try { + if (tab.isWriterInError()) { + throw CairoException.instance(0).put("writer is in error, aborting ILP pipeline"); + } queue[writerThreadId].get(seq).createMeasurementEvent( tab, parser, diff --git a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpWriterJob.java b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpWriterJob.java index 9afa25fbf..5cc3ca229 100644 --- a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpWriterJob.java +++ b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpWriterJob.java @@ -108,10 +108,14 @@ class LineTcpWriterJob implements Job, Closeable { // the heap based solution mentioned above will eliminate the minimum search // we could just process the min element of the heap until we hit the first commit // time greater than millis and that will be our nextCommitTime - long tableNextCommitTime = assignedTables.getQuick(n).commitIfIntervalElapsed(wallClockMillis); - if (tableNextCommitTime < minTableNextCommitTime) { - // taking the earliest commit time - minTableNextCommitTime = tableNextCommitTime; + try { + long tableNextCommitTime = assignedTables.getQuick(n).commitIfIntervalElapsed(wallClockMillis); + if (tableNextCommitTime < minTableNextCommitTime) { + // taking the earliest commit time + minTableNextCommitTime = tableNextCommitTime; + } + } catch (Throwable th) { + metrics.healthCheck().incrementUnhandledErrors(); } } // if no tables, just use the default commit interval @@ -130,58 +134,57 @@ class LineTcpWriterJob implements Job, Closeable { } busy = true; final LineTcpMeasurementEvent event = queue.get(cursor); - boolean eventProcessed; try { // we check the event's writer thread ID to avoid consuming // incomplete events final TableUpdateDetails tab = event.getTableUpdateDetails(); + boolean closeWriter = false; if (event.getWriterWorkerId() == workerId) { try { - if (!tab.isAssignedToJob()) { - assignedTables.add(tab); - tab.setAssignedToJob(true); - nextCommitTime = 
millisecondClock.getTicks(); - LOG.info() - .$("assigned table to writer thread [tableName=").$(tab.getTableNameUtf16()) - .$(", threadId=").$(workerId) - .I$(); + if (tab.isWriterInError()) { + closeWriter = true; + } else { + if (!tab.isAssignedToJob()) { + assignedTables.add(tab); + tab.setAssignedToJob(true); + nextCommitTime = millisecondClock.getTicks(); + LOG.info() + .$("assigned table to writer thread [tableName=").$(tab.getTableNameUtf16()) + .$(", threadId=").$(workerId) + .I$(); + } + event.append(); } - event.append(); - eventProcessed = true; } catch (Throwable ex) { + tab.setWriterInError(); + metrics.healthCheck().incrementUnhandledErrors(); LOG.error() .$("closing writer because of error [table=").$(tab.getTableNameUtf16()) .$(",ex=").$(ex) .I$(); + closeWriter = true; event.createWriterReleaseEvent(tab, false); - eventProcessed = false; // This is a critical error, so we treat it as an unhandled one. - metrics.healthCheck().incrementUnhandledErrors(); } } else { if (event.getWriterWorkerId() == LineTcpMeasurementEventType.ALL_WRITERS_RELEASE_WRITER) { - eventProcessed = scheduler.processWriterReleaseEvent(event, workerId); - assignedTables.remove(tab); - tab.setAssignedToJob(false); - nextCommitTime = millisecondClock.getTicks(); - } else { - eventProcessed = true; + closeWriter = true; } } + + if (closeWriter && tab.getWriter() != null) { + scheduler.processWriterReleaseEvent(event, workerId); + assignedTables.remove(tab); + tab.setAssignedToJob(false); + nextCommitTime = millisecondClock.getTicks(); + } } catch (Throwable ex) { - eventProcessed = true; LOG.error().$("failed to process ILP event because of exception [ex=").$(ex).I$(); } - // by not releasing cursor we force the sequence to return us the same value over and over - // until cursor value is released - if (eventProcessed) { - sequence.done(cursor); - } else { - return false; - } + sequence.done(cursor); } } diff --git 
a/core/src/main/java/io/questdb/cutlass/line/tcp/TableUpdateDetails.java b/core/src/main/java/io/questdb/cutlass/line/tcp/TableUpdateDetails.java index 9504b4344..c0f3ce799 100644 --- a/core/src/main/java/io/questdb/cutlass/line/tcp/TableUpdateDetails.java +++ b/core/src/main/java/io/questdb/cutlass/line/tcp/TableUpdateDetails.java @@ -59,6 +59,7 @@ public class TableUpdateDetails implements Closeable { private long lastMeasurementMillis = Long.MAX_VALUE; private long nextCommitTime; private int networkIOOwnerCount = 0; + private volatile boolean writerInError; TableUpdateDetails( LineTcpReceiverConfiguration configuration, @@ -96,6 +97,14 @@ public class TableUpdateDetails implements Closeable { } + public boolean isWriterInError() { + return writerInError; + } + + public void setWriterInError() { + writerInError = true; + } + @Override public void close() { synchronized (this) { @@ -116,7 +125,9 @@ public class TableUpdateDetails implements Closeable { closeLocals(); if (null != writer) { try { - writer.commit(); + if (!writerInError) { + writer.commit(); + } } catch (Throwable ex) { LOG.error().$("cannot commit writer transaction, rolling back before releasing it [table=").$(tableNameUtf16).$(",ex=").$(ex).I$(); } finally { @@ -176,7 +187,7 @@ public class TableUpdateDetails implements Closeable { } } - private void commit(boolean withLag) { + private void commit(boolean withLag) throws CommitFailedException { if (writer.getUncommittedRowCount() > 0) { try { LOG.debug().$("time-based commit " + (withLag ? 
"with lag " : "") + "[rows=").$(writer.getUncommittedRowCount()).$(", table=").$(tableNameUtf16).I$(); @@ -186,17 +197,19 @@ public class TableUpdateDetails implements Closeable { writer.commit(); } } catch (Throwable ex) { + setWriterInError(); LOG.error().$("could not commit [table=").$(tableNameUtf16).$(", e=").$(ex).I$(); try { writer.rollback(); } catch (Throwable th) { LOG.error().$("could not perform emergency rollback [table=").$(tableNameUtf16).$(", e=").$(th).I$(); } + throw CommitFailedException.instance(ex); } } } - long commitIfIntervalElapsed(long wallClockMillis) { + long commitIfIntervalElapsed(long wallClockMillis) throws CommitFailedException { if (wallClockMillis < nextCommitTime) { return nextCommitTime; } @@ -208,7 +221,7 @@ public class TableUpdateDetails implements Closeable { return nextCommitTime; } - void commitIfMaxUncommittedRowsCountReached() { + void commitIfMaxUncommittedRowsCountReached() throws CommitFailedException { final long rowsSinceCommit = writer.getUncommittedRowCount(); if (rowsSinceCommit < writer.getMetadata().getMaxUncommittedRows()) { if ((rowsSinceCommit & writerTickRowsCountMod) == 0) { @@ -219,7 +232,19 @@ public class TableUpdateDetails implements Closeable { } LOG.debug().$("max-uncommitted-rows commit with lag [").$(tableNameUtf16).I$(); nextCommitTime = millisecondClock.getTicks() + writer.getCommitInterval(); - writer.commitWithLag(); + + try { + writer.commitWithLag(); + } catch (Throwable th) { + setWriterInError(); + LOG.error() + .$("could not commit line protocol measurement [tableName=").$(writer.getTableName()) + .$(", message=").$(th.getMessage()).$(th) + .I$(); + try { writer.rollback(); } catch (Throwable rollbackError) { th.addSuppressed(rollbackError); } + throw CommitFailedException.instance(th); + } + // Tick after commit.
writer.tick(false); } diff --git a/core/src/test/java/io/questdb/cutlass/line/tcp/BaseLineTcpContextTest.java b/core/src/test/java/io/questdb/cutlass/line/tcp/BaseLineTcpContextTest.java index d8c5f7d91..50c4c151c 100644 --- a/core/src/test/java/io/questdb/cutlass/line/tcp/BaseLineTcpContextTest.java +++ b/core/src/test/java/io/questdb/cutlass/line/tcp/BaseLineTcpContextTest.java @@ -35,6 +35,7 @@ import io.questdb.std.*; import io.questdb.std.datetime.microtime.MicrosecondClock; import io.questdb.std.datetime.microtime.MicrosecondClockImpl; import io.questdb.std.str.FloatingDirectCharSink; +import io.questdb.std.str.Path; import org.junit.Assert; import org.junit.Before; @@ -335,6 +336,7 @@ abstract class BaseLineTcpContextTest extends AbstractCairoTest { }); Assert.assertFalse(context.invalid()); Assert.assertEquals(FD, context.getFd()); + workerPool.assignCleaner(Path.CLEANER); workerPool.start(LOG); } diff --git a/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpConnectionContextTest.java b/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpConnectionContextTest.java index 6f5998d90..529a9241e 100644 --- a/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpConnectionContextTest.java +++ b/core/src/test/java/io/questdb/cutlass/line/tcp/LineTcpConnectionContextTest.java @@ -31,6 +31,7 @@ import io.questdb.griffin.SqlException; import io.questdb.griffin.SqlExecutionContext; import io.questdb.griffin.SqlExecutionContextImpl; import io.questdb.std.Chars; +import io.questdb.std.Files; import io.questdb.std.FilesFacadeImpl; import io.questdb.std.datetime.microtime.Timestamps; import io.questdb.std.str.LPSZ; @@ -434,6 +435,44 @@ public class LineTcpConnectionContextTest extends BaseLineTcpContextTest { }, null, null); } + @Test + public void testCairoExceptionOnCommit() throws Exception { + String table = "commitException"; + configOverrideMaxUncommittedRows = 1; + netMsgBufferSize.set(60); + runInContext( + new FilesFacadeImpl() { + @Override + public long 
openRW(LPSZ name, long opts) { + if (Chars.endsWith(name, "1970-01-01.1" + Files.SEPARATOR + "temperature.d")) { + return -1; + } + return super.openRW(name, opts); + } + }, + () -> { + recvBuffer = + table + ",location=us-midwest temperature=82 99000\n" + + table + ",location=us-midwest temperature=83 90000\n" + + table + ",location=us-eastcoast temperature=81,broken=23 80000\n" + + table + ",location=us-midwest temperature=85 70000\n" + + table + ",location=us-eastcoast temperature=89 60000\n" + + table + ",location=us-eastcoast temperature=80 50000\n" + + table + ",location=us-westcost temperature=82 40000\n"; + do { + handleContextIO(); + } while (!disconnected && recvBuffer.length() > 0); + + Assert.assertTrue(disconnected); + Assert.assertTrue(recvBuffer.length() > 0); + closeContext(); + + String expected = "location\ttemperature\ttimestamp\n" + + "us-midwest\t82.0\t1970-01-01T00:00:00.000099Z\n"; + assertTable(expected, table); + }, null, null); + } + @Test public void testCairoExceptionOnCreateTable() throws Exception { String table = "cairoEx"; -- GitLab