diff --git a/core/src/main/c/share/vect.h b/core/src/main/c/share/vect.h index 900e4f196742cb63a833b344a66c31cecf2c25a2..e6104f5027ffe59067fbe2979f3197f553ba4fbe 100644 --- a/core/src/main/c/share/vect.h +++ b/core/src/main/c/share/vect.h @@ -49,6 +49,9 @@ extern "C" { \ JNIEXPORT jdouble JNICALL Java_io_questdb_std_Vect_ ## func(JNIEnv *env, jclass cl, jlong pDouble, jlong size) { \ return func((double *) pDouble, size); \ }\ +JNIEXPORT jdouble JNICALL JavaCritical_io_questdb_std_Vect_ ## func(jlong pDouble, jlong size) { \ + return func((double *) pDouble, size); \ +}\ \ } diff --git a/core/src/main/c/share/vectkeysum.cpp b/core/src/main/c/share/vectkeysum.cpp index feeaa7753f007f341acca5b9e13ee4b08307c126..054d9e829414ecfe21066e57eaa6aee33d8dcc1f 100644 --- a/core/src/main/c/share/vectkeysum.cpp +++ b/core/src/main/c/share/vectkeysum.cpp @@ -194,6 +194,56 @@ Java_io_questdb_std_Rosti_keyedIntDistinct(JNIEnv *env, jclass cl, jlong pRosti, } } +JNIEXPORT void JNICALL +Java_io_questdb_std_Rosti_keyedIntCount(JNIEnv *env, jclass cl, jlong pRosti, jlong pKeys, jlong count, + jint valueOffset) { + + auto map = reinterpret_cast(pRosti); + const auto *pi = reinterpret_cast(pKeys); + const auto value_offset = map->value_offsets_[valueOffset]; + for (int i = 0; i < count; i++) { + _mm_prefetch(pi + 16, _MM_HINT_T0); + const int32_t v = pi[i]; + auto res = find(map, v); + auto dest = map->slots_ + res.first; + if (PREDICT_FALSE(res.second)) { + *reinterpret_cast(dest) = v; + *reinterpret_cast(dest + value_offset) = 1; + } else { + (*reinterpret_cast(dest + value_offset))++; + } + } +} + +JNIEXPORT void JNICALL +Java_io_questdb_std_Rosti_keyedIntCountMerge(JNIEnv *env, jclass cl, jlong pRostiA, jlong pRostiB, + jint valueOffset) { + auto map_a = reinterpret_cast(pRostiA); + auto map_b = reinterpret_cast(pRostiB); + const auto value_offset = map_b->value_offsets_[valueOffset]; + const auto capacity = map_b->capacity_; + const auto ctrl = map_b->ctrl_; + const auto shift = map_b->slot_size_shift_; + const auto slots = map_b->slots_; + + for (size_t i = 0; i < capacity; i++) { + if (ctrl[i] > -1) { + auto src = slots + (i << shift); + auto key = *reinterpret_cast(src); + auto count = *reinterpret_cast(src + value_offset); + auto res = find(map_a, key); + // maps must have identical structure to use "shift" from map B on map A + auto dest = map_a->slots_ + res.first; + if (PREDICT_FALSE(res.second)) { + *reinterpret_cast(dest) = key; + *reinterpret_cast(dest + value_offset) = count; + } else { + (*reinterpret_cast(dest + value_offset)) += count; + } + } + } +} + JNIEXPORT void JNICALL Java_io_questdb_std_Rosti_keyedIntKSumDoubleMerge(JNIEnv *env, jclass cl, jlong pRostiA, jlong pRostiB, jint valueOffset) { diff --git a/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java b/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java index ebc4c9e020ff04de22f33721060a62e474465c76..93b770505bb92e654c6178039a1f0a448177a3d8 100644 --- a/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java +++ b/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java @@ -541,6 +541,9 @@ public class SqlCodeGenerator implements Mutable { addVaf(new SumTimestampVectorAggregateFunction(ast.rhs.position, columnIndex), qc.getName()); continue; } + } else if (ast.type == FUNCTION && ast.paramCount == 0 && Chars.equals(ast.token, "count")) { + addVaf(new CountVectorAggregateFunction(ast.position), qc.getName()); + continue; } else if (isSingleColumnFunction(ast, "ksum")) { final int columnIndex = metadata.getColumnIndex(ast.rhs.token); final int type = metadata.getColumnType(columnIndex); @@ -1546,7 +1549,6 @@ public class SqlCodeGenerator implements Mutable { final RecordCursorFactory factory = generateSubQuery(model, executionContext); try { - // generate special case plan for "select count() from somewhere" ObjList columns = model.getBottomUpColumns(); if (columns.size() == 1) { diff --git a/core/src/main/java/io/questdb/griffin/engine/groupby/vect/CountVectorAggregateFunction.java b/core/src/main/java/io/questdb/griffin/engine/groupby/vect/CountVectorAggregateFunction.java new file mode 100644 index 0000000000000000000000000000000000000000..a607f23ee4bfa06dfe2065c8ea30a4ac9ea2f906 --- /dev/null +++ b/core/src/main/java/io/questdb/griffin/engine/groupby/vect/CountVectorAggregateFunction.java @@ -0,0 +1,93 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2020 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin.engine.groupby.vect; + +import io.questdb.cairo.ArrayColumnTypes; +import io.questdb.cairo.ColumnType; +import io.questdb.cairo.sql.Record; +import io.questdb.griffin.engine.functions.LongFunction; +import io.questdb.std.Rosti; +import io.questdb.std.Unsafe; + +import java.util.concurrent.atomic.LongAdder; + +public class CountVectorAggregateFunction extends LongFunction implements VectorAggregateFunction { + private final LongAdder count = new LongAdder(); + private int valueOffset; + + public CountVectorAggregateFunction(int position) { + super(position); + } + + @Override + public void pushValueTypes(ArrayColumnTypes types) { + this.valueOffset = types.getColumnCount(); + types.add(ColumnType.LONG); + } + + @Override + public int getValueOffset() { + return valueOffset; + } + + @Override + public void initRosti(long pRosti) { + Unsafe.getUnsafe().putLong(Rosti.getInitialValueSlot(pRosti, valueOffset), 0); + } + + @Override + public void aggregate(long pRosti, long keyAddress, long valueAddress, long count, int workerId) { + Rosti.keyedIntCount(pRosti, keyAddress, count, valueOffset); + } + + @Override + public void merge(long pRostiA, long pRostiB) { + Rosti.keyedIntCountMerge(pRostiA, pRostiB, valueOffset); + } + + @Override + public void wrapUp(long pRosti) { + } + + @Override + public void aggregate(long address, long count, int workerId) { + this.count.add(count); + } + + @Override + public int getColumnIndex() { + return 0; + } + + @Override + public void clear() { + count.reset(); + } + + @Override + public long getLong(Record rec) { + return count.sum(); + } +} diff --git a/core/src/main/java/io/questdb/std/Rosti.java b/core/src/main/java/io/questdb/std/Rosti.java index 7cefa20c7942806d9d951df558fa324ef1295880..1cc01f766a536496c99c1045cd0dc740f22a2445 100644 --- a/core/src/main/java/io/questdb/std/Rosti.java +++ b/core/src/main/java/io/questdb/std/Rosti.java @@ -61,6 +61,10 @@ public final class Rosti { public static native void keyedIntDistinct(long pRosti, long pKeys, long count); + public static native void keyedIntCount(long pRosti, long pKeys, long count, int valueOffset); + + public static native void keyedIntCountMerge(long pRostiA, long pRostiB, int valueOffset); + // sum double public static native void keyedIntSumDouble(long pRosti, long pKeys, long pDouble, long count, int valueOffset); diff --git a/core/src/main/resources/io/questdb/bin/freebsd/libquestdb.so b/core/src/main/resources/io/questdb/bin/freebsd/libquestdb.so index 5ebd27e7ba4fe75ed26f7e6917af17005cc4a636..ab326f5b630f24eea4a58c63bdbf6ff9c5831e68 100755 Binary files a/core/src/main/resources/io/questdb/bin/freebsd/libquestdb.so and b/core/src/main/resources/io/questdb/bin/freebsd/libquestdb.so differ diff --git a/core/src/main/resources/io/questdb/bin/linux/libquestdb.so b/core/src/main/resources/io/questdb/bin/linux/libquestdb.so index 87d6c5aa6eb95e5236614635ad5b90c681795ab9..1dd6914cb5c43cc96fab2af8c611ac5865aeb36d 100755 Binary files a/core/src/main/resources/io/questdb/bin/linux/libquestdb.so and b/core/src/main/resources/io/questdb/bin/linux/libquestdb.so differ diff --git a/core/src/main/resources/io/questdb/bin/osx/libquestdb.dylib b/core/src/main/resources/io/questdb/bin/osx/libquestdb.dylib index 5f2c3d90db75d16e68c4766a13ed8ca68cc97c78..d4c2a968ff6f2d4cab7bd206a512ad883c29e1d8 100755 Binary files a/core/src/main/resources/io/questdb/bin/osx/libquestdb.dylib and b/core/src/main/resources/io/questdb/bin/osx/libquestdb.dylib differ diff --git a/core/src/main/resources/io/questdb/bin/windows/libquestdb.dll b/core/src/main/resources/io/questdb/bin/windows/libquestdb.dll index da9d12e9b397421529a2f84eb06890df1e900b05..af53b4162a896d3e61e24623b099b7bdda617968 100644 Binary files a/core/src/main/resources/io/questdb/bin/windows/libquestdb.dll and b/core/src/main/resources/io/questdb/bin/windows/libquestdb.dll differ diff --git a/core/src/test/java/io/questdb/griffin/KeyedAggregationTest.java b/core/src/test/java/io/questdb/griffin/KeyedAggregationTest.java index 5be09f2762cb1ff4b8bf035e483359a1899747be..5b1afb422152d79c2c003e4344b71ca79463f48c 100644 --- a/core/src/test/java/io/questdb/griffin/KeyedAggregationTest.java +++ b/core/src/test/java/io/questdb/griffin/KeyedAggregationTest.java @@ -357,6 +357,34 @@ public class KeyedAggregationTest extends AbstractGriffinTest { }); } + @Test + public void testIntSymbolAddValueMidTableCount() throws Exception { + assertMemoryLeak(() -> { + compiler.compile("create table tab as (select rnd_symbol('s1','s2','s3', null) s1 from long_sequence(1000000))", sqlExecutionContext); + compiler.compile("alter table tab add column val long", sqlExecutionContext); + compiler.compile("insert into tab select rnd_symbol('a1','a2','a3', null), rnd_long(33, 889992, 2) from long_sequence(1000000)", sqlExecutionContext); + + try ( + RecordCursorFactory factory = compiler.compile("select s1, count() from tab order by s1", sqlExecutionContext).getRecordCursorFactory(); + RecordCursor cursor = factory.getCursor(sqlExecutionContext) + ) { + + String expected = "s1\tcount\n" + + "\t500194\n" + + "a1\t248976\n" + + "a2\t250638\n" + + "a3\t250099\n" + + "s1\t249898\n" + + "s2\t250010\n" + + "s3\t250185\n"; + + sink.clear(); + printer.print(cursor, factory.getMetadata(), true); + TestUtils.assertEquals(expected, sink); + } + }); + } + @Test public void testIntSymbolAddValueMidTableMaxDate() throws Exception { assertMemoryLeak(() -> { diff --git a/core/src/test/java/io/questdb/griffin/SecurityTest.java b/core/src/test/java/io/questdb/griffin/SecurityTest.java index bbc898e5f1baf32ff4ec993dff6b2a0e570dc138..431358c27b58a556fd1ae6f9613cea682823e298 100644 --- a/core/src/test/java/io/questdb/griffin/SecurityTest.java +++ b/core/src/test/java/io/questdb/griffin/SecurityTest.java @@ -1,12 +1,35 @@ -package io.questdb.griffin; +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2020 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; +package io.questdb.griffin; import io.questdb.cairo.security.CairoSecurityContextImpl; import io.questdb.cairo.sql.InsertMethod; import io.questdb.cairo.sql.InsertStatement; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; public class SecurityTest extends AbstractGriffinTest { protected static SqlExecutionContext readOnlyExecutionContext; @@ -363,6 +386,15 @@ public class SecurityTest extends AbstractGriffinTest { @Test public void testMaxInMemoryRowsWithImplicitGroupBy() throws Exception { + SqlExecutionContext readOnlyExecutionContext = new SqlExecutionContextImpl( + messageBus, + 1, + engine) + .with( + new CairoSecurityContextImpl(false, + 3), + bindVariableService, + null); assertMemoryLeak(() -> { sqlExecutionContext.getRandom().reset(); compiler.compile("create table tb1 as (select" + @@ -373,18 +405,18 @@ public class SecurityTest extends AbstractGriffinTest { " from long_sequence(1000)) timestamp(ts)", sqlExecutionContext); assertQuery( "sym2\tcount\nGZ\t509\nRX\t491\n", - "select sym2, count() from tb1", + "select sym2, count() from tb1 order by sym2", null, true, readOnlyExecutionContext); try { assertQuery( "sym1\tcount\nPEHN\t265\nCPSW\t231\nHYRX\t262\nVTJW\t242\n", - "select sym1, count() from tb1", + "select sym1, count() from tb1 order by sym1", null, true, readOnlyExecutionContext); Assert.fail(); } catch (Exception ex) { - Assert.assertTrue(ex.toString().contains("limit of 2 exceeded")); + Assert.assertTrue(ex.toString().contains("limit of 3 exceeded")); } }); } diff --git a/core/src/test/java/io/questdb/griffin/engine/groupby/SampleByTest.java b/core/src/test/java/io/questdb/griffin/engine/groupby/SampleByTest.java index 66b589f026943120b5a39d5803fa803a47607dea..df6f1ab288d89387b4a48b7eb9a1f50edd62be0e 100644 --- a/core/src/test/java/io/questdb/griffin/engine/groupby/SampleByTest.java +++ b/core/src/test/java/io/questdb/griffin/engine/groupby/SampleByTest.java @@ -289,11 +289,11 @@ public class SampleByTest extends AbstractGriffinTest { @Test public void testGroupByCount() throws Exception { assertQuery("c\tcount\n" + - "XY\t6\n" + "\t5\n" + - "ZP\t5\n" + - "UU\t4\n", - "select c, count() from x", + "UU\t4\n" + + "XY\t6\n" + + "ZP\t5\n", + "select c, count() from x order by c", "create table x as " + "(" + "select" + @@ -311,12 +311,12 @@ public class SampleByTest extends AbstractGriffinTest { " long_sequence(5)" + ")", "c\tcount\n" + - "XY\t6\n" + "\t5\n" + - "ZP\t5\n" + - "UU\t4\n" + + "KK\t1\n" + "PL\t4\n" + - "KK\t1\n", + "UU\t4\n" + + "XY\t6\n" + + "ZP\t5\n", true); }