未验证 提交 12adcc41 编写于 作者: V Vlad Ilyushchenko 提交者: GitHub

fix: resize issue with DirectLongList. Fixed #748 (#759)

上级 637dca66
......@@ -1233,7 +1233,6 @@ public class SqlCodeGenerator implements Mutable {
final RecordCursorFactory factory = generateSubQuery(model, executionContext);
// we require timestamp
// todo: this looks like generic code
final int timestampIndex = getTimestampIndex(model, factory);
if (timestampIndex == -1) {
Misc.free(factory);
......
......@@ -65,6 +65,7 @@ public final class SqlParser {
private final PostOrderTreeTraversalAlgo.Visitor rewriteConcat0Ref = this::rewriteConcat0;
private final PostOrderTreeTraversalAlgo.Visitor rewriteTypeQualifier0Ref = this::rewriteTypeQualifier0;
private boolean subQueryMode = false;
SqlParser(
CairoConfiguration configuration,
SqlOptimiser optimiser,
......@@ -147,18 +148,18 @@ public final class SqlParser {
}
private void expectBy(GenericLexer lexer) throws SqlException {
CharSequence tok = optTok(lexer);
if (tok == null || !isByKeyword(tok)) {
throw SqlException.$((lexer.getPosition()), "'by' expected");
if (isByKeyword(tok(lexer, "by"))) {
return;
}
throw SqlException.$((lexer.getPosition()), "'by' expected");
}
private ExpressionNode expectExpr(GenericLexer lexer) throws SqlException {
ExpressionNode n = expr(lexer, (QueryModel) null);
if (n == null) {
throw SqlException.$(lexer.getUnparsed() == null ? lexer.getPosition() : lexer.lastTokenPosition(), "Expression expected");
final ExpressionNode n = expr(lexer, (QueryModel) null);
if (n != null) {
return n;
}
return n;
throw SqlException.$(lexer.getUnparsed() == null ? lexer.getPosition() : lexer.lastTokenPosition(), "Expression expected");
}
private int expectInt(GenericLexer lexer) throws SqlException {
......@@ -1421,11 +1422,6 @@ public final class SqlParser {
return parent;
}
private ExpressionNode rewriteTypeQualifier(ExpressionNode parent) throws SqlException {
traversalAlgo.traverse(parent, rewriteTypeQualifier0Ref);
return parent;
}
private void rewriteConcat0(ExpressionNode node) {
if (node.type == ExpressionNode.OPERATION && isConcatOperator(node.token)) {
node.type = ExpressionNode.FUNCTION;
......@@ -1465,6 +1461,15 @@ public final class SqlParser {
}
}
private ExpressionNode rewriteKnownStatements(ExpressionNode parent) throws SqlException {
return rewriteConcat(rewriteCase(rewriteCount(rewriteTypeQualifier(parent))));
}
private ExpressionNode rewriteTypeQualifier(ExpressionNode parent) throws SqlException {
traversalAlgo.traverse(parent, rewriteTypeQualifier0Ref);
return parent;
}
/**
* Rewrites 'abc'::blah - type qualifier
*
......@@ -1481,10 +1486,6 @@ public final class SqlParser {
}
}
private ExpressionNode rewriteKnownStatements(ExpressionNode parent) throws SqlException {
return rewriteConcat(rewriteCase(rewriteCount(rewriteTypeQualifier(parent))));
}
private int toColumnType(GenericLexer lexer, CharSequence tok) throws SqlException {
final int type = ColumnType.columnTypeOf(tok);
if (type == -1) {
......
......@@ -47,24 +47,25 @@ class LatestByAllRecordCursor extends AbstractRecordListCursor {
@Override
protected void buildTreeMap(SqlExecutionContext executionContext) {
map.clear();
DataFrame frame;
while ((frame = this.dataFrameCursor.next()) != null) {
final int partitionIndex = frame.getPartitionIndex();
final long rowLo = frame.getRowLo();
final long rowHi = frame.getRowHi() - 1;
try {
while ((frame = this.dataFrameCursor.next()) != null) {
final int partitionIndex = frame.getPartitionIndex();
final long rowLo = frame.getRowLo();
final long rowHi = frame.getRowHi() - 1;
recordA.jumpTo(frame.getPartitionIndex(), rowHi);
for (long row = rowHi; row >= rowLo; row--) {
recordA.setRecordIndex(row);
MapKey key = map.withKey();
key.put(recordA, recordSink);
if (key.create()) {
rows.add(Rows.toRowID(partitionIndex, row));
recordA.jumpTo(frame.getPartitionIndex(), rowHi);
for (long row = rowHi; row >= rowLo; row--) {
recordA.setRecordIndex(row);
MapKey key = map.withKey();
key.put(recordA, recordSink);
if (key.create()) {
rows.add(Rows.toRowID(partitionIndex, row));
}
}
}
} finally {
map.clear();
}
map.clear();
}
}
......@@ -808,6 +808,9 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
if (getLatestBy().size() > 0) {
sink.put(" latest by ");
for (int i = 0, n = getLatestBy().size(); i < n; i++) {
if (i > 0) {
sink.put(',');
}
getLatestBy().getQuick(i).toSink(sink);
}
}
......
......@@ -44,8 +44,8 @@ public class DirectLongList implements Mutable, Closeable {
public DirectLongList(long capacity) {
this.pow2 = 3;
this.address = Unsafe.malloc(this.capacity = ((capacity << 3) + Misc.CACHE_LINE_SIZE));
this.start = this.pos = address + (address & (Misc.CACHE_LINE_SIZE - 1));
this.address = Unsafe.malloc(this.capacity = ((capacity << 3)));
this.start = this.pos = address;
this.limit = pos + ((capacity - 1) << 3);
this.onePow2 = (1 << 3);
}
......@@ -65,9 +65,9 @@ public class DirectLongList implements Mutable, Closeable {
this.pos += count;
}
public int binarySearch(long v) {
int low = 0;
int high = (int) ((pos - start) >> 3) - 1;
public long binarySearch(long v) {
long low = 0;
long high = ((pos - start) >> 3) - 1;
while (low <= high) {
......@@ -75,7 +75,7 @@ public class DirectLongList implements Mutable, Closeable {
return scanSearch(v, low, high);
}
int mid = (low + high) >>> 1;
long mid = (low + high) >>> 1;
long midVal = Unsafe.getUnsafe().getLong(start + (mid << 3));
if (midVal < v)
......@@ -109,8 +109,8 @@ public class DirectLongList implements Mutable, Closeable {
return Unsafe.getUnsafe().getLong(start + (p << 3));
}
public int scanSearch(long v, int low, int high) {
for (int i = low; i < high; i++) {
public long scanSearch(long v, long low, long high) {
for (long i = low; i < high; i++) {
long f = get(i);
if (f == v) {
return i;
......@@ -141,7 +141,7 @@ public class DirectLongList implements Mutable, Closeable {
return (int) ((pos - start) >> pow2);
}
public DirectLongList subset(int lo, int hi) {
public DirectLongList subset(long lo, long hi) {
DirectLongList that = new DirectLongList(hi - lo);
Unsafe.getUnsafe().copyMemory(start + (lo << 3), that.start, (hi - lo) << 3);
that.pos += (hi - lo) << 3;
......@@ -175,11 +175,10 @@ public class DirectLongList implements Mutable, Closeable {
private void extend(long capacity) {
final long oldCapacity = this.capacity;
long address = Unsafe.realloc(this.address, oldCapacity, this.capacity = ((capacity << pow2) + Misc.CACHE_LINE_SIZE));
long start = address + (address & (Misc.CACHE_LINE_SIZE - 1));
this.pos = this.pos - this.start + start;
this.limit = start + ((capacity - 1) << pow2);
this.pos = this.pos - this.start + address;
this.limit = address + ((capacity - 1) << pow2);
this.address = address;
this.start = start;
LOG.info().$("resized [old=").$(oldCapacity).$(", new=").$(this.capacity).$(']').$();
this.start = address;
LOG.debug().$("resized [old=").$(oldCapacity).$(", new=").$(this.capacity).$(']').$();
}
}
......@@ -361,10 +361,14 @@ public class AbstractGriffinTest extends AbstractCairoTest {
long timestamp = Long.MIN_VALUE;
try (RecordCursor cursor = factory.getCursor(sqlExecutionContext)) {
final Record record = cursor.getRecord();
long c = 0;
while (cursor.hasNext()) {
long ts = record.getTimestamp(index);
Assert.assertTrue(timestamp <= ts);
if (timestamp > ts) {
Assert.fail("record #" + c);
}
timestamp = ts;
c++;
}
}
}
......
......@@ -2504,6 +2504,39 @@ public class SqlCodeGeneratorTest extends AbstractGriffinTest {
);
}
@Test
public void testLatestByTimestampInclusion() throws Exception {
assertQuery("ts\tmarket_type\tavg\n" +
"1970-01-01T00:00:00.000000Z\taaa\t0.49992728629932576\n" +
"1970-01-01T00:00:00.000000Z\tbbb\t0.500285563758478\n" +
"1970-01-01T00:00:01.000000Z\taaa\t0.500040169925671\n" +
"1970-01-01T00:00:01.000000Z\tbbb\t0.5008686113849173\n" +
"1970-01-01T00:00:02.000000Z\taaa\t0.49977074601999855\n" +
"1970-01-01T00:00:02.000000Z\tbbb\t0.4999258418217269\n" +
"1970-01-01T00:00:03.000000Z\taaa\t0.5003595019568708\n" +
"1970-01-01T00:00:03.000000Z\tbbb\t0.5002857992170555\n" +
"1970-01-01T00:00:04.000000Z\tbbb\t0.4997116251279621\n" +
"1970-01-01T00:00:04.000000Z\taaa\t0.5006208473770267\n" +
"1970-01-01T00:00:05.000000Z\tbbb\t0.49988619432529985\n" +
"1970-01-01T00:00:05.000000Z\taaa\t0.5002852550150528\n" +
"1970-01-01T00:00:06.000000Z\taaa\t0.4998229395659802\n" +
"1970-01-01T00:00:06.000000Z\tbbb\t0.4997012831335711\n" +
"1970-01-01T00:00:07.000000Z\tbbb\t0.49945806525231845\n" +
"1970-01-01T00:00:07.000000Z\taaa\t0.4995901449794158\n" +
"1970-01-01T00:00:08.000000Z\taaa\t0.5002616949495469\n" +
"1970-01-01T00:00:08.000000Z\tbbb\t0.5005399447758458\n" +
"1970-01-01T00:00:09.000000Z\taaa\t0.5003054203632804\n" +
"1970-01-01T00:00:09.000000Z\tbbb\t0.500094369884023\n",
"select ts, market_type, avg(bid_price) FROM market_updates LATEST BY ts, market_type SAMPLE BY 1s",
"create table market_updates as (select rnd_symbol('aaa','bbb') market_type, rnd_double() bid_price, timestamp_sequence(0,1) ts from long_sequence(10000000)" +
") timestamp(ts)",
"ts",
false,
true,
false
);
}
@Test
public void testLatestByNonExistingColumn() throws Exception {
assertFailure(
......
......@@ -3181,11 +3181,11 @@ public class SqlParserTest extends AbstractGriffinTest {
}
@Test
public void testLatestBySyntax() throws Exception {
assertSyntaxError(
"select * from tab latest",
24,
"'by' expected"
public void testLatestByMultipleColumns() throws SqlException {
assertQuery(
"select-group-by ts, market_type, avg(bid_price) avg from (select [ts, market_type, bid_price] from market_updates timestamp (ts) latest by ts,market_type) sample by 1s",
"select ts, market_type, avg(bid_price) FROM market_updates LATEST BY ts, market_type SAMPLE BY 1s",
modelOf("market_updates").timestamp("ts").col("market_type", ColumnType.SYMBOL).col("bid_price", ColumnType.DOUBLE)
);
}
......@@ -5427,6 +5427,15 @@ public class SqlParserTest extends AbstractGriffinTest {
);
}
@Test
public void testLatestBySyntax() throws Exception {
assertSyntaxError(
"select * from tab latest",
24,
"by expected"
);
}
@Test
public void testUnionMoveWhereIntoSubQuery() throws Exception {
assertQuery(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册