From a2b3276ff7ff67d5af2447937c0587aaa9d2f15c Mon Sep 17 00:00:00 2001 From: Alex Pelagenko Date: Fri, 12 Feb 2021 11:38:36 +0000 Subject: [PATCH] fix(griffin): correctly calculate vector frames when partition interval is used (#804) --- .../table/DataFrameRecordCursorFactory.java | 33 ++++----- .../questdb/griffin/KeyedAggregationTest.java | 72 +++++++++++++++++++ .../java/io/questdb/test/tools/TestUtils.java | 2 +- 3 files changed, 88 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/io/questdb/griffin/engine/table/DataFrameRecordCursorFactory.java b/core/src/main/java/io/questdb/griffin/engine/table/DataFrameRecordCursorFactory.java index 819bba37e..0b6b77c8c 100644 --- a/core/src/main/java/io/questdb/griffin/engine/table/DataFrameRecordCursorFactory.java +++ b/core/src/main/java/io/questdb/griffin/engine/table/DataFrameRecordCursorFactory.java @@ -24,10 +24,7 @@ package io.questdb.griffin.engine.table; -import io.questdb.cairo.NullColumn; -import io.questdb.cairo.ReadOnlyColumn; -import io.questdb.cairo.SymbolMapReader; -import io.questdb.cairo.TableReader; +import io.questdb.cairo.*; import io.questdb.cairo.sql.*; import io.questdb.griffin.SqlExecutionContext; import io.questdb.std.IntList; @@ -182,7 +179,7 @@ public class DataFrameRecordCursorFactory extends AbstractDataFrameRecordCursorF if (top >= partitionLo) { loRemaining = 0; top -= partitionLo; - topsRemaining.setQuick(i, top); + topsRemaining.setQuick(i, Math.min(top, partitionHi - partitionLo)); } else { topsRemaining.setQuick(i, 0); loRemaining -= top; @@ -192,22 +189,19 @@ public class DataFrameRecordCursorFactory extends AbstractDataFrameRecordCursorF final ReadOnlyColumn col = reader.getColumn(TableReader.getPrimaryColumnIndex(base, columnIndexes.getQuick(i))); if (col instanceof NullColumn) { columnPageNextAddress.setQuick(i, 0); - pageNRowsRemaining.setQuick(i, partitionSize - partitionLo); + pageNRowsRemaining.setQuick(i, 0); } else { int page = pages.getQuick(i); - while (true) { - long pageSize = col.getPageSize(page) >> columnSizes.getQuick(i); - if (pageSize > loRemaining) { - long addr = col.getPageAddress(page); - addr += partitionLo << columnSizes.getQuick(i); - columnPageNextAddress.setQuick(i, addr); - pageNRowsRemaining.setQuick(i, pageSize - partitionLo); - pages.setQuick(i, page); - break; - } - loRemaining -= pageSize; - page++; + long pageSize = col.getPageSize(page) >> columnSizes.getQuick(i); + if (pageSize < loRemaining) { + throw CairoException.instance(0).put("partition is not mapped as single page, cannot perform vector calculation"); } + long addr = col.getPageAddress(page); + addr += loRemaining << columnSizes.getQuick(i); + columnPageNextAddress.setQuick(i, addr); + long pageHi = Math.min(partitionHi, pageSize); + pageNRowsRemaining.setQuick(i, pageHi - partitionLo); + pages.setQuick(i, page); } } } @@ -270,6 +264,9 @@ public class DataFrameRecordCursorFactory extends AbstractDataFrameRecordCursorF } } partitionRemaining -= min; + if (partitionRemaining < 0) { + throw CairoException.instance(0).put("incorrect frame built for vector calculation"); + } return frame; } diff --git a/core/src/test/java/io/questdb/griffin/KeyedAggregationTest.java b/core/src/test/java/io/questdb/griffin/KeyedAggregationTest.java index 1ffc4919a..53f8b9c52 100644 --- a/core/src/test/java/io/questdb/griffin/KeyedAggregationTest.java +++ b/core/src/test/java/io/questdb/griffin/KeyedAggregationTest.java @@ -865,4 +865,76 @@ public class KeyedAggregationTest extends AbstractGriffinTest { } }); } + + + @Test + public void testSumInTimestampRange() throws Exception { + long step = 1000000L; + long count = 1000000L; + long increment = count / 10; + assertMemoryLeak(() -> { + compiler.compile("create table tab as (select rnd_symbol('s1','s2','s3', null) s1, 0.5 val, timestamp_sequence(0, " + step + ") t from long_sequence(" + count + ")) timestamp(t) partition by DAY", sqlExecutionContext); + + for (long ts = 0; ts < count; ts += increment) { + try ( + RecordCursorFactory factory = compiler.compile("select sum(val) s from tab where t >= CAST(" + step + " AS TIMESTAMP) AND t < CAST(" + (ts * step) + " AS TIMESTAMP)", sqlExecutionContext).getRecordCursorFactory(); + RecordCursor cursor = factory.getCursor(sqlExecutionContext) + ) { + String value = String.valueOf((ts - 1) * 0.5); + String expected = "s\n" + + (ts > 0 ? value : "NaN") + "\n"; + + sink.clear(); + printer.print(cursor, factory.getMetadata(), true); + TestUtils.assertEquals(expected, sink); + } + } + }); + } + + @Test + public void testSumInTimestampRangeWithColTop() throws Exception { + final long step = 100000L; + final long count = 1000000L; + final long increment = count / 10; + assertMemoryLeak(() -> { + String createSql = "create table tab as (select rnd_symbol('s1','s2','s3', null) s1, 0.5 val, timestamp_sequence(0, " + step + ") t from long_sequence(" + count + ")) timestamp(t) partition by DAY"; + compiler.compile(createSql, sqlExecutionContext); + compiler.compile("alter table tab add val2 DOUBLE", sqlExecutionContext); + String insetSql = "insert into tab select rnd_symbol('s1','s2','s3', null) s1, 0.5 val, timestamp_sequence(" + count * step + ", " + step + ") t, 1 val2 from long_sequence(" + count + ")"; + compiler.compile(insetSql, sqlExecutionContext); + + // Move upper timestamp boundary + // [step, ts * step) + for (long ts = increment; ts < 2 * count; ts += increment) { + try ( + RecordCursorFactory factory = compiler.compile("select sum(val) s1, sum(val2) s2 from tab where t >= CAST(" + step + " AS TIMESTAMP) AND t < CAST(" + (ts * step) + " AS TIMESTAMP)", sqlExecutionContext).getRecordCursorFactory(); + RecordCursor cursor = factory.getCursor(sqlExecutionContext) + ) { + String expected = "s1\ts2\n" + + ((ts - 1) * 0.5) + "\t" + (ts <= count ? "NaN" : (ts - count) * 1.0) + "\n"; + + sink.clear(); + printer.print(cursor, factory.getMetadata(), true); + TestUtils.assertEquals("iteration " + ts, expected, sink); + } + } + + // Move lower timestamp boundary + // [ts * count, 2 * step * count - 1) time range + for (long ts = 0; ts < 2 * count; ts += increment) { + try ( + RecordCursorFactory factory = compiler.compile("select sum(val) s1, sum(val2) s2 from tab where t >= CAST(" + (ts * step) + " AS TIMESTAMP) AND t < CAST(" + ((2 * count - 1) * step) + " AS TIMESTAMP)", sqlExecutionContext).getRecordCursorFactory(); + RecordCursor cursor = factory.getCursor(sqlExecutionContext) + ) { + String expected = "s1\ts2\n" + + ((2 * count - ts - 1) * 0.5) + "\t" + (ts < count ? (count - 1) * 1.0 : (2 * count - ts - 1) * 1.0) + "\n"; + + sink.clear(); + printer.print(cursor, factory.getMetadata(), true); + TestUtils.assertEquals("iteration " + ts, expected, sink); + } + } + }); + } } diff --git a/core/src/test/java/io/questdb/test/tools/TestUtils.java b/core/src/test/java/io/questdb/test/tools/TestUtils.java index dc6e61724..39789e3ff 100644 --- a/core/src/test/java/io/questdb/test/tools/TestUtils.java +++ b/core/src/test/java/io/questdb/test/tools/TestUtils.java @@ -151,7 +151,7 @@ public final class TestUtils { } if (expected.length() != actual.length()) { - Assert.assertEquals(expected, actual); + Assert.assertEquals(message, expected, actual); } Assert.assertEquals(expected.length(), actual.length()); for (int i = 0; i < expected.length(); i++) { -- GitLab