Unverified commit ae43a765 authored by Patrick Mackinlay, committed by GitHub

chore(griffin): limiting impact of joins on hardware with configuration

Parent 9ccc03d7
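
The change threads a per-context row cap through every cursor factory that has to buffer rows: getCursor() now reads CairoSecurityContext.getMaxInMemoryRows(), compares it against the size reported by the cursor it is about to materialize, forwards the cap to the backing map or record tree via setMaxSize(), and throws LimitOverflowException when the limit cannot be honoured. Below is a minimal sketch of that two-level guard using plain Java collections instead of QuestDB's off-heap Map and RecordTreeChain; BoundedBuffer, checkDeclaredSize and the IllegalStateException stand-in are illustrative names introduced here, not part of the commit.

import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the capped structures in this commit (the join map
// and RecordTreeChain after setMaxSize()). Not QuestDB code.
final class BoundedBuffer<T> {
    private final long maxRows;
    private final List<T> rows = new ArrayList<>();

    BoundedBuffer(long maxRows) {
        this.maxRows = maxRows;
    }

    // Mirrors the factory-level pre-check: if the producer already reports at
    // least maxRows rows, refuse to start buffering at all.
    void checkDeclaredSize(long declaredSize) {
        if (declaredSize >= maxRows) {
            throw new IllegalStateException("limit of " + maxRows + " exceeded");
        }
    }

    // Mirrors the per-record check inside RecordTreeChain.put() and the capped
    // map: producers that cannot declare a size up front are stopped while filling.
    void add(T row) {
        if (rows.size() >= maxRows) {
            throw new IllegalStateException("limit of " + maxRows + " exceeded");
        }
        rows.add(row);
    }
}

The hunks below apply exactly this shape to HashJoinLightRecordCursorFactory, HashJoinRecordCursorFactory, HashOuterJoinLightRecordCursorFactory, HashOuterJoinRecordCursorFactory, RecordTreeChain with SortedRecordCursorFactory, and UnionRecordCursorFactory, with new coverage in SecurityTest.
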
......@@ -24,13 +24,23 @@
package io.questdb.griffin.engine.join;
import io.questdb.cairo.*;
import io.questdb.cairo.AbstractRecordCursorFactory;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.ColumnTypes;
import io.questdb.cairo.RecordSink;
import io.questdb.cairo.map.Map;
import io.questdb.cairo.map.MapFactory;
import io.questdb.cairo.map.MapKey;
import io.questdb.cairo.map.MapValue;
import io.questdb.cairo.sql.*;
import io.questdb.cairo.sql.NoRandomAccessRecordCursor;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.cairo.sql.SymbolTable;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.LimitOverflowException;
import io.questdb.std.Misc;
import io.questdb.std.Transient;
......@@ -78,7 +88,13 @@ public class HashJoinLightRecordCursorFactory extends AbstractRecordCursorFactory {
public RecordCursor getCursor(SqlExecutionContext executionContext) {
RecordCursor slaveCursor = slaveFactory.getCursor(executionContext);
try {
buildMapOfSlaveRecords(slaveCursor);
long maxInMemoryRows = executionContext.getCairoSecurityContext().getMaxInMemoryRows();
if (maxInMemoryRows > slaveCursor.size()) {
joinKeyMap.setMaxSize(maxInMemoryRows);
buildMapOfSlaveRecords(slaveCursor);
} else {
throw LimitOverflowException.instance(maxInMemoryRows);
}
} catch (CairoException e) {
slaveCursor.close();
throw e;
......
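
Every touched factory imports io.questdb.griffin.engine.LimitOverflowException, but the class itself is not part of this diff; the tests only assert that its message contains "limit of N exceeded". A minimal sketch of what such an exception could look like follows, assuming a plain RuntimeException base; the real class presumably integrates with CairoException, since the catch (CairoException) blocks in these factories are what close the slave cursor on this failure path.

package io.questdb.griffin.engine;

// Hypothetical sketch only; the actual class is not shown in this commit. The
// instance(long) factory name and the "limit of N exceeded" message fragment are
// taken from the diff and from SecurityTest, everything else is assumed.
public class LimitOverflowException extends RuntimeException {

    private LimitOverflowException(String message) {
        super(message);
    }

    public static LimitOverflowException instance(long maxInMemoryRows) {
        return new LimitOverflowException("limit of " + maxInMemoryRows + " exceeded");
    }
}
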
......@@ -31,6 +31,7 @@ import io.questdb.cairo.map.MapKey;
import io.questdb.cairo.map.MapValue;
import io.questdb.cairo.sql.*;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.LimitOverflowException;
import io.questdb.std.Misc;
import io.questdb.std.Transient;
......@@ -79,7 +80,13 @@ public class HashJoinRecordCursorFactory extends AbstractRecordCursorFactory {
public RecordCursor getCursor(SqlExecutionContext executionContext) {
RecordCursor slaveCursor = slaveFactory.getCursor(executionContext);
try {
buildMapOfSlaveRecords(slaveCursor);
long maxInMemoryRows = executionContext.getCairoSecurityContext().getMaxInMemoryRows();
if (maxInMemoryRows > slaveCursor.size()) {
joinKeyMap.setMaxSize(maxInMemoryRows);
buildMapOfSlaveRecords(slaveCursor);
} else {
throw LimitOverflowException.instance(maxInMemoryRows);
}
} catch (CairoException e) {
slaveCursor.close();
throw e;
......
......@@ -31,6 +31,7 @@ import io.questdb.cairo.map.MapKey;
import io.questdb.cairo.map.MapValue;
import io.questdb.cairo.sql.*;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.LimitOverflowException;
import io.questdb.std.Misc;
import io.questdb.std.Transient;
......@@ -83,7 +84,13 @@ public class HashOuterJoinLightRecordCursorFactory extends AbstractRecordCursorFactory {
public RecordCursor getCursor(SqlExecutionContext executionContext) {
RecordCursor slaveCursor = slaveFactory.getCursor(executionContext);
try {
buildMapOfSlaveRecords(slaveCursor);
long maxInMemoryRows = executionContext.getCairoSecurityContext().getMaxInMemoryRows();
if (maxInMemoryRows > slaveCursor.size()) {
joinKeyMap.setMaxSize(maxInMemoryRows);
buildMapOfSlaveRecords(slaveCursor);
} else {
throw LimitOverflowException.instance(maxInMemoryRows);
}
} catch (CairoException e) {
slaveCursor.close();
throw e;
......
......@@ -31,6 +31,7 @@ import io.questdb.cairo.map.MapKey;
import io.questdb.cairo.map.MapValue;
import io.questdb.cairo.sql.*;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.LimitOverflowException;
import io.questdb.std.Misc;
import io.questdb.std.Transient;
......@@ -101,7 +102,13 @@ public class HashOuterJoinRecordCursorFactory extends AbstractRecordCursorFactory {
public RecordCursor getCursor(SqlExecutionContext executionContext) {
RecordCursor slaveCursor = slaveFactory.getCursor(executionContext);
try {
buildMapOfSlaveRecords(slaveCursor);
long maxInMemoryRows = executionContext.getCairoSecurityContext().getMaxInMemoryRows();
if (maxInMemoryRows > slaveCursor.size()) {
joinKeyMap.setMaxSize(maxInMemoryRows);
buildMapOfSlaveRecords(slaveCursor);
} else {
throw LimitOverflowException.instance(maxInMemoryRows);
}
} catch (CairoException e) {
slaveCursor.close();
throw e;
......
......@@ -24,19 +24,20 @@
package io.questdb.griffin.engine.orderby;
import java.io.Closeable;
import io.questdb.cairo.ColumnTypes;
import io.questdb.cairo.RecordChain;
import io.questdb.cairo.RecordSink;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.SymbolTable;
import io.questdb.griffin.engine.LimitOverflowException;
import io.questdb.std.MemoryPages;
import io.questdb.std.Misc;
import io.questdb.std.Mutable;
import io.questdb.std.Unsafe;
import java.io.Closeable;
public class RecordTreeChain implements Closeable, Mutable {
// P(8) + L + R + C(1) + REF + TOP
private static final int BLOCK_SIZE = 8 + 8 + 8 + 1 + 8 + 8;
......@@ -54,6 +55,8 @@ public class RecordTreeChain implements Closeable, Mutable {
private final RecordComparator comparator;
private final TreeCursor cursor = new TreeCursor();
private long root = -1;
private long size = 0;
private long maxSize = Long.MAX_VALUE;
public RecordTreeChain(
ColumnTypes columnTypes,
......@@ -143,6 +146,7 @@ public class RecordTreeChain implements Closeable, Mutable {
@Override
public void clear() {
size = 0;
root = -1;
this.mem.clear();
recordChain.clear();
......@@ -160,43 +164,48 @@ public class RecordTreeChain implements Closeable, Mutable {
}
public void put(Record record) {
    if (size < maxSize) {
        size++;
        if (root == -1) {
            putParent(record);
            return;
        }
        comparator.setLeft(record);
        long p = root;
        long parent;
        int cmp;
        do {
            parent = p;
            long r = refOf(p);
            recordChain.recordAt(recordChainRecord, r);
            cmp = comparator.compare(recordChainRecord);
            if (cmp < 0) {
                p = leftOf(p);
            } else if (cmp > 0) {
                p = rightOf(p);
            } else {
                setRef(p, recordChain.put(record, r));
                return;
            }
        } while (p > -1);
        p = allocateBlock();
        setParent(p, parent);
        long r = recordChain.put(record, -1L);
        setTop(p, r);
        setRef(p, r);
        if (cmp < 0) {
            setLeft(parent, p);
        } else {
            setRight(parent, p);
        }
        fix(p);
    } else {
        throw LimitOverflowException.instance(maxSize);
    }
}
private long allocateBlock() {
......@@ -298,6 +307,10 @@ public class RecordTreeChain implements Closeable, Mutable {
}
}
public void setMaxSize(long maxSize) {
this.maxSize = maxSize;
}
public class TreeCursor implements RecordCursor {
private long current;
private RecordCursor base;
......
......@@ -26,12 +26,14 @@ package io.questdb.griffin.engine.orderby;
import io.questdb.cairo.AbstractRecordCursorFactory;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.ColumnTypes;
import io.questdb.cairo.RecordSink;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.LimitOverflowException;
public class SortedRecordCursorFactory extends AbstractRecordCursorFactory {
private final RecordCursorFactory base;
......@@ -64,8 +66,19 @@ public class SortedRecordCursorFactory extends AbstractRecordCursorFactory {
@Override
public RecordCursor getCursor(SqlExecutionContext executionContext) {
this.cursor.of(base.getCursor(executionContext));
return cursor;
RecordCursor baseCursor = base.getCursor(executionContext);
try {
long maxInMemoryRows = executionContext.getCairoSecurityContext().getMaxInMemoryRows();
if (maxInMemoryRows > baseCursor.size()) {
chain.setMaxSize(maxInMemoryRows);
this.cursor.of(baseCursor);
return cursor;
}
throw LimitOverflowException.instance(maxInMemoryRows);
} catch (CairoException ex) {
baseCursor.close();
throw ex;
}
}
@Override
......
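
SortedRecordCursorFactory gets the same pre-check as the joins, but the base cursor of an ORDER BY is often forward-only and may not be able to report its row count up front. If, as assumed here, RecordCursor.size() returns -1 when the count is unknown, the pre-check passes trivially and the per-record cap inside RecordTreeChain.put() is what actually stops the query. An annotated restatement of the size check in the new getCursor() body is given below; identifiers match the hunk above, declaredSize is a local name introduced here, and the -1 convention is an assumption.

long maxInMemoryRows = executionContext.getCairoSecurityContext().getMaxInMemoryRows();
long declaredSize = baseCursor.size();     // may be -1 when the row count is unknown
if (maxInMemoryRows > declaredSize) {      // trivially true for declaredSize == -1 ...
    chain.setMaxSize(maxInMemoryRows);     // ... so the tree re-checks on every put()
    this.cursor.of(baseCursor);
    return cursor;
}
throw LimitOverflowException.instance(maxInMemoryRows);
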
......@@ -56,7 +56,6 @@ class UnionRecordCursor implements NoRandomAccessRecordCursor {
this.slaveCursor = slaveCursor;
this.masterRecord = masterCursor.getRecord();
this.slaveRecord = slaveCursor.getRecord();
map.clear();
toTop();
}
......
......@@ -33,6 +33,7 @@ import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.LimitOverflowException;
import io.questdb.std.Misc;
public class UnionRecordCursorFactory implements RecordCursorFactory {
......@@ -58,11 +59,19 @@ public class UnionRecordCursorFactory implements RecordCursorFactory {
@Override
public RecordCursor getCursor(SqlExecutionContext executionContext) {
cursor.of(
masterFactory.getCursor(executionContext),
slaveFactory.getCursor(executionContext)
);
return cursor;
long maxInMemoryRows = executionContext.getCairoSecurityContext().getMaxInMemoryRows();
RecordCursor masterCursor = masterFactory.getCursor(executionContext);
RecordCursor slaveCursor = slaveFactory.getCursor(executionContext);
if (maxInMemoryRows > (masterCursor.size() + slaveCursor.size())) {
map.setMaxSize(maxInMemoryRows);
cursor.of(
masterCursor,
slaveCursor);
return cursor;
}
masterCursor.close();
slaveCursor.close();
throw LimitOverflowException.instance(maxInMemoryRows);
}
@Override
......
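
Unlike the join factories above, the union factory opens both child cursors before it can evaluate the combined size, so the failure path closes both explicitly before throwing. The sketch below shows an equivalent shape with the cleanup folded into a finally block, purely to make the ownership rule explicit; the ok flag and the use of Misc.free are additions of this sketch (Misc.free is assumed to tolerate null), not part of the committed code.

@Override
public RecordCursor getCursor(SqlExecutionContext executionContext) {
    long maxInMemoryRows = executionContext.getCairoSecurityContext().getMaxInMemoryRows();
    RecordCursor masterCursor = masterFactory.getCursor(executionContext);
    RecordCursor slaveCursor = null;
    boolean ok = false; // sketch-only flag, not in the committed code
    try {
        slaveCursor = slaveFactory.getCursor(executionContext);
        if (maxInMemoryRows > masterCursor.size() + slaveCursor.size()) {
            map.setMaxSize(maxInMemoryRows);
            cursor.of(masterCursor, slaveCursor);
            ok = true;
            return cursor;
        }
        throw LimitOverflowException.instance(maxInMemoryRows);
    } finally {
        if (!ok) {
            // release both children on any failure path, including a failing
            // slaveFactory.getCursor() call
            Misc.free(slaveCursor);
            Misc.free(masterCursor);
        }
    }
}
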
......@@ -131,7 +131,7 @@ public class SecurityTest extends AbstractGriffinTest {
}
@Test
public void testMaxInMemoryRowsWithOrderBy() throws Exception {
public void testMaxInMemoryRowsWithRandomAccessOrderBy() throws Exception {
assertMemoryLeak(() -> {
sqlExecutionContext.getRandom().reset();
compiler.compile("create table tb1 as (select" +
......@@ -157,6 +157,38 @@ public class SecurityTest extends AbstractGriffinTest {
});
}
@Test
public void testMaxInMemoryRowsWithoutRandomAccessOrderBy() throws Exception {
assertMemoryLeak(() -> {
sqlExecutionContext.getRandom().reset();
compiler.compile("create table tb1 as (select" +
" rnd_symbol(4,4,4,20000) sym1," +
" rnd_double(2) d1," +
" timestamp_sequence(0, 1000001000) ts1" +
" from long_sequence(10)) timestamp(ts1)", sqlExecutionContext);
compiler.compile("create table tb2 as (select" +
" rnd_symbol(3,3,3,20000) sym2," +
" rnd_double(2) d2," +
" timestamp_sequence(0, 1000000000) ts2" +
" from long_sequence(10)) timestamp(ts2)", sqlExecutionContext);
assertQuery(
"sym1\tsym2\nVTJW\tFJG\nVTJW\tULO\n",
"select sym1, sym2 from tb1 asof join tb2 where d1 < 0.3 ORDER BY d1",
null,
true, readOnlyExecutionContext);
try {
assertQuery(
"sym1\tsym2\nVTJW\tFJG\nVTJW\tULO\nPEHN\tRQQ\n",
"select sym1, sym2 from tb1 asof join tb2 where d1 < 0.34 ORDER BY d1",
null,
true, readOnlyExecutionContext);
Assert.fail();
} catch (Exception ex) {
Assert.assertTrue(ex.toString().contains("limit of 2 exceeded"));
}
});
}
@Test
public void testMaxInMemoryRowsWithDistinct() throws Exception {
assertMemoryLeak(() -> {
......@@ -351,4 +383,174 @@ public class SecurityTest extends AbstractGriffinTest {
});
}
@Test
public void testMaxInMemoryRowsWithUnion() throws Exception {
assertMemoryLeak(() -> {
sqlExecutionContext.getRandom().reset();
compiler.compile("create table tb1 as (select" +
" rnd_symbol(4,4,4,20000) sym1," +
" rnd_double(2) d1," +
" timestamp_sequence(0, 1000000000) ts1" +
" from long_sequence(10)) timestamp(ts1)", sqlExecutionContext);
compiler.compile("create table tb2 as (select" +
" rnd_symbol(3,3,3,20000) sym2," +
" rnd_double(2) d2," +
" timestamp_sequence(10000000000, 1000000000) ts2" +
" from long_sequence(1)) timestamp(ts2)", sqlExecutionContext);
assertQuery(
"sym1\td1\tts1\nVTJW\t0.1985581797355932\t1970-01-01T01:06:40.000000Z\nRQQ\t0.5522494170511608\t1970-01-01T02:46:40.000000Z\n",
"select * from tb1 where d1 < 0.2 union tb2",
"ts1",
false, readOnlyExecutionContext);
try {
assertQuery(
"sym1\td1\tts1\nVTJW\t0.1985581797355932\t1970-01-01T01:06:40.000000Z\nVTJW\t0.21583224269349388\t1970-01-01T01:40:00.000000Z\nRQQ\t0.5522494170511608\t1970-01-01T02:46:40.000000Z\n",
"select * from tb1 where d1 < 0.3 union tb2",
"ts1",
false, readOnlyExecutionContext);
Assert.fail();
} catch (Exception ex) {
Assert.assertTrue(ex.toString().contains("limit of 2 exceeded"));
}
});
}
@Test
public void testMaxInMemoryRowsWithInnerJoin() throws Exception {
assertMemoryLeak(() -> {
sqlExecutionContext.getRandom().reset();
compiler.compile("create table tb1 as (select" +
" rnd_symbol(4,4,4,20000) sym1," +
" rnd_double(2) d1," +
" timestamp_sequence(0, 1000000000) ts1" +
" from long_sequence(10)) timestamp(ts1)", sqlExecutionContext);
compiler.compile("create table tb2 as (select" +
" rnd_symbol(3,3,3,20000) sym2," +
" rnd_double(2) d2," +
" timestamp_sequence(0, 1000000000) ts2" +
" from long_sequence(10)) timestamp(ts2)", sqlExecutionContext);
assertQuery(
"sym1\tsym2\nVTJW\tFJG\nVTJW\tULO\n",
"select sym1, sym2 from tb1 inner join tb2 on tb2.ts2=tb1.ts1 where d1 < 0.3",
null,
false, sqlExecutionContext);
try {
assertQuery(
"sym1\tsym2\nVTJW\tFJG\nVTJW\tULO\n",
"select sym1, sym2 from tb1 inner join tb2 on tb2.ts2=tb1.ts1 where d1 < 0.3",
null,
false, readOnlyExecutionContext);
Assert.fail();
} catch (Exception ex) {
Assert.assertTrue(ex.toString().contains("limit of 2 exceeded"));
}
});
}
@Test
public void testMaxInMemoryRowsWithOuterJoin() throws Exception {
assertMemoryLeak(() -> {
sqlExecutionContext.getRandom().reset();
compiler.compile("create table tb1 as (select" +
" rnd_symbol(4,4,4,20000) sym1," +
" rnd_double(2) d1," +
" timestamp_sequence(0, 1000000000) ts1" +
" from long_sequence(10)) timestamp(ts1)", sqlExecutionContext);
compiler.compile("create table tb2 as (select" +
" rnd_symbol(3,3,3,20000) sym2," +
" rnd_double(2) d2," +
" timestamp_sequence(0, 1000000000) ts2" +
" from long_sequence(10)) timestamp(ts2)", sqlExecutionContext);
assertQuery(
"sym1\tsym2\nVTJW\tFJG\nVTJW\tULO\n",
"select sym1, sym2 from tb1 outer join tb2 on tb2.ts2=tb1.ts1 where d1 < 0.3",
null,
false, sqlExecutionContext);
try {
assertQuery(
"sym1\tsym2\nVTJW\tFJG\nVTJW\tULO\n",
"select sym1, sym2 from tb1 outer join tb2 on tb2.ts2=tb1.ts1 where d1 < 0.3",
null,
false, readOnlyExecutionContext);
Assert.fail();
} catch (Exception ex) {
Assert.assertTrue(ex.toString().contains("limit of 2 exceeded"));
}
});
}
@Test
public void testMaxInMemoryRowsWithFullFatInnerJoin() throws Exception {
assertMemoryLeak(() -> {
sqlExecutionContext.getRandom().reset();
compiler.compile("create table tb1 as (select" +
" rnd_symbol(4,4,4,20000) sym1," +
" rnd_double(2) d1," +
" timestamp_sequence(0, 1000000000) ts1" +
" from long_sequence(10)) timestamp(ts1)", sqlExecutionContext);
compiler.compile("create table tb2 as (select" +
" rnd_symbol(3,3,3,20000) sym2," +
" rnd_double(2) d2," +
" timestamp_sequence(0, 1000000000) ts2" +
" from long_sequence(10)) timestamp(ts2)", sqlExecutionContext);
try {
compiler.setFullSatJoins(true);
assertQuery(
"sym1\tsym2\nVTJW\tFJG\nVTJW\tULO\n",
"select sym1, sym2 from tb1 inner join tb2 on tb2.ts2=tb1.ts1 where d1 < 0.3",
null,
false, sqlExecutionContext);
try {
assertQuery(
"sym1\tsym2\nVTJW\tFJG\nVTJW\tULO\n",
"select sym1, sym2 from tb1 inner join tb2 on tb2.ts2=tb1.ts1 where d1 < 0.3",
null,
false, readOnlyExecutionContext);
Assert.fail();
} catch (Exception ex) {
Assert.assertTrue(ex.toString().contains("limit of 2 exceeded"));
}
} finally {
compiler.setFullSatJoins(false);
}
});
}
@Test
public void testMaxInMemoryRowsWithFullFatOuterJoin() throws Exception {
assertMemoryLeak(() -> {
sqlExecutionContext.getRandom().reset();
compiler.compile("create table tb1 as (select" +
" rnd_symbol(4,4,4,20000) sym1," +
" rnd_double(2) d1," +
" timestamp_sequence(0, 1000000000) ts1" +
" from long_sequence(10)) timestamp(ts1)", sqlExecutionContext);
compiler.compile("create table tb2 as (select" +
" rnd_symbol(3,3,3,20000) sym2," +
" rnd_double(2) d2," +
" timestamp_sequence(0, 1000000000) ts2" +
" from long_sequence(10)) timestamp(ts2)", sqlExecutionContext);
try {
compiler.setFullSatJoins(true);
assertQuery(
"sym1\tsym2\nVTJW\tFJG\nVTJW\tULO\n",
"select sym1, sym2 from tb1 outer join tb2 on tb2.ts2=tb1.ts1 where d1 < 0.3",
null,
false, sqlExecutionContext);
try {
assertQuery(
"sym1\tsym2\nVTJW\tFJG\nVTJW\tULO\n",
"select sym1, sym2 from tb1 outer join tb2 on tb2.ts2=tb1.ts1 where d1 < 0.3",
null,
false, readOnlyExecutionContext);
Assert.fail();
} catch (Exception ex) {
Assert.assertTrue(ex.toString().contains("limit of 2 exceeded"));
}
} finally {
compiler.setFullSatJoins(false);
}
});
}
}