未验证 提交 14a66259 编写于 作者: A Andrei Pechkurov 提交者: GitHub

fix(sql): fix reader leak on query with invalid interval filter (#3710)

上级 aa26494a
...@@ -57,9 +57,9 @@ public abstract class AbstractFullDataFrameCursor implements DataFrameCursor { ...@@ -57,9 +57,9 @@ public abstract class AbstractFullDataFrameCursor implements DataFrameCursor {
} }
public DataFrameCursor of(TableReader reader) { public DataFrameCursor of(TableReader reader) {
this.reader = reader;
partitionHi = reader.getPartitionCount(); partitionHi = reader.getPartitionCount();
toTop(); toTop();
this.reader = reader;
return this; return this;
} }
......
...@@ -95,9 +95,9 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor ...@@ -95,9 +95,9 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor
} }
public AbstractIntervalDataFrameCursor of(TableReader reader, SqlExecutionContext sqlContext) throws SqlException { public AbstractIntervalDataFrameCursor of(TableReader reader, SqlExecutionContext sqlContext) throws SqlException {
this.reader = reader;
this.intervals = this.intervalsModel.calculateIntervals(sqlContext); this.intervals = this.intervalsModel.calculateIntervals(sqlContext);
calculateRanges(intervals); calculateRanges(reader, intervals);
this.reader = reader;
return this; return this;
} }
...@@ -105,7 +105,7 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor ...@@ -105,7 +105,7 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor
@Override @Override
public boolean reload() { public boolean reload() {
if (reader != null && reader.reload()) { if (reader != null && reader.reload()) {
calculateRanges(intervals); calculateRanges(reader, intervals);
return true; return true;
} }
return false; return false;
...@@ -125,13 +125,13 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor ...@@ -125,13 +125,13 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor
sizeSoFar = 0; sizeSoFar = 0;
} }
private void calculateRanges(LongList intervals) { private void calculateRanges(TableReader reader, LongList intervals) {
size = -1; size = -1;
if (intervals.size() > 0) { if (intervals.size() > 0) {
if (PartitionBy.isPartitioned(reader.getPartitionedBy())) { if (PartitionBy.isPartitioned(reader.getPartitionedBy())) {
cullIntervals(intervals); cullIntervals(reader, intervals);
if (initialIntervalsLo < initialIntervalsHi) { if (initialIntervalsLo < initialIntervalsHi) {
cullPartitions(intervals); cullPartitions(reader, intervals);
} }
} else { } else {
initialIntervalsLo = 0; initialIntervalsLo = 0;
...@@ -234,7 +234,7 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor ...@@ -234,7 +234,7 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor
return this.size = size; return this.size = size;
} }
private void cullIntervals(LongList intervals) { private void cullIntervals(TableReader reader, LongList intervals) {
int intervalsLo = intervals.binarySearch(reader.getMinTimestamp(), BinarySearch.SCAN_UP); int intervalsLo = intervals.binarySearch(reader.getMinTimestamp(), BinarySearch.SCAN_UP);
// not a direct hit // not a direct hit
...@@ -267,7 +267,7 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor ...@@ -267,7 +267,7 @@ public abstract class AbstractIntervalDataFrameCursor implements DataFrameCursor
} }
} }
private void cullPartitions(LongList intervals) { private void cullPartitions(TableReader reader, LongList intervals) {
final long lo = intervals.getQuick(initialIntervalsLo * 2); final long lo = intervals.getQuick(initialIntervalsLo * 2);
long intervalLo; long intervalLo;
if (lo == Long.MIN_VALUE) { if (lo == Long.MIN_VALUE) {
......
...@@ -27,6 +27,7 @@ package io.questdb.cairo; ...@@ -27,6 +27,7 @@ package io.questdb.cairo;
import io.questdb.cairo.sql.DataFrameCursor; import io.questdb.cairo.sql.DataFrameCursor;
import io.questdb.griffin.PlanSink; import io.questdb.griffin.PlanSink;
import io.questdb.griffin.SqlExecutionContext; import io.questdb.griffin.SqlExecutionContext;
import io.questdb.std.Misc;
public class FullBwdDataFrameCursorFactory extends AbstractDataFrameCursorFactory { public class FullBwdDataFrameCursorFactory extends AbstractDataFrameCursorFactory {
private final FullBwdDataFrameCursor cursor = new FullBwdDataFrameCursor(); private final FullBwdDataFrameCursor cursor = new FullBwdDataFrameCursor();
...@@ -39,16 +40,22 @@ public class FullBwdDataFrameCursorFactory extends AbstractDataFrameCursorFactor ...@@ -39,16 +40,22 @@ public class FullBwdDataFrameCursorFactory extends AbstractDataFrameCursorFactor
@Override @Override
public DataFrameCursor getCursor(SqlExecutionContext executionContext, int order) { public DataFrameCursor getCursor(SqlExecutionContext executionContext, int order) {
if (order == ORDER_DESC || order == ORDER_ANY) { final TableReader reader = getReader(executionContext);
return cursor.of(getReader(executionContext)); try {
} if (order == ORDER_DESC || order == ORDER_ANY) {
return cursor.of(reader);
}
// Create forward scanning cursor when needed. Factory requesting forward cursor must // Create forward scanning cursor when needed. Factory requesting forward cursor must
// still return records in descending timestamp order. // still return records in descending timestamp order.
if (fwdCursor == null) { if (fwdCursor == null) {
fwdCursor = new FullFwdDataFrameCursor(); fwdCursor = new FullFwdDataFrameCursor();
}
return fwdCursor.of(reader);
} catch (Throwable th) {
Misc.free(reader);
throw th;
} }
return fwdCursor.of(getReader(executionContext));
} }
@Override @Override
......
...@@ -27,6 +27,7 @@ package io.questdb.cairo; ...@@ -27,6 +27,7 @@ package io.questdb.cairo;
import io.questdb.cairo.sql.DataFrameCursor; import io.questdb.cairo.sql.DataFrameCursor;
import io.questdb.griffin.PlanSink; import io.questdb.griffin.PlanSink;
import io.questdb.griffin.SqlExecutionContext; import io.questdb.griffin.SqlExecutionContext;
import io.questdb.std.Misc;
public class FullFwdDataFrameCursorFactory extends AbstractDataFrameCursorFactory { public class FullFwdDataFrameCursorFactory extends AbstractDataFrameCursorFactory {
private final FullFwdDataFrameCursor cursor = new FullFwdDataFrameCursor(); private final FullFwdDataFrameCursor cursor = new FullFwdDataFrameCursor();
...@@ -38,16 +39,22 @@ public class FullFwdDataFrameCursorFactory extends AbstractDataFrameCursorFactor ...@@ -38,16 +39,22 @@ public class FullFwdDataFrameCursorFactory extends AbstractDataFrameCursorFactor
@Override @Override
public DataFrameCursor getCursor(SqlExecutionContext executionContext, int order) { public DataFrameCursor getCursor(SqlExecutionContext executionContext, int order) {
if (order == ORDER_ASC || order == ORDER_ANY) { final TableReader reader = getReader(executionContext);
return cursor.of(getReader(executionContext)); try {
} if (order == ORDER_ASC || order == ORDER_ANY) {
return cursor.of(reader);
}
// Create backward scanning cursor when needed. Factory requesting backward cursor must // Create backward scanning cursor when needed. Factory requesting backward cursor must
// still return records in ascending timestamp order. // still return records in ascending timestamp order.
if (bwdCursor == null) { if (bwdCursor == null) {
bwdCursor = new FullBwdDataFrameCursor(); bwdCursor = new FullBwdDataFrameCursor();
}
return bwdCursor.of(reader);
} catch (Throwable th) {
Misc.free(reader);
throw th;
} }
return bwdCursor.of(getReader(executionContext));
} }
@Override @Override
......
...@@ -56,8 +56,14 @@ public class IntervalBwdDataFrameCursorFactory extends AbstractDataFrameCursorFa ...@@ -56,8 +56,14 @@ public class IntervalBwdDataFrameCursorFactory extends AbstractDataFrameCursorFa
@Override @Override
public DataFrameCursor getCursor(SqlExecutionContext executionContext, int order) throws SqlException { public DataFrameCursor getCursor(SqlExecutionContext executionContext, int order) throws SqlException {
if (order == ORDER_DESC || order == ORDER_ANY) { if (order == ORDER_DESC || order == ORDER_ANY) {
cursor.of(getReader(executionContext), executionContext); final TableReader reader = getReader(executionContext);
return cursor; try {
cursor.of(reader, executionContext);
return cursor;
} catch (Throwable th) {
Misc.free(reader);
throw th;
}
} }
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
......
...@@ -56,15 +56,21 @@ public class IntervalFwdDataFrameCursorFactory extends AbstractDataFrameCursorFa ...@@ -56,15 +56,21 @@ public class IntervalFwdDataFrameCursorFactory extends AbstractDataFrameCursorFa
@Override @Override
public DataFrameCursor getCursor(SqlExecutionContext executionContext, int order) throws SqlException { public DataFrameCursor getCursor(SqlExecutionContext executionContext, int order) throws SqlException {
if (order == ORDER_ASC || order == ORDER_ANY) { final TableReader reader = getReader(executionContext);
cursor.of(getReader(executionContext), executionContext); try {
return cursor; if (order == ORDER_ASC || order == ORDER_ANY) {
} cursor.of(reader, executionContext);
return cursor;
}
if (bwdCursor == null) { if (bwdCursor == null) {
bwdCursor = new IntervalBwdDataFrameCursor(intervals, cursor.getTimestampIndex()); bwdCursor = new IntervalBwdDataFrameCursor(intervals, cursor.getTimestampIndex());
}
return bwdCursor.of(reader, executionContext);
} catch (Throwable th) {
Misc.free(reader);
throw th;
} }
return bwdCursor.of(getReader(executionContext), executionContext);
} }
@Override @Override
......
...@@ -52,8 +52,13 @@ public class DistinctSymbolRecordCursorFactory extends AbstractRecordCursorFacto ...@@ -52,8 +52,13 @@ public class DistinctSymbolRecordCursorFactory extends AbstractRecordCursorFacto
@Override @Override
public RecordCursor getCursor(SqlExecutionContext executionContext) { public RecordCursor getCursor(SqlExecutionContext executionContext) {
TableReader reader = executionContext.getReader(tableToken, tableVersion); TableReader reader = executionContext.getReader(tableToken, tableVersion);
cursor.of(reader); try {
return cursor; cursor.of(reader);
return cursor;
} catch (Throwable th) {
Misc.free(reader);
throw th;
}
} }
@Override @Override
...@@ -126,10 +131,10 @@ public class DistinctSymbolRecordCursorFactory extends AbstractRecordCursorFacto ...@@ -126,10 +131,10 @@ public class DistinctSymbolRecordCursorFactory extends AbstractRecordCursorFacto
} }
public void of(TableReader reader) { public void of(TableReader reader) {
this.reader = reader;
this.symbolMapReader = reader.getSymbolMapReader(columnIndex); this.symbolMapReader = reader.getSymbolMapReader(columnIndex);
this.numberOfSymbols = symbolMapReader.getSymbolCount() + (symbolMapReader.containsNullValue() ? 1 : 0); this.numberOfSymbols = symbolMapReader.getSymbolCount() + (symbolMapReader.containsNullValue() ? 1 : 0);
this.recordA.reset(); this.recordA.reset();
this.reader = reader;
} }
@Override @Override
......
...@@ -27,15 +27,23 @@ import io.questdb.cairo.vm.MemoryCARWImpl; ...@@ -27,15 +27,23 @@ import io.questdb.cairo.vm.MemoryCARWImpl;
import io.questdb.std.MemoryTag; import io.questdb.std.MemoryTag;
import org.junit.Test; import org.junit.Test;
import static io.questdb.cairo.AbstractIntervalDataFrameCursor.binarySearch; import static io.questdb.cairo.AbstractIntervalDataFrameCursor.*;
import static io.questdb.cairo.AbstractIntervalDataFrameCursor.SCAN_UP;
import static io.questdb.cairo.AbstractIntervalDataFrameCursor.SCAN_DOWN;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
public class AbstractIntervalDataFrameCursorTest extends AbstractCairoTest { public class AbstractIntervalDataFrameCursorTest extends AbstractCairoTest {
// see implementation of Vect.binarySearch64Bit // see implementation of Vect.binarySearch64Bit
static final int THRESHOLD = 65; static final int THRESHOLD = 65;
@Test
public void testBinarySearchOnArrayWith4Duplicates() throws Exception {
testBinarySearchOnArrayWithDuplicates(4);
}
@Test
public void testBinarySearchOnArrayWith65Duplicates() throws Exception {
testBinarySearchOnArrayWithDuplicates(THRESHOLD + 5);
}
@Test @Test
public void testBinarySearchOnArrayWithSingleValue() throws Exception { public void testBinarySearchOnArrayWithSingleValue() throws Exception {
assertMemoryLeak(() -> { assertMemoryLeak(() -> {
...@@ -81,16 +89,6 @@ public class AbstractIntervalDataFrameCursorTest extends AbstractCairoTest { ...@@ -81,16 +89,6 @@ public class AbstractIntervalDataFrameCursorTest extends AbstractCairoTest {
}); });
} }
@Test
public void testtestBinarySearchOnArrayWith4Duplicates() throws Exception {
testBinarySearchOnArrayWithDuplicates(4);
}
@Test
public void testtestBinarySearchOnArrayWith65Duplicates() throws Exception {
testBinarySearchOnArrayWithDuplicates(THRESHOLD + 5);
}
private void testBinarySearchOnArrayWithDuplicates(int dupCount) throws Exception { private void testBinarySearchOnArrayWithDuplicates(int dupCount) throws Exception {
assertMemoryLeak(() -> { assertMemoryLeak(() -> {
int size = 1024 * 1024; int size = 1024 * 1024;
...@@ -115,5 +113,4 @@ public class AbstractIntervalDataFrameCursorTest extends AbstractCairoTest { ...@@ -115,5 +113,4 @@ public class AbstractIntervalDataFrameCursorTest extends AbstractCairoTest {
} }
}); });
} }
} }
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2023 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.test.griffin;
import io.questdb.test.AbstractCairoTest;
import org.junit.Test;
public class IntervalFilterNoLeakTest extends AbstractCairoTest {
@Test
public void testIntervalBwdNoLeak() throws Exception {
assertMemoryLeak(() -> {
ddl(
"create table x as " +
"(" +
" select" +
" rnd_double(0)*100 a," +
" timestamp_sequence(0, 10000) ts" +
" from long_sequence(3)" +
") timestamp(ts) partition by day"
);
assertException(
"select * from x where ts > now() - '3 day' order by ts desc",
0,
"inconvertible value: `3 day` [STRING -> TIMESTAMP]"
);
});
}
@Test
public void testIntervalFwdNoLeak() throws Exception {
assertMemoryLeak(() -> {
ddl(
"create table x as " +
"(" +
" select" +
" rnd_double(0)*100 a," +
" timestamp_sequence(0, 10000) ts" +
" from long_sequence(3)" +
") timestamp(ts) partition by day"
);
assertException(
"select * from x where ts > now() - '3 day'",
0,
"inconvertible value: `3 day` [STRING -> TIMESTAMP]"
);
});
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册