未验证 提交 a2b3276f 编写于 作者: A Alex Pelagenko 提交者: GitHub

fix(griffin): correctly calculate vector frames when partition interval is used (#804)

上级 13b1f55c
......@@ -24,10 +24,7 @@
package io.questdb.griffin.engine.table;
import io.questdb.cairo.NullColumn;
import io.questdb.cairo.ReadOnlyColumn;
import io.questdb.cairo.SymbolMapReader;
import io.questdb.cairo.TableReader;
import io.questdb.cairo.*;
import io.questdb.cairo.sql.*;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.std.IntList;
......@@ -182,7 +179,7 @@ public class DataFrameRecordCursorFactory extends AbstractDataFrameRecordCursorF
if (top >= partitionLo) {
loRemaining = 0;
top -= partitionLo;
topsRemaining.setQuick(i, top);
topsRemaining.setQuick(i, Math.min(top, partitionHi - partitionLo));
} else {
topsRemaining.setQuick(i, 0);
loRemaining -= top;
......@@ -192,22 +189,19 @@ public class DataFrameRecordCursorFactory extends AbstractDataFrameRecordCursorF
final ReadOnlyColumn col = reader.getColumn(TableReader.getPrimaryColumnIndex(base, columnIndexes.getQuick(i)));
if (col instanceof NullColumn) {
columnPageNextAddress.setQuick(i, 0);
pageNRowsRemaining.setQuick(i, partitionSize - partitionLo);
pageNRowsRemaining.setQuick(i, 0);
} else {
int page = pages.getQuick(i);
while (true) {
long pageSize = col.getPageSize(page) >> columnSizes.getQuick(i);
if (pageSize > loRemaining) {
long addr = col.getPageAddress(page);
addr += partitionLo << columnSizes.getQuick(i);
columnPageNextAddress.setQuick(i, addr);
pageNRowsRemaining.setQuick(i, pageSize - partitionLo);
pages.setQuick(i, page);
break;
}
loRemaining -= pageSize;
page++;
long pageSize = col.getPageSize(page) >> columnSizes.getQuick(i);
if (pageSize < loRemaining) {
throw CairoException.instance(0).put("partition is not mapped as single page, cannot perform vector calculation");
}
long addr = col.getPageAddress(page);
addr += loRemaining << columnSizes.getQuick(i);
columnPageNextAddress.setQuick(i, addr);
long pageHi = Math.min(partitionHi, pageSize);
pageNRowsRemaining.setQuick(i, pageHi - partitionLo);
pages.setQuick(i, page);
}
}
}
......@@ -270,6 +264,9 @@ public class DataFrameRecordCursorFactory extends AbstractDataFrameRecordCursorF
}
}
partitionRemaining -= min;
if (partitionRemaining < 0) {
throw CairoException.instance(0).put("incorrect frame built for vector calculation");
}
return frame;
}
......
......@@ -865,4 +865,76 @@ public class KeyedAggregationTest extends AbstractGriffinTest {
}
});
}
@Test
public void testSumInTimestampRange() throws Exception {
long step = 1000000L;
long count = 1000000L;
long increment = count / 10;
assertMemoryLeak(() -> {
compiler.compile("create table tab as (select rnd_symbol('s1','s2','s3', null) s1, 0.5 val, timestamp_sequence(0, " + step + ") t from long_sequence(" + count + ")) timestamp(t) partition by DAY", sqlExecutionContext);
for (long ts = 0; ts < count; ts += increment) {
try (
RecordCursorFactory factory = compiler.compile("select sum(val) s from tab where t >= CAST(" + step + " AS TIMESTAMP) AND t < CAST(" + (ts * step) + " AS TIMESTAMP)", sqlExecutionContext).getRecordCursorFactory();
RecordCursor cursor = factory.getCursor(sqlExecutionContext)
) {
String value = String.valueOf((ts - 1) * 0.5);
String expected = "s\n" +
(ts > 0 ? value : "NaN") + "\n";
sink.clear();
printer.print(cursor, factory.getMetadata(), true);
TestUtils.assertEquals(expected, sink);
}
}
});
}
@Test
public void testSumInTimestampRangeWithColTop() throws Exception {
final long step = 100000L;
final long count = 1000000L;
final long increment = count / 10;
assertMemoryLeak(() -> {
String createSql = "create table tab as (select rnd_symbol('s1','s2','s3', null) s1, 0.5 val, timestamp_sequence(0, " + step + ") t from long_sequence(" + count + ")) timestamp(t) partition by DAY";
compiler.compile(createSql, sqlExecutionContext);
compiler.compile("alter table tab add val2 DOUBLE", sqlExecutionContext);
String insetSql = "insert into tab select rnd_symbol('s1','s2','s3', null) s1, 0.5 val, timestamp_sequence(" + count * step + ", " + step + ") t, 1 val2 from long_sequence(" + count + ")";
compiler.compile(insetSql, sqlExecutionContext);
// Move upper timestamp boundary
// [step, ts * step)
for (long ts = increment; ts < 2 * count; ts += increment) {
try (
RecordCursorFactory factory = compiler.compile("select sum(val) s1, sum(val2) s2 from tab where t >= CAST(" + step + " AS TIMESTAMP) AND t < CAST(" + (ts * step) + " AS TIMESTAMP)", sqlExecutionContext).getRecordCursorFactory();
RecordCursor cursor = factory.getCursor(sqlExecutionContext)
) {
String expected = "s1\ts2\n" +
((ts - 1) * 0.5) + "\t" + (ts <= count ? "NaN" : (ts - count) * 1.0) + "\n";
sink.clear();
printer.print(cursor, factory.getMetadata(), true);
TestUtils.assertEquals("iteration " + ts, expected, sink);
}
}
// Move lower timestamp boundary
// [ts * count, 2 * step * count - 1) time range
for (long ts = 0; ts < 2 * count; ts += increment) {
try (
RecordCursorFactory factory = compiler.compile("select sum(val) s1, sum(val2) s2 from tab where t >= CAST(" + (ts * step) + " AS TIMESTAMP) AND t < CAST(" + ((2 * count - 1) * step) + " AS TIMESTAMP)", sqlExecutionContext).getRecordCursorFactory();
RecordCursor cursor = factory.getCursor(sqlExecutionContext)
) {
String expected = "s1\ts2\n" +
((2 * count - ts - 1) * 0.5) + "\t" + (ts < count ? (count - 1) * 1.0 : (2 * count - ts - 1) * 1.0) + "\n";
sink.clear();
printer.print(cursor, factory.getMetadata(), true);
TestUtils.assertEquals("iteration " + ts, expected, sink);
}
}
});
}
}
......@@ -151,7 +151,7 @@ public final class TestUtils {
}
if (expected.length() != actual.length()) {
Assert.assertEquals(expected, actual);
Assert.assertEquals(message, expected, actual);
}
Assert.assertEquals(expected.length(), actual.length());
for (int i = 0; i < expected.length(); i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册