未验证 提交 6b0569d8 编写于 作者: P Patrick Mackinlay 提交者: GitHub

chore: questdb can be configured to limit the number of in memory rows (#329)

上级 ae43a765
......@@ -24,20 +24,19 @@
package io.questdb.griffin.engine.orderby;
import java.io.Closeable;
import io.questdb.cairo.ColumnTypes;
import io.questdb.cairo.RecordChain;
import io.questdb.cairo.RecordSink;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.SymbolTable;
import io.questdb.griffin.engine.LimitOverflowException;
import io.questdb.std.MemoryPages;
import io.questdb.std.Misc;
import io.questdb.std.Mutable;
import io.questdb.std.Unsafe;
import java.io.Closeable;
public class RecordTreeChain implements Closeable, Mutable {
// P(8) + L + R + C(1) + REF + TOP
private static final int BLOCK_SIZE = 8 + 8 + 8 + 1 + 8 + 8;
......@@ -55,8 +54,6 @@ public class RecordTreeChain implements Closeable, Mutable {
private final RecordComparator comparator;
private final TreeCursor cursor = new TreeCursor();
private long root = -1;
private long size = 0;
private long maxSize = Long.MAX_VALUE;
public RecordTreeChain(
ColumnTypes columnTypes,
......@@ -146,7 +143,6 @@ public class RecordTreeChain implements Closeable, Mutable {
@Override
public void clear() {
size = 0;
root = -1;
this.mem.clear();
recordChain.clear();
......@@ -164,48 +160,43 @@ public class RecordTreeChain implements Closeable, Mutable {
}
public void put(Record record) {
if (size < maxSize) {
size++;
if (root == -1) {
putParent(record);
return;
}
comparator.setLeft(record);
long p = root;
long parent;
int cmp;
do {
parent = p;
long r = refOf(p);
recordChain.recordAt(recordChainRecord, r);
cmp = comparator.compare(recordChainRecord);
if (cmp < 0) {
p = leftOf(p);
} else if (cmp > 0) {
p = rightOf(p);
} else {
setRef(p, recordChain.put(record, r));
return;
}
} while (p > -1);
if (root == -1) {
putParent(record);
return;
}
p = allocateBlock();
setParent(p, parent);
long r = recordChain.put(record, -1L);
setTop(p, r);
setRef(p, r);
comparator.setLeft(record);
long p = root;
long parent;
int cmp;
do {
parent = p;
long r = refOf(p);
recordChain.recordAt(recordChainRecord, r);
cmp = comparator.compare(recordChainRecord);
if (cmp < 0) {
setLeft(parent, p);
p = leftOf(p);
} else if (cmp > 0) {
p = rightOf(p);
} else {
setRight(parent, p);
setRef(p, recordChain.put(record, r));
return;
}
fix(p);
} while (p > -1);
p = allocateBlock();
setParent(p, parent);
long r = recordChain.put(record, -1L);
setTop(p, r);
setRef(p, r);
if (cmp < 0) {
setLeft(parent, p);
} else {
throw LimitOverflowException.instance(maxSize);
setRight(parent, p);
}
fix(p);
}
private long allocateBlock() {
......@@ -307,10 +298,6 @@ public class RecordTreeChain implements Closeable, Mutable {
}
}
public void setMaxSize(long maxSize) {
this.maxSize = maxSize;
}
public class TreeCursor implements RecordCursor {
private long current;
private RecordCursor base;
......
......@@ -24,10 +24,12 @@
package io.questdb.griffin.engine.orderby;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.sql.DelegatingRecordCursor;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.SymbolTable;
import io.questdb.griffin.engine.LimitOverflowException;
class SortedRecordCursor implements DelegatingRecordCursor {
private final RecordTreeChain chain;
......@@ -80,17 +82,36 @@ class SortedRecordCursor implements DelegatingRecordCursor {
@Override
public void of(RecordCursor base) {
this.chainCursor = chain.getCursor(base);
final Record record = base.getRecord();
of(base, Long.MAX_VALUE);
}
chain.clear();
while (base.hasNext()) {
// Tree chain is liable to re-position record to
// other rows to do record comparison. We must use our
// own record instance in case base cursor keeps
// state in the record it returns.
chain.put(record);
public void of(RecordCursor base, long maxSize) {
try {
if (maxSize > base.size()) {
this.chainCursor = chain.getCursor(base);
final Record record = base.getRecord();
long nRows = 0;
chain.clear();
while (base.hasNext()) {
if (nRows < maxSize) {
nRows++;
// Tree chain is liable to re-position record to
// other rows to do record comparison. We must use our
// own record instance in case base cursor keeps
// state in the record it returns.
chain.put(record);
} else {
throw LimitOverflowException.instance(maxSize);
}
}
chainCursor.toTop();
} else {
throw LimitOverflowException.instance(maxSize);
}
} catch (CairoException ex) {
base.close();
throw ex;
}
chainCursor.toTop();
}
}
......@@ -26,14 +26,12 @@ package io.questdb.griffin.engine.orderby;
import io.questdb.cairo.AbstractRecordCursorFactory;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.ColumnTypes;
import io.questdb.cairo.RecordSink;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.LimitOverflowException;
public class SortedRecordCursorFactory extends AbstractRecordCursorFactory {
private final RecordCursorFactory base;
......@@ -66,19 +64,8 @@ public class SortedRecordCursorFactory extends AbstractRecordCursorFactory {
@Override
public RecordCursor getCursor(SqlExecutionContext executionContext) {
RecordCursor baseCursor = base.getCursor(executionContext);
try {
long maxInMemoryRows = executionContext.getCairoSecurityContext().getMaxInMemoryRows();
if (maxInMemoryRows > baseCursor.size()) {
chain.setMaxSize(maxInMemoryRows);
this.cursor.of(baseCursor);
return cursor;
}
throw LimitOverflowException.instance(maxInMemoryRows);
} catch (CairoException ex) {
baseCursor.close();
throw ex;
}
this.cursor.of(base.getCursor(executionContext), executionContext.getCairoSecurityContext().getMaxInMemoryRows());
return cursor;
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册