未验证 提交 65b9fba7 编写于 作者: A Alex Pelagenko 提交者: GitHub

fix(cairo): handle Table writer commit error in ILP (#1989)

上级 ced49b39
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package org.questdb;
import io.questdb.cutlass.line.LineTcpSender;
import io.questdb.griffin.model.IntervalUtils;
import io.questdb.network.Net;
import io.questdb.std.NumericException;
import io.questdb.std.Os;
import io.questdb.std.Rnd;
import io.questdb.std.datetime.microtime.Timestamps;
import io.questdb.std.str.StringSink;
import java.util.concurrent.locks.LockSupport;
/*
CREATE TABLE 'request_logs' (
rid STRING,
auui SYMBOL capacity 256 CACHE index capacity 262144,
puui SYMBOL capacity 256 CACHE index capacity 262144,
buuid SYMBOL capacity 256 CACHE,
atuuid SYMBOL capacity 256 CACHE index capacity 262144,
creg STRING, node_uid SYMBOL capacity 256 CACHE index capacity 262144,
nnet SYMBOL capacity 256 CACHE,
nv STRING,
nr SYMBOL capacity 256 CACHE,
code1 SYMBOL capacity 256 CACHE,
name1 STRING,
id5 STRING,
il FLOAT,
iat FLOAT,
id3 STRING,
rpcm SYMBOL capacity 256 CACHE index capacity 262144,
id4 STRING,
origin STRING,
ts TIMESTAMP,
lo6 LONG,
rst6 SHORT,
t5 SYMBOL capacity 256 CACHE,
lo5 LONG,
b1 STRING,
b2 STRING,
mob BOOLEAN
)
timestamp (ts) PARTITION BY HOUR;
alter TABLE 'request_logs' set PARAM maxUncommittedRows = 20000;
*/
/**
 * Stand-alone load generator that streams random line-protocol rows into the
 * {@code request_logs} table (see the DDL in the comment above this class).
 * <p>
 * It connects to {@code 127.0.0.1:9009} and sends rows in an endless loop, so the
 * process has to be terminated externally. Roughly half of the rows are sent with
 * a timestamp shifted into the past, which makes the stream out-of-order.
 */
public class LineTCPSenderMainFileLimitSimulation {
    // Not referenced anywhere in this class; retained as-is.
    private static final StringSink sink = new StringSink();

    // Pools of pre-generated values; each row picks one random entry per tag column.
    private static final String[] auui = new String[50];
    private static final String[] puui = new String[70];
    private static final String[] atuuid = new String[101];
    private static final String[] node_uid = new String[46];
    private static final String[] rpcm = new String[41];
    private static final String[] buuid = new String[71];
    private static final String[] nnet = new String[70];
    private static final String[] nr = new String[70];
    private static final String[] code1 = new String[70];
    private static final String[] t5 = new String[70];

    /**
     * Fills the value pools, opens the sender and pushes rows forever.
     *
     * @param args ignored
     * @throws NumericException declared for the (currently disabled) {@code fillDates} call
     */
    public static void main(String[] args) throws NumericException {
        Rnd rnd = new Rnd();
        generateStrings(rnd, auui, 16);
        generateStrings(rnd, puui, 16);
        generateStrings(rnd, atuuid, 16);
        generateStrings(rnd, node_uid, 16);
        generateStrings(rnd, rpcm, 8);
        generateStrings(rnd, buuid, 8);
        generateStrings(rnd, nnet, 8);
        generateStrings(rnd, nr, 8);
        generateStrings(rnd, code1, 8);
        generateStrings(rnd, t5, 8);

        String hostid4v4 = "127.0.0.1";
        int port = 9009;
        int bufferCapacity = 8 * 1024;

        try (LineTcpSender sender = new LineTcpSender(Net.parseIPv4(hostid4v4), port, bufferCapacity)) {
//            fillDates(rnd, sender);
            long ts = Os.currentTimeNanos();
            while (true) {
                // The random draw happens on every iteration, even when the shift is
                // discarded below, so the random sequence stays the same either way.
                long shift = rnd.nextLong(Timestamps.HOUR_MICROS * 1000L / 3);
                if (rnd.nextBoolean()) {
                    shift = 0;
                }
                sendLine(rnd, sender, ts - shift);
                LockSupport.parkNanos(10);
                // advance the base timestamp by one millisecond (value is in nanoseconds)
                ts += 1000_000L;
            }
        }
    }

    /**
     * Emits a single {@code request_logs} row with random tag and field values.
     *
     * @param rnd    source of randomness
     * @param sender line-protocol sender the row is written to
     * @param ts     row timestamp, passed straight to the sender
     */
    private static void sendLine(Rnd rnd, LineTcpSender sender, long ts) {
        sender.metric("request_logs")
                .tag("auui", auui[rnd.nextInt(auui.length)])
                .tag("puui", puui[rnd.nextInt(puui.length)])
                .tag("atuuid", atuuid[rnd.nextInt(atuuid.length)])
                .tag("node_uid", node_uid[rnd.nextInt(node_uid.length)])
                .tag("rpcm", rpcm[rnd.nextInt(rpcm.length)])
                .tag("buuid", buuid[rnd.nextInt(buuid.length)])
                .tag("nnet", nnet[rnd.nextInt(nnet.length)])
                .tag("nr", nr[rnd.nextInt(nr.length)])
                .tag("code1", code1[rnd.nextInt(code1.length)])
                // take the value from the t5 pool (previously read from code1 by mistake)
                .tag("t5", t5[rnd.nextInt(t5.length)])
                .field("rid", rnd.nextString(16))
                .field("nv", rnd.nextString(8))
                .field("name1", rnd.nextString(rnd.nextPositiveInt() % 15))
                .field("id5", rnd.nextString(rnd.nextPositiveInt() % 12))
                .field("il", rnd.nextFloat())
                .field("iat", rnd.nextFloat())
                .field("id3", rnd.nextString(rnd.nextPositiveInt() % 12))
                .field("id4", rnd.nextString(15))
                .field("origin", rnd.nextString(rnd.nextPositiveInt() % 10))
                .field("lo6", rnd.nextLong())
                .field("rst6", rnd.nextShort())
                .field("lo5", rnd.nextLong())
                .field("b1", rnd.nextString(rnd.nextPositiveInt() % 25))
                .field("b2", rnd.nextString(rnd.nextPositiveInt() % 15))
                .field("mob", rnd.nextBoolean())
                .$(ts);
    }

    /**
     * Sends rows spread between 2022-02-25 and 2022-03-26T20, stepping ten minutes
     * plus a random extra of up to about a minute per row, then flushes the sender.
     * Currently only referenced from a commented-out call in {@code main}.
     *
     * @param rnd    source of randomness
     * @param sender line-protocol sender the rows are written to
     * @throws NumericException if one of the date literals cannot be parsed
     */
    private static void fillDates(Rnd rnd, LineTcpSender sender) throws NumericException {
        long period = Timestamps.MINUTE_MICROS * 1000L * 10;
        long ts = IntervalUtils.parseFloorPartialDate("2022-02-25") * 1000L;
        long endTs = IntervalUtils.parseFloorPartialDate("2022-03-26T20") * 1000L;

        while (ts < endTs) {
            sendLine(rnd, sender, ts);
            ts += period + rnd.nextLong(Timestamps.MINUTE_MICROS * 1000L);
        }
        sender.flush();
    }

    /**
     * Fills every slot of {@code target} with a random string of the given length.
     *
     * @param rnd    source of randomness
     * @param target array to populate in place
     * @param length length passed to {@code Rnd.nextString} for each entry
     */
    private static void generateStrings(Rnd rnd, String[] target, int length) {
        for (int i = 0; i < target.length; i++) {
            target[i] = rnd.nextString(length);
        }
    }
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.cairo;
import io.questdb.std.ThreadLocal;
/**
 * Checked exception signalling that a table writer commit failed. The failure that
 * triggered it is available through {@link #getCause()}.
 * <p>
 * Instances are obtained via {@link #instance(Throwable)}, which hands out a
 * per-thread instance; callers must therefore not hold on to the returned object
 * across subsequent calls on the same thread.
 */
public class CommitFailedException extends Exception {
    private static final ThreadLocal<CommitFailedException> tlException = new ThreadLocal<>(CommitFailedException::new);

    // Stored in a field rather than via Throwable.initCause(): initCause() may be
    // invoked at most once per Throwable, which is incompatible with reusing the
    // same thread-local instance for every failure.
    private Throwable reason;

    /**
     * Returns the exception for the current thread with its cause set to {@code reason}.
     *
     * @param reason the failure that made the commit fail; may be {@code null}
     * @return the exception to throw
     */
    public static CommitFailedException instance(Throwable reason) {
        CommitFailedException ex = tlException.get();
        // With assertions enabled (-ea) the assignment inside the assert replaces the
        // thread-local instance with a freshly constructed one.
        // NOTE(review): presumably so that test runs see an accurate stack trace - confirm.
        assert (ex = new CommitFailedException()) != null;
        // never record the exception as its own cause
        ex.reason = reason == ex ? null : reason;
        return ex;
    }

    /**
     * @return the failure passed to the most recent {@link #instance(Throwable)} call
     * that produced this object, or {@code null} if none was given
     */
    @Override
    public synchronized Throwable getCause() {
        return reason;
    }
}
\ No newline at end of file
......@@ -24,10 +24,7 @@
package io.questdb.cutlass.line.tcp;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.TableUtils;
import io.questdb.cairo.TableWriter;
import io.questdb.cairo.*;
import io.questdb.cutlass.line.LineProtoTimestampAdapter;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
......@@ -85,7 +82,7 @@ class LineTcpMeasurementEvent implements Closeable {
tableUpdateDetails.releaseWriter(commitOnWriterClose);
}
void append() {
void append() throws CommitFailedException {
TableWriter.Row row = null;
try {
TableWriter writer = tableUpdateDetails.getWriter();
......@@ -208,10 +205,13 @@ class LineTcpMeasurementEvent implements Closeable {
}
row.append();
tableUpdateDetails.commitIfMaxUncommittedRowsCountReached();
} catch (CommitFailedException commitFailedException) {
throw commitFailedException;
} catch (Throwable th) {
LOG.error()
.$("could not write line protocol measurement [tableName=").$(tableUpdateDetails.getTableNameUtf16())
.$(", message=").$(th.getMessage())
.$(th)
.I$();
if (row != null) {
row.cancel();
......
......@@ -198,16 +198,16 @@ class LineTcpMeasurementScheduler implements Closeable {
return false;
}
public boolean processWriterReleaseEvent(LineTcpMeasurementEvent event, int workerId) {
public void processWriterReleaseEvent(LineTcpMeasurementEvent event, int workerId) {
tableUpdateDetailsLock.readLock().lock();
try {
final TableUpdateDetails tab = event.getTableUpdateDetails();
if (tab.getWriterThreadId() != workerId) {
return true;
return;
}
if (tableUpdateDetailsUtf16.keyIndex(tab.getTableNameUtf16()) < 0) {
if (!event.getTableUpdateDetails().isWriterInError() && tableUpdateDetailsUtf16.keyIndex(tab.getTableNameUtf16()) < 0) {
// Table must have been re-assigned to an IO thread
return true;
return;
}
LOG.info()
.$("releasing writer, its been idle since ").$ts(tab.getLastMeasurementMillis() * 1_000)
......@@ -218,7 +218,6 @@ class LineTcpMeasurementScheduler implements Closeable {
} finally {
tableUpdateDetailsLock.readLock().unlock();
}
return true;
}
private static long getEventSlotSize(int maxMeasurementSize) {
......@@ -334,6 +333,9 @@ class LineTcpMeasurementScheduler implements Closeable {
long seq = getNextPublisherEventSequence(writerThreadId);
if (seq > -1) {
try {
if (tab.isWriterInError()) {
throw CairoException.instance(0).put("writer is in error, aborting ILP pipeline");
}
queue[writerThreadId].get(seq).createMeasurementEvent(
tab,
parser,
......
......@@ -108,10 +108,14 @@ class LineTcpWriterJob implements Job, Closeable {
// the heap based solution mentioned above will eliminate the minimum search
// we could just process the min element of the heap until we hit the first commit
// time greater than millis and that will be our nextCommitTime
long tableNextCommitTime = assignedTables.getQuick(n).commitIfIntervalElapsed(wallClockMillis);
if (tableNextCommitTime < minTableNextCommitTime) {
// taking the earliest commit time
minTableNextCommitTime = tableNextCommitTime;
try {
long tableNextCommitTime = assignedTables.getQuick(n).commitIfIntervalElapsed(wallClockMillis);
if (tableNextCommitTime < minTableNextCommitTime) {
// taking the earliest commit time
minTableNextCommitTime = tableNextCommitTime;
}
} catch (Throwable th) {
metrics.healthCheck().incrementUnhandledErrors();
}
}
// if no tables, just use the default commit interval
......@@ -130,58 +134,57 @@ class LineTcpWriterJob implements Job, Closeable {
}
busy = true;
final LineTcpMeasurementEvent event = queue.get(cursor);
boolean eventProcessed;
try {
// we check the event's writer thread ID to avoid consuming
// incomplete events
final TableUpdateDetails tab = event.getTableUpdateDetails();
boolean closeWriter = false;
if (event.getWriterWorkerId() == workerId) {
try {
if (!tab.isAssignedToJob()) {
assignedTables.add(tab);
tab.setAssignedToJob(true);
nextCommitTime = millisecondClock.getTicks();
LOG.info()
.$("assigned table to writer thread [tableName=").$(tab.getTableNameUtf16())
.$(", threadId=").$(workerId)
.I$();
if (tab.isWriterInError()) {
closeWriter = true;
} else {
if (!tab.isAssignedToJob()) {
assignedTables.add(tab);
tab.setAssignedToJob(true);
nextCommitTime = millisecondClock.getTicks();
LOG.info()
.$("assigned table to writer thread [tableName=").$(tab.getTableNameUtf16())
.$(", threadId=").$(workerId)
.I$();
}
event.append();
}
event.append();
eventProcessed = true;
} catch (Throwable ex) {
tab.setWriterInError();
metrics.healthCheck().incrementUnhandledErrors();
LOG.error()
.$("closing writer because of error [table=").$(tab.getTableNameUtf16())
.$(",ex=").$(ex)
.I$();
closeWriter = true;
event.createWriterReleaseEvent(tab, false);
eventProcessed = false;
// This is a critical error, so we treat it as an unhandled one.
metrics.healthCheck().incrementUnhandledErrors();
}
} else {
if (event.getWriterWorkerId() == LineTcpMeasurementEventType.ALL_WRITERS_RELEASE_WRITER) {
eventProcessed = scheduler.processWriterReleaseEvent(event, workerId);
assignedTables.remove(tab);
tab.setAssignedToJob(false);
nextCommitTime = millisecondClock.getTicks();
} else {
eventProcessed = true;
closeWriter = true;
}
}
if (closeWriter && tab.getWriter() != null) {
scheduler.processWriterReleaseEvent(event, workerId);
assignedTables.remove(tab);
tab.setAssignedToJob(false);
nextCommitTime = millisecondClock.getTicks();
}
} catch (Throwable ex) {
eventProcessed = true;
LOG.error().$("failed to process ILP event because of exception [ex=").$(ex).I$();
}
// by not releasing cursor we force the sequence to return us the same value over and over
// until cursor value is released
if (eventProcessed) {
sequence.done(cursor);
} else {
return false;
}
sequence.done(cursor);
}
}
......
......@@ -59,6 +59,7 @@ public class TableUpdateDetails implements Closeable {
private long lastMeasurementMillis = Long.MAX_VALUE;
private long nextCommitTime;
private int networkIOOwnerCount = 0;
private volatile boolean writerInError;
TableUpdateDetails(
LineTcpReceiverConfiguration configuration,
......@@ -96,6 +97,14 @@ public class TableUpdateDetails implements Closeable {
}
/**
 * Reports whether the writer of this table has been flagged as failed.
 *
 * @return the current value of the volatile {@code writerInError} flag
 */
public boolean isWriterInError() {
return writerInError;
}
/**
 * Flags the writer of this table as failed by setting the volatile
 * {@code writerInError} field to {@code true}. This method only ever raises the flag.
 */
public void setWriterInError() {
writerInError = true;
}
@Override
public void close() {
synchronized (this) {
......@@ -116,7 +125,9 @@ public class TableUpdateDetails implements Closeable {
closeLocals();
if (null != writer) {
try {
writer.commit();
if (!writerInError) {
writer.commit();
}
} catch (Throwable ex) {
LOG.error().$("cannot commit writer transaction, rolling back before releasing it [table=").$(tableNameUtf16).$(",ex=").$(ex).I$();
} finally {
......@@ -176,7 +187,7 @@ public class TableUpdateDetails implements Closeable {
}
}
private void commit(boolean withLag) {
private void commit(boolean withLag) throws CommitFailedException {
if (writer.getUncommittedRowCount() > 0) {
try {
LOG.debug().$("time-based commit " + (withLag ? "with lag " : "") + "[rows=").$(writer.getUncommittedRowCount()).$(", table=").$(tableNameUtf16).I$();
......@@ -186,17 +197,19 @@ public class TableUpdateDetails implements Closeable {
writer.commit();
}
} catch (Throwable ex) {
setWriterInError();
LOG.error().$("could not commit [table=").$(tableNameUtf16).$(", e=").$(ex).I$();
try {
writer.rollback();
} catch (Throwable th) {
LOG.error().$("could not perform emergency rollback [table=").$(tableNameUtf16).$(", e=").$(th).I$();
}
throw CommitFailedException.instance(ex);
}
}
}
long commitIfIntervalElapsed(long wallClockMillis) {
long commitIfIntervalElapsed(long wallClockMillis) throws CommitFailedException {
if (wallClockMillis < nextCommitTime) {
return nextCommitTime;
}
......@@ -208,7 +221,7 @@ public class TableUpdateDetails implements Closeable {
return nextCommitTime;
}
void commitIfMaxUncommittedRowsCountReached() {
void commitIfMaxUncommittedRowsCountReached() throws CommitFailedException {
final long rowsSinceCommit = writer.getUncommittedRowCount();
if (rowsSinceCommit < writer.getMetadata().getMaxUncommittedRows()) {
if ((rowsSinceCommit & writerTickRowsCountMod) == 0) {
......@@ -219,7 +232,19 @@ public class TableUpdateDetails implements Closeable {
}
LOG.debug().$("max-uncommitted-rows commit with lag [").$(tableNameUtf16).I$();
nextCommitTime = millisecondClock.getTicks() + writer.getCommitInterval();
writer.commitWithLag();
try {
writer.commitWithLag();
} catch (Throwable th) {
LOG.error()
.$("could not commit line protocol measurement [tableName=").$(writer.getTableName())
.$(", message=").$(th.getMessage())
.$(th)
.I$();
writer.rollback();
throw CommitFailedException.instance(th);
}
// Tick after commit.
writer.tick(false);
}
......
......@@ -35,6 +35,7 @@ import io.questdb.std.*;
import io.questdb.std.datetime.microtime.MicrosecondClock;
import io.questdb.std.datetime.microtime.MicrosecondClockImpl;
import io.questdb.std.str.FloatingDirectCharSink;
import io.questdb.std.str.Path;
import org.junit.Assert;
import org.junit.Before;
......@@ -335,6 +336,7 @@ abstract class BaseLineTcpContextTest extends AbstractCairoTest {
});
Assert.assertFalse(context.invalid());
Assert.assertEquals(FD, context.getFd());
workerPool.assignCleaner(Path.CLEANER);
workerPool.start(LOG);
}
......
......@@ -31,6 +31,7 @@ import io.questdb.griffin.SqlException;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.SqlExecutionContextImpl;
import io.questdb.std.Chars;
import io.questdb.std.Files;
import io.questdb.std.FilesFacadeImpl;
import io.questdb.std.datetime.microtime.Timestamps;
import io.questdb.std.str.LPSZ;
......@@ -434,6 +435,44 @@ public class LineTcpConnectionContextTest extends BaseLineTcpContextTest {
}, null, null);
}
/**
 * Verifies that a commit failure disconnects the ILP connection instead of being
 * silently ignored: a files facade refuses to open one column file, seven lines are
 * fed to the context, and the test expects the connection to be dropped with input
 * still pending and only the first row present in the table.
 */
@Test
public void testCairoExceptionOnCommit() throws Exception {
String table = "commitException";
// NOTE(review): presumably forces a commit after every row - confirm against the harness
configOverrideMaxUncommittedRows = 1;
netMsgBufferSize.set(60);
runInContext(
new FilesFacadeImpl() {
@Override
public long openRW(LPSZ name, long opts) {
// simulate an open failure for this one column file; all other files open normally
if (Chars.endsWith(name, "1970-01-01.1" + Files.SEPARATOR + "temperature.d")) {
return -1;
}
return super.openRW(name, opts);
}
},
() -> {
// the third line carries an extra "broken" field that the other lines do not have
recvBuffer =
table + ",location=us-midwest temperature=82 99000\n" +
table + ",location=us-midwest temperature=83 90000\n" +
table + ",location=us-eastcoast temperature=81,broken=23 80000\n" +
table + ",location=us-midwest temperature=85 70000\n" +
table + ",location=us-eastcoast temperature=89 60000\n" +
table + ",location=us-eastcoast temperature=80 50000\n" +
table + ",location=us-westcost temperature=82 40000\n";
// keep pumping I/O until the context disconnects or all input is consumed
do {
handleContextIO();
} while (!disconnected && recvBuffer.length() > 0);
// the connection must have been dropped before the whole buffer was consumed
Assert.assertTrue(disconnected);
Assert.assertTrue(recvBuffer.length() > 0);
closeContext();
// only the first row is expected to be visible in the table
String expected = "location\ttemperature\ttimestamp\n" +
"us-midwest\t82.0\t1970-01-01T00:00:00.000099Z\n";
assertTable(expected, table);
}, null, null);
}
@Test
public void testCairoExceptionOnCreateTable() throws Exception {
String table = "cairoEx";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册