From 14a662594e0997cbce34bed290e86ea72d7991b3 Mon Sep 17 00:00:00 2001 From: Andrei Pechkurov <37772591+puzpuzpuz@users.noreply.github.com> Date: Mon, 4 Sep 2023 18:14:58 +0300 Subject: [PATCH] fix(sql): fix reader leak on query with invalid interval filter (#3710) --- .../cairo/AbstractFullDataFrameCursor.java | 2 +- .../AbstractIntervalDataFrameCursor.java | 16 ++-- .../cairo/FullBwdDataFrameCursorFactory.java | 23 ++++-- .../cairo/FullFwdDataFrameCursorFactory.java | 23 ++++-- .../IntervalBwdDataFrameCursorFactory.java | 10 ++- .../IntervalFwdDataFrameCursorFactory.java | 20 +++-- .../DistinctSymbolRecordCursorFactory.java | 11 ++- .../AbstractIntervalDataFrameCursorTest.java | 25 +++---- .../griffin/IntervalFilterNoLeakTest.java | 73 +++++++++++++++++++ 9 files changed, 152 insertions(+), 51 deletions(-) create mode 100644 core/src/test/java/io/questdb/test/griffin/IntervalFilterNoLeakTest.java diff --git a/core/src/main/java/io/questdb/cairo/AbstractFullDataFrameCursor.java b/core/src/main/java/io/questdb/cairo/AbstractFullDataFrameCursor.java index a8c7ce3b7..aeac2bd69 100644 --- a/core/src/main/java/io/questdb/cairo/AbstractFullDataFrameCursor.java +++ b/core/src/main/java/io/questdb/cairo/AbstractFullDataFrameCursor.java @@ -57,9 +57,9 @@ public abstract class AbstractFullDataFrameCursor implements DataFrameCursor { } public DataFrameCursor of(TableReader reader) { - this.reader = reader; partitionHi = reader.getPartitionCount(); toTop(); + this.reader = reader; return this; } diff --git a/core/src/main/java/io/questdb/cairo/AbstractIntervalDataFrameCursor.java b/core/src/main/java/io/questdb/cairo/AbstractIntervalDataFrameCursor.java index 9e7992291..9c56cfc24 100644 --- a/core/src/main/java/io/questdb/cairo/AbstractIntervalDataFrameCursor.java +++ b/core/src/main/java/io/questdb/cairo/AbstractIntervalDataFrameCursor.java @@ -95,9 +95,9 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor } public AbstractIntervalDataFrameCursor of(TableReader reader, SqlExecutionContext sqlContext) throws SqlException { - this.reader = reader; this.intervals = this.intervalsModel.calculateIntervals(sqlContext); - calculateRanges(intervals); + calculateRanges(reader, intervals); + this.reader = reader; return this; } @@ -105,7 +105,7 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor @Override public boolean reload() { if (reader != null && reader.reload()) { - calculateRanges(intervals); + calculateRanges(reader, intervals); return true; } return false; @@ -125,13 +125,13 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor sizeSoFar = 0; } - private void calculateRanges(LongList intervals) { + private void calculateRanges(TableReader reader, LongList intervals) { size = -1; if (intervals.size() > 0) { if (PartitionBy.isPartitioned(reader.getPartitionedBy())) { - cullIntervals(intervals); + cullIntervals(reader, intervals); if (initialIntervalsLo < initialIntervalsHi) { - cullPartitions(intervals); + cullPartitions(reader, intervals); } } else { initialIntervalsLo = 0; @@ -234,7 +234,7 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor return this.size = size; } - private void cullIntervals(LongList intervals) { + private void cullIntervals(TableReader reader, LongList intervals) { int intervalsLo = intervals.binarySearch(reader.getMinTimestamp(), BinarySearch.SCAN_UP); // not a direct hit @@ -267,7 +267,7 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor } } - private void cullPartitions(LongList intervals) { + private void cullPartitions(TableReader reader, LongList intervals) { final long lo = intervals.getQuick(initialIntervalsLo * 2); long intervalLo; if (lo == Long.MIN_VALUE) { diff --git a/core/src/main/java/io/questdb/cairo/FullBwdDataFrameCursorFactory.java b/core/src/main/java/io/questdb/cairo/FullBwdDataFrameCursorFactory.java index ed8bfae18..da4cec6ee 100644 --- a/core/src/main/java/io/questdb/cairo/FullBwdDataFrameCursorFactory.java +++ b/core/src/main/java/io/questdb/cairo/FullBwdDataFrameCursorFactory.java @@ -27,6 +27,7 @@ package io.questdb.cairo; import io.questdb.cairo.sql.DataFrameCursor; import io.questdb.griffin.PlanSink; import io.questdb.griffin.SqlExecutionContext; +import io.questdb.std.Misc; public class FullBwdDataFrameCursorFactory extends AbstractDataFrameCursorFactory { private final FullBwdDataFrameCursor cursor = new FullBwdDataFrameCursor(); @@ -39,16 +40,22 @@ public class FullBwdDataFrameCursorFactory extends AbstractDataFrameCursorFactor @Override public DataFrameCursor getCursor(SqlExecutionContext executionContext, int order) { - if (order == ORDER_DESC || order == ORDER_ANY) { - return cursor.of(getReader(executionContext)); - } + final TableReader reader = getReader(executionContext); + try { + if (order == ORDER_DESC || order == ORDER_ANY) { + return cursor.of(reader); + } - // Create forward scanning cursor when needed. Factory requesting forward cursor must - // still return records in descending timestamp order. - if (fwdCursor == null) { - fwdCursor = new FullFwdDataFrameCursor(); + // Create forward scanning cursor when needed. Factory requesting forward cursor must + // still return records in descending timestamp order. + if (fwdCursor == null) { + fwdCursor = new FullFwdDataFrameCursor(); + } + return fwdCursor.of(reader); + } catch (Throwable th) { + Misc.free(reader); + throw th; } - return fwdCursor.of(getReader(executionContext)); } @Override diff --git a/core/src/main/java/io/questdb/cairo/FullFwdDataFrameCursorFactory.java b/core/src/main/java/io/questdb/cairo/FullFwdDataFrameCursorFactory.java index b4ba8b513..4fcb77676 100644 --- a/core/src/main/java/io/questdb/cairo/FullFwdDataFrameCursorFactory.java +++ b/core/src/main/java/io/questdb/cairo/FullFwdDataFrameCursorFactory.java @@ -27,6 +27,7 @@ package io.questdb.cairo; import io.questdb.cairo.sql.DataFrameCursor; import io.questdb.griffin.PlanSink; import io.questdb.griffin.SqlExecutionContext; +import io.questdb.std.Misc; public class FullFwdDataFrameCursorFactory extends AbstractDataFrameCursorFactory { private final FullFwdDataFrameCursor cursor = new FullFwdDataFrameCursor(); @@ -38,16 +39,22 @@ public class FullFwdDataFrameCursorFactory extends AbstractDataFrameCursorFactor @Override public DataFrameCursor getCursor(SqlExecutionContext executionContext, int order) { - if (order == ORDER_ASC || order == ORDER_ANY) { - return cursor.of(getReader(executionContext)); - } + final TableReader reader = getReader(executionContext); + try { + if (order == ORDER_ASC || order == ORDER_ANY) { + return cursor.of(reader); + } - // Create backward scanning cursor when needed. Factory requesting backward cursor must - // still return records in ascending timestamp order. - if (bwdCursor == null) { - bwdCursor = new FullBwdDataFrameCursor(); + // Create backward scanning cursor when needed. Factory requesting backward cursor must + // still return records in ascending timestamp order. + if (bwdCursor == null) { + bwdCursor = new FullBwdDataFrameCursor(); + } + return bwdCursor.of(reader); + } catch (Throwable th) { + Misc.free(reader); + throw th; } - return bwdCursor.of(getReader(executionContext)); } @Override diff --git a/core/src/main/java/io/questdb/cairo/IntervalBwdDataFrameCursorFactory.java b/core/src/main/java/io/questdb/cairo/IntervalBwdDataFrameCursorFactory.java index 49892cce7..c7b7e2e43 100644 --- a/core/src/main/java/io/questdb/cairo/IntervalBwdDataFrameCursorFactory.java +++ b/core/src/main/java/io/questdb/cairo/IntervalBwdDataFrameCursorFactory.java @@ -56,8 +56,14 @@ public class IntervalBwdDataFrameCursorFactory extends AbstractDataFrameCursorFa @Override public DataFrameCursor getCursor(SqlExecutionContext executionContext, int order) throws SqlException { if (order == ORDER_DESC || order == ORDER_ANY) { - cursor.of(getReader(executionContext), executionContext); - return cursor; + final TableReader reader = getReader(executionContext); + try { + cursor.of(reader, executionContext); + return cursor; + } catch (Throwable th) { + Misc.free(reader); + throw th; + } } throw new UnsupportedOperationException(); } diff --git a/core/src/main/java/io/questdb/cairo/IntervalFwdDataFrameCursorFactory.java b/core/src/main/java/io/questdb/cairo/IntervalFwdDataFrameCursorFactory.java index 1ce67ce71..0a2f934c3 100644 --- a/core/src/main/java/io/questdb/cairo/IntervalFwdDataFrameCursorFactory.java +++ b/core/src/main/java/io/questdb/cairo/IntervalFwdDataFrameCursorFactory.java @@ -56,15 +56,21 @@ public class IntervalFwdDataFrameCursorFactory extends AbstractDataFrameCursorFa @Override public DataFrameCursor getCursor(SqlExecutionContext executionContext, int order) throws SqlException { - if (order == ORDER_ASC || order == ORDER_ANY) { - cursor.of(getReader(executionContext), executionContext); - return cursor; - } + final TableReader reader = getReader(executionContext); + try { + if (order == ORDER_ASC || order == ORDER_ANY) { + cursor.of(reader, executionContext); + return cursor; + } - if (bwdCursor == null) { - bwdCursor = new IntervalBwdDataFrameCursor(intervals, cursor.getTimestampIndex()); + if (bwdCursor == null) { + bwdCursor = new IntervalBwdDataFrameCursor(intervals, cursor.getTimestampIndex()); + } + return bwdCursor.of(reader, executionContext); + } catch (Throwable th) { + Misc.free(reader); + throw th; } - return bwdCursor.of(getReader(executionContext), executionContext); } @Override diff --git a/core/src/main/java/io/questdb/griffin/engine/groupby/DistinctSymbolRecordCursorFactory.java b/core/src/main/java/io/questdb/griffin/engine/groupby/DistinctSymbolRecordCursorFactory.java index 39669027f..376a94324 100644 --- a/core/src/main/java/io/questdb/griffin/engine/groupby/DistinctSymbolRecordCursorFactory.java +++ b/core/src/main/java/io/questdb/griffin/engine/groupby/DistinctSymbolRecordCursorFactory.java @@ -52,8 +52,13 @@ public class DistinctSymbolRecordCursorFactory extends AbstractRecordCursorFacto @Override public RecordCursor getCursor(SqlExecutionContext executionContext) { TableReader reader = executionContext.getReader(tableToken, tableVersion); - cursor.of(reader); - return cursor; + try { + cursor.of(reader); + return cursor; + } catch (Throwable th) { + Misc.free(reader); + throw th; + } } @Override @@ -126,10 +131,10 @@ public class DistinctSymbolRecordCursorFactory extends AbstractRecordCursorFacto } public void of(TableReader reader) { - this.reader = reader; this.symbolMapReader = reader.getSymbolMapReader(columnIndex); this.numberOfSymbols = symbolMapReader.getSymbolCount() + (symbolMapReader.containsNullValue() ? 1 : 0); this.recordA.reset(); + this.reader = reader; } @Override diff --git a/core/src/test/java/io/questdb/test/AbstractIntervalDataFrameCursorTest.java b/core/src/test/java/io/questdb/test/AbstractIntervalDataFrameCursorTest.java index b84b44b06..fa223fd8a 100644 --- a/core/src/test/java/io/questdb/test/AbstractIntervalDataFrameCursorTest.java +++ b/core/src/test/java/io/questdb/test/AbstractIntervalDataFrameCursorTest.java @@ -27,15 +27,23 @@ import io.questdb.cairo.vm.MemoryCARWImpl; import io.questdb.std.MemoryTag; import org.junit.Test; -import static io.questdb.cairo.AbstractIntervalDataFrameCursor.binarySearch; -import static io.questdb.cairo.AbstractIntervalDataFrameCursor.SCAN_UP; -import static io.questdb.cairo.AbstractIntervalDataFrameCursor.SCAN_DOWN; +import static io.questdb.cairo.AbstractIntervalDataFrameCursor.*; import static org.junit.Assert.assertEquals; public class AbstractIntervalDataFrameCursorTest extends AbstractCairoTest { // see implementation of Vect.binarySearch64Bit static final int THRESHOLD = 65; + @Test + public void testBinarySearchOnArrayWith4Duplicates() throws Exception { + testBinarySearchOnArrayWithDuplicates(4); + } + + @Test + public void testBinarySearchOnArrayWith65Duplicates() throws Exception { + testBinarySearchOnArrayWithDuplicates(THRESHOLD + 5); + } + @Test public void testBinarySearchOnArrayWithSingleValue() throws Exception { assertMemoryLeak(() -> { @@ -81,16 +89,6 @@ public class AbstractIntervalDataFrameCursorTest extends AbstractCairoTest { }); } - @Test - public void testtestBinarySearchOnArrayWith4Duplicates() throws Exception { - testBinarySearchOnArrayWithDuplicates(4); - } - - @Test - public void testtestBinarySearchOnArrayWith65Duplicates() throws Exception { - testBinarySearchOnArrayWithDuplicates(THRESHOLD + 5); - } - private void testBinarySearchOnArrayWithDuplicates(int dupCount) throws Exception { assertMemoryLeak(() -> { int size = 1024 * 1024; @@ -115,5 +113,4 @@ public class AbstractIntervalDataFrameCursorTest extends AbstractCairoTest { } }); } - } diff --git a/core/src/test/java/io/questdb/test/griffin/IntervalFilterNoLeakTest.java b/core/src/test/java/io/questdb/test/griffin/IntervalFilterNoLeakTest.java new file mode 100644 index 000000000..2f72af28a --- /dev/null +++ b/core/src/test/java/io/questdb/test/griffin/IntervalFilterNoLeakTest.java @@ -0,0 +1,73 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2023 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.test.griffin; + +import io.questdb.test.AbstractCairoTest; +import org.junit.Test; + +public class IntervalFilterNoLeakTest extends AbstractCairoTest { + + @Test + public void testIntervalBwdNoLeak() throws Exception { + assertMemoryLeak(() -> { + ddl( + "create table x as " + + "(" + + " select" + + " rnd_double(0)*100 a," + + " timestamp_sequence(0, 10000) ts" + + " from long_sequence(3)" + + ") timestamp(ts) partition by day" + ); + + assertException( + "select * from x where ts > now() - '3 day' order by ts desc", + 0, + "inconvertible value: `3 day` [STRING -> TIMESTAMP]" + ); + }); + } + + @Test + public void testIntervalFwdNoLeak() throws Exception { + assertMemoryLeak(() -> { + ddl( + "create table x as " + + "(" + + " select" + + " rnd_double(0)*100 a," + + " timestamp_sequence(0, 10000) ts" + + " from long_sequence(3)" + + ") timestamp(ts) partition by day" + ); + + assertException( + "select * from x where ts > now() - '3 day'", + 0, + "inconvertible value: `3 day` [STRING -> TIMESTAMP]" + ); + }); + } +} -- GitLab