diff --git a/core/src/main/c/share/rosti.cpp b/core/src/main/c/share/rosti.cpp index 8c6724970709fd48a5c2323c377ace2e70620f7a..7983d3af05f315d249cd5822548df1ba8e390e44 100644 --- a/core/src/main/c/share/rosti.cpp +++ b/core/src/main/c/share/rosti.cpp @@ -31,30 +31,29 @@ rosti_t *alloc_rosti(const int32_t *column_types, const int32_t column_count, co value_offsets[0] = 0; for (int32_t i = 0; i < column_count; i++) { switch (column_types[i]) { - case 0: // BOOL - case 1: // BYTE + case 1: // BOOL + case 2: // BYTE slot_key_size += 1; break; - case 2: // SHORT - case 3: // CHAR + case 3: // SHORT + case 4: // CHAR slot_key_size += 2; break; - case 4: // INT - case 8: // FLOAT - case 11: // SYMBOL - store as INT + case 5: // INT + case 9: // FLOAT + case 12: // SYMBOL - store as INT slot_key_size += 4; break; - case 5: // LONG (64 bit) - case 6: // DATE - case 7: // TIMESTAMP - case 9: // DOUBLE - case 10: // STRING - store reference only + case 6: // LONG (64 bit) + case 7: // DATE + case 8: // TIMESTAMP + case 10: // DOUBLE + case 11: // STRING - store reference only slot_key_size += 8; break; - case 12: // LONG256 + case 13: // LONG256 slot_key_size += 64; break; - } value_offsets[i + 1] = slot_key_size; } diff --git a/core/src/main/c/share/vec_int_key_agg.cpp b/core/src/main/c/share/vec_int_key_agg.cpp index 76621bfd756500c0fa4423d6663eac663c30620f..b128396861d9dfea337d4750ecab06cd9ac41979 100644 --- a/core/src/main/c/share/vec_int_key_agg.cpp +++ b/core/src/main/c/share/vec_int_key_agg.cpp @@ -1,4 +1,5 @@ /******************************************************************************* + * ___ _ ____ ____ * / _ \ _ _ ___ ___| |_| _ \| __ ) * | | | | | | |/ _ \/ __| __| | | | _ \ @@ -200,7 +201,15 @@ static void kIntSumLong(to_int_fn to_int, jlong pRosti, jlong pKeys, jlong pLong if (PREDICT_FALSE(res.second)) { *reinterpret_cast(dest) = key; if (PREDICT_FALSE(val == L_MIN)) { - *reinterpret_cast(dest + value_offset) = 0; + // here is a very dirty workaround for segfault + // clang generates optimized code and aligned (movdqa) instruction for __int128 v = 0 + // but the rosti storage is dense and the value offset may be unaligned properly + if (std::is_same_v) { + *reinterpret_cast(dest + value_offset) = 0; + *reinterpret_cast(dest + value_offset + sizeof(int64_t)) = 0; + } else { + *reinterpret_cast(dest + value_offset) = 0; + } *reinterpret_cast(dest + count_offset) = 0; } else { *reinterpret_cast(dest + value_offset) = val; @@ -215,6 +224,163 @@ static void kIntSumLong(to_int_fn to_int, jlong pRosti, jlong pKeys, jlong pLong } } +struct long256_t { + uint64_t l0; + uint64_t l1; + uint64_t l2; + uint64_t l3; + + long256_t(uint64_t v0, uint64_t v1, uint64_t v2, uint64_t v3) + : l0(v0), l1(v1), l2(v2), l3(v3) + { + } + + bool is_null() const { + return l0 == L_MIN && l1 == L_MIN && l2 == L_MIN && l3 == L_MIN; + } + + void operator+=(const long256_t& rhs) { + if (rhs.is_null()) { + this->l0 = L_MIN; + this->l1 = L_MIN; + this->l2 = L_MIN; + this->l3 = L_MIN; + } else { + // The sum will overflow if both top bits are set (x & y) or if one of them + // is (x | y), and a carry from the lower place happened. If such a carry + // happens, the top bit will be 1 + 0 + 1 = 0 (& ~sum). + uint64_t carry = 0; + uint64_t l0_ = this->l0 + rhs.l0 + carry; + carry = ((this->l0 & rhs.l0) | ((this->l0 | rhs.l0) & ~l0_)) >> 63; + + uint64_t l1_ = this->l1 + rhs.l1 + carry; + carry = ((this->l1 & rhs.l1) | ((this->l1 | rhs.l1) & ~l1_)) >> 63; + + uint64_t l2_ = this->l2 + rhs.l2 + carry; + carry = ((this->l2 & rhs.l2) | ((this->l2 | rhs.l2) & ~l2_)) >> 63; + + uint64_t l3_ = this->l3 + rhs.l3 + carry; + //carry = ((this->l3 & rhs.l3) | ((this->l3 | rhs.l3) & ~l3_)) >> 63; + + this->l0 = l0_; + this->l1 = l1_; + this->l2 = l2_; + this->l3 = l3_; + } + } +}; + +static void kIntSumLong256(to_int_fn to_int, jlong pRosti, jlong pKeys, jlong pLong, jlong count, jint valueOffset) { + auto map = reinterpret_cast(pRosti); + + const auto *pl = reinterpret_cast(pLong); + const auto value_offset = map->value_offsets_[valueOffset]; + const auto count_offset = map->value_offsets_[valueOffset + 1]; + for (int i = 0; i < count; i++) { + MM_PREFETCH_T0(pl + i + 8); + const int32_t key = to_int(pKeys, i); + const long256_t& val = pl[i]; + auto res = find(map, key); + auto dest = map->slots_ + res.first; + long256_t& dst = *reinterpret_cast(dest + value_offset); + if (PREDICT_FALSE(res.second)) { + *reinterpret_cast(dest) = key; + if (PREDICT_FALSE(val.is_null())) { + *reinterpret_cast(dest + count_offset) = 0; + dst = long256_t(0,0,0,0); + } else { + dst = val; + *reinterpret_cast(dest + count_offset) = 1; + } + } else { + if (PREDICT_TRUE(!val.is_null())) { + dst += val; + *reinterpret_cast(dest + count_offset) += 1; + } + } + } +} + +static void kIntSumLong256Merge(jlong pRostiA, jlong pRostiB, jint valueOffset) { + auto map_a = reinterpret_cast(pRostiA); + auto map_b = reinterpret_cast(pRostiB); + const auto value_offset = map_b->value_offsets_[valueOffset]; + const auto count_offset = map_b->value_offsets_[valueOffset + 1]; + const auto capacity = map_b->capacity_; + const auto ctrl = map_b->ctrl_; + const auto shift = map_b->slot_size_shift_; + const auto slots = map_b->slots_; + + for (size_t i = 0; i < capacity; i++) { + ctrl_t c = ctrl[i]; + if (c > -1) { + auto src = slots + (i << shift); + auto key = *reinterpret_cast(src); + auto val = *reinterpret_cast(src + value_offset); + auto count = *reinterpret_cast(src + count_offset); + + auto res = find(map_a, key); + auto dest = map_a->slots_ + res.first; + + if (PREDICT_FALSE(res.second)) { + *reinterpret_cast(dest) = key; + } + + // when maps have non-null values, their count is >0 and val is not MIN + // on other hand + long256_t& dst = *reinterpret_cast(dest + value_offset); + const jlong old_count = *reinterpret_cast(dest + count_offset); + if (old_count > 0 && count > 0) { + dst += val; + *reinterpret_cast(dest + count_offset) += count; + } else { + *reinterpret_cast(dest + value_offset) = val; + *reinterpret_cast(dest + count_offset) = count; + } + } + } +} + +static void kIntSumLong256WrapUp(jlong pRosti, jint valueOffset, jlong n0, jlong n1, jlong n2, jlong n3, jlong valueAtNullCount) { + auto map = reinterpret_cast(pRosti); + const auto value_offset = map->value_offsets_[valueOffset]; + const auto count_offset = map->value_offsets_[valueOffset + 1]; + const auto capacity = map->capacity_; + const auto ctrl = map->ctrl_; + const auto shift = map->slot_size_shift_; + const auto slots = map->slots_; + + for (size_t i = 0; i < capacity; i++) { + ctrl_t c = ctrl[i]; + if (c > -1) { + const auto src = slots + (i << shift); + auto count = *reinterpret_cast(src + count_offset); + long256_t& srcv = *reinterpret_cast(src + value_offset); + if (PREDICT_FALSE(count == 0)) { + srcv = long256_t(L_MIN, L_MIN, L_MIN, L_MIN); + } + } + } + + // populate null value + if (valueAtNullCount > 0) { + auto nullKey = reinterpret_cast(map->slot_initial_values_)[0]; + auto res = find(map, nullKey); + // maps must have identical structure to use "shift" from map B on map A + auto dest = map->slots_ + res.first; + long256_t& dst = *reinterpret_cast(dest + value_offset); + if (PREDICT_FALSE(res.second)) { + *reinterpret_cast(dest) = nullKey; + dst = long256_t(n0, n1, n2, n3); + *reinterpret_cast(dest + count_offset) = valueAtNullCount; + } else { + long256_t valueAtNull(n0, n1, n2, n3); + dst += valueAtNull; + *reinterpret_cast(dest + count_offset) += valueAtNullCount; + } + } +} + static void kIntNSumDouble(to_int_fn to_int, jlong pRosti, jlong pKeys, jlong pDouble, jlong count, jint valueOffset) { auto map = reinterpret_cast(pRosti); const auto *pd = reinterpret_cast(pDouble); @@ -1285,6 +1451,30 @@ Java_io_questdb_std_Rosti_keyedIntSumLongLongWrapUp(JNIEnv *env, jclass cl, jlon jlong valueAtNull, jlong valueAtNullCount) { kIntSumLongWrapUp(pRosti, valueOffset, valueAtNull, valueAtNullCount); } +// sum long256 +JNIEXPORT void JNICALL +Java_io_questdb_std_Rosti_keyedHourSumLong256(JNIEnv *env, jclass cl, jlong pRosti, jlong pKeys, jlong pLong, + jlong count, jint valueOffset) { + kIntSumLong256(int64_to_hour, pRosti, pKeys, pLong, count, valueOffset); +} + +JNIEXPORT void JNICALL +Java_io_questdb_std_Rosti_keyedIntSumLong256(JNIEnv *env, jclass cl, jlong pRosti, jlong pKeys, jlong pLong, + jlong count, jint valueOffset) { + kIntSumLong256(to_int, pRosti, pKeys, pLong, count, valueOffset); +} + +JNIEXPORT void JNICALL +Java_io_questdb_std_Rosti_keyedIntSumLong256Merge(JNIEnv *env, jclass cl, jlong pRostiA, jlong pRostiB, + jint valueOffset) { + kIntSumLong256Merge(pRostiA, pRostiB, valueOffset); +} + +JNIEXPORT void JNICALL +Java_io_questdb_std_Rosti_keyedIntSumLong256WrapUp(JNIEnv *env, jclass cl, jlong pRosti, jint valueOffset, + jlong v0, jlong v1, jlong v2, jlong v3, jlong valueAtNullCount) { + kIntSumLong256WrapUp(pRosti, valueOffset, v0, v1, v2, v3, valueAtNullCount); +} // MIN long diff --git a/core/src/main/java/io/questdb/cairo/map/CompactMapValue.java b/core/src/main/java/io/questdb/cairo/map/CompactMapValue.java index a996eb52df9189456e06782ce3bc4ae368e8c4ce..fd73146af71a6599e0dbcf7d4209dab14702a3d8 100644 --- a/core/src/main/java/io/questdb/cairo/map/CompactMapValue.java +++ b/core/src/main/java/io/questdb/cairo/map/CompactMapValue.java @@ -25,6 +25,7 @@ package io.questdb.cairo.map; import io.questdb.cairo.vm.api.MemoryARW; +import io.questdb.std.Long256; public class CompactMapValue implements MapValue { @@ -210,6 +211,16 @@ public class CompactMapValue implements MapValue { record.of(currentValueOffset); } + @Override + public void addLong256(int index, Long256 value) { + + } + + @Override + public void putLong256(int index, Long256 value) { + + } + private long getValueColumnOffset(int columnIndex) { assert currentValueOffset != -1; return currentValueOffset + columnOffsets[columnIndex]; diff --git a/core/src/main/java/io/questdb/cairo/map/FastMapValue.java b/core/src/main/java/io/questdb/cairo/map/FastMapValue.java index 132b2b02bbb8fdeb64bad764d8403657164a21b3..923baa348132685b29c12500337991f83fec7e06 100644 --- a/core/src/main/java/io/questdb/cairo/map/FastMapValue.java +++ b/core/src/main/java/io/questdb/cairo/map/FastMapValue.java @@ -24,6 +24,7 @@ package io.questdb.cairo.map; +import io.questdb.std.Long256; import io.questdb.std.Unsafe; final class FastMapValue implements MapValue { @@ -207,6 +208,16 @@ final class FastMapValue implements MapValue { this.record.of(address); } + @Override + public void addLong256(int index, Long256 value) { + + } + + @Override + public void putLong256(int index, Long256 value) { + + } + private long address0(int index) { return address + valueOffsets[index]; } diff --git a/core/src/main/java/io/questdb/cairo/map/MapValue.java b/core/src/main/java/io/questdb/cairo/map/MapValue.java index f64fb061a0dbfe3f5664c3f9f7299090961f5392..327b49f785c28509cfa5ef9f6dfc7ae3fd7d8e86 100644 --- a/core/src/main/java/io/questdb/cairo/map/MapValue.java +++ b/core/src/main/java/io/questdb/cairo/map/MapValue.java @@ -25,6 +25,7 @@ package io.questdb.cairo.map; import io.questdb.cairo.sql.Record; +import io.questdb.std.Long256; public interface MapValue extends Record { @@ -85,4 +86,8 @@ public interface MapValue extends Record { void putTimestamp(int index, long value); void setMapRecordHere(); + + void addLong256(int index, Long256 value); + + void putLong256(int index, Long256 value); } diff --git a/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java b/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java index f52b839a63805d197ff1072912799b0609732160..543dab25ef60cdd85ca4de8c62211ed7162d241f 100644 --- a/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java +++ b/core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java @@ -3387,6 +3387,7 @@ public class SqlCodeGenerator implements Mutable, Closeable { sumConstructors.put(ColumnType.DOUBLE, SumDoubleVectorAggregateFunction::new); sumConstructors.put(ColumnType.INT, SumIntVectorAggregateFunction::new); sumConstructors.put(ColumnType.LONG, SumLongVectorAggregateFunction::new); + sumConstructors.put(ColumnType.LONG256, SumLong256VectorAggregateFunction::new); sumConstructors.put(ColumnType.DATE, SumDateVectorAggregateFunction::new); sumConstructors.put(ColumnType.TIMESTAMP, SumTimestampVectorAggregateFunction::new); diff --git a/core/src/main/java/io/questdb/griffin/engine/functions/groupby/SumLong256GroupByFunction.java b/core/src/main/java/io/questdb/griffin/engine/functions/groupby/SumLong256GroupByFunction.java new file mode 100644 index 0000000000000000000000000000000000000000..59474f55892a20a012be303ea404809cd7bafc9f --- /dev/null +++ b/core/src/main/java/io/questdb/griffin/engine/functions/groupby/SumLong256GroupByFunction.java @@ -0,0 +1,112 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin.engine.functions.groupby; + +import io.questdb.cairo.ArrayColumnTypes; +import io.questdb.cairo.ColumnType; +import io.questdb.cairo.map.MapValue; +import io.questdb.cairo.sql.Function; +import io.questdb.cairo.sql.Record; +import io.questdb.griffin.engine.functions.GroupByFunction; +import io.questdb.griffin.engine.functions.Long256Function; +import io.questdb.griffin.engine.functions.UnaryFunction; +import io.questdb.std.Long256; +import io.questdb.std.Long256Impl; +import io.questdb.std.str.CharSink; +import org.jetbrains.annotations.NotNull; + +public class SumLong256GroupByFunction extends Long256Function implements GroupByFunction, UnaryFunction { + private final Function arg; + private int valueIndex; + + public SumLong256GroupByFunction(@NotNull Function arg) { + this.arg = arg; + } + + @Override + public void computeFirst(MapValue mapValue, Record record) { + final Long256 value = arg.getLong256A(record); + if (!value.equals(Long256Impl.NULL_LONG256)) { + mapValue.putLong256(valueIndex, value); + mapValue.putLong(valueIndex + 1, 1); + } else { + mapValue.putLong256(valueIndex, Long256Impl.ZERO_LONG256); + mapValue.putLong(valueIndex + 1, 0); + } + } + + @Override + public void computeNext(MapValue mapValue, Record record) { + final Long256 value = arg.getLong256A(record); + if (!value.equals(Long256Impl.NULL_LONG256)) { + mapValue.addLong256(valueIndex, value); + mapValue.addLong(valueIndex + 1, 1); + } + } + + @Override + public void pushValueTypes(ArrayColumnTypes columnTypes) { + this.valueIndex = columnTypes.getColumnCount(); + columnTypes.add(ColumnType.LONG256); + columnTypes.add(ColumnType.LONG); + } + + @Override + public void setNull(MapValue mapValue) { + mapValue.putLong256(valueIndex, Long256Impl.NULL_LONG256); + mapValue.putLong(valueIndex + 1, 0); + } + + @Override + public Function getArg() { + return arg; + } + + @Override + public void getLong256(Record rec, CharSink sink) { + Long256Impl v = (Long256Impl) getLong256A(rec); + v.toSink(sink); + } + + @Override + public Long256 getLong256A(Record rec) { + Long256Impl res = new Long256Impl(); + if (rec.getLong(valueIndex + 1) > 0) { + res.copyFrom(rec.getLong256A(valueIndex)); + return res; + } + return Long256Impl.NULL_LONG256; + } + + @Override + public Long256 getLong256B(Record rec) { + return getLong256A(rec); + } + + @Override + public boolean isConstant() { + return false; + } +} diff --git a/core/src/main/java/io/questdb/griffin/engine/functions/groupby/SumLong256GroupByFunctionFactory.java b/core/src/main/java/io/questdb/griffin/engine/functions/groupby/SumLong256GroupByFunctionFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..0dc5f3a2cf1e5da3d6ab9f30f1cb4cd4ed862f09 --- /dev/null +++ b/core/src/main/java/io/questdb/griffin/engine/functions/groupby/SumLong256GroupByFunctionFactory.java @@ -0,0 +1,49 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin.engine.functions.groupby; + +import io.questdb.cairo.CairoConfiguration; +import io.questdb.cairo.sql.Function; +import io.questdb.griffin.FunctionFactory; +import io.questdb.griffin.SqlExecutionContext; +import io.questdb.std.IntList; +import io.questdb.std.ObjList; + +public class SumLong256GroupByFunctionFactory implements FunctionFactory { + @Override + public String getSignature() { + return "sum(H)"; + } + + @Override + public boolean isGroupBy() { + return true; + } + + @Override + public Function newInstance(int position, ObjList args, IntList argPositions, CairoConfiguration configuration, SqlExecutionContext sqlExecutionContext) { + return new SumLong256GroupByFunction(args.getQuick(0)); + } +} diff --git a/core/src/main/java/io/questdb/griffin/engine/functions/math/AddLong256FunctionFactory.java b/core/src/main/java/io/questdb/griffin/engine/functions/math/AddLong256FunctionFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..3cf96aceceb93f9e008bcbfd09eab2339a641fa0 --- /dev/null +++ b/core/src/main/java/io/questdb/griffin/engine/functions/math/AddLong256FunctionFactory.java @@ -0,0 +1,89 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin.engine.functions.math; + +import io.questdb.cairo.CairoConfiguration; +import io.questdb.cairo.sql.Function; +import io.questdb.cairo.sql.Record; +import io.questdb.griffin.FunctionFactory; +import io.questdb.griffin.SqlExecutionContext; +import io.questdb.griffin.engine.functions.BinaryFunction; +import io.questdb.griffin.engine.functions.Long256Function; +import io.questdb.std.IntList; +import io.questdb.std.Long256; +import io.questdb.std.Long256Impl; +import io.questdb.std.ObjList; +import io.questdb.std.str.CharSink; + +public class AddLong256FunctionFactory implements FunctionFactory { + @Override + public String getSignature() { + return "+(HH)"; + } + + @Override + public Function newInstance(int position, ObjList args, IntList argPositions, CairoConfiguration configuration, SqlExecutionContext sqlExecutionContext) { + return new AddLong256Func(args.getQuick(0), args.getQuick(1)); + } + + private static class AddLong256Func extends Long256Function implements BinaryFunction { + final Long256Impl long256A = new Long256Impl(); + final Long256Impl long256B = new Long256Impl(); + + final Function left; + final Function right; + + public AddLong256Func(Function left, Function right) { + this.left = left; + this.right = right; + } + + @Override + public Function getLeft() { + return left; + } + + @Override + public Function getRight() { + return right; + } + + @Override + public void getLong256(Record rec, CharSink sink) { + Long256Impl v = (Long256Impl) getLong256A(rec); + v.toSink(sink); + } + + @Override + public Long256 getLong256A(Record rec) { + return Long256Impl.add(long256A, left.getLong256A(rec), right.getLong256A(rec)); + } + + @Override + public Long256 getLong256B(Record rec) { + return Long256Impl.add(long256B, left.getLong256B(rec), right.getLong256B(rec)); + } + } +} \ No newline at end of file diff --git a/core/src/main/java/io/questdb/griffin/engine/groupby/SimpleMapValue.java b/core/src/main/java/io/questdb/griffin/engine/groupby/SimpleMapValue.java index da35145c891504725de01722c906b0a201f67fb9..a6da28d99cd01addef656b3692024d3d1213db3e 100644 --- a/core/src/main/java/io/questdb/griffin/engine/groupby/SimpleMapValue.java +++ b/core/src/main/java/io/questdb/griffin/engine/groupby/SimpleMapValue.java @@ -25,13 +25,16 @@ package io.questdb.griffin.engine.groupby; import io.questdb.cairo.map.MapValue; +import io.questdb.std.Long256; +import io.questdb.std.Long256Impl; +import io.questdb.std.Long256Util; public class SimpleMapValue implements MapValue { - + private final Long256Impl long256 = new Long256Impl(); private final long[] values; public SimpleMapValue(int columnCount) { - this.values = new long[columnCount]; + this.values = new long[4 * columnCount]; } public void copy(SimpleMapValue other) { @@ -46,72 +49,52 @@ public class SimpleMapValue implements MapValue { @Override public boolean getBool(int index) { - return values[index] == 0; + return values[4 * index] == 0; } @Override public byte getByte(int index) { - return (byte) values[index]; + return (byte) values[4 * index]; } @Override public long getDate(int index) { - return values[index]; + return values[4 * index]; } @Override public double getDouble(int index) { - return Double.longBitsToDouble(values[index]); + return Double.longBitsToDouble(values[4 * index]); } @Override public float getFloat(int index) { - return Float.intBitsToFloat((int) values[index]); + return Float.intBitsToFloat((int) values[4 * index]); } @Override public char getChar(int index) { - return (char) values[index]; + return (char) values[4 * index]; } @Override public int getInt(int index) { - return (int) values[index]; + return (int) values[4 * index]; } @Override public long getLong(int index) { - return values[index]; + return values[4 * index]; } @Override public short getShort(int index) { - return (short) values[index]; + return (short) values[4 * index]; } @Override public long getTimestamp(int index) { - return values[index]; - } - - @Override - public byte getGeoByte(int col) { - return (byte)values[col]; - } - - @Override - public short getGeoShort(int col) { - return (short) values[col]; - } - - @Override - public int getGeoInt(int col) { - return (int) values[col]; - } - - @Override - public long getGeoLong(int col) { - return values[col]; + return values[4 * index]; } @Override @@ -121,88 +104,135 @@ public class SimpleMapValue implements MapValue { @Override public void putBool(int index, boolean value) { - values[index] = value ? 0 : 1; + values[4 * index] = value ? 0 : 1; } @Override public void putByte(int index, byte value) { - values[index] = value; + values[4 * index] = value; } @Override public void addByte(int index, byte value) { - values[index] += value; + values[4 * index] += value; } @Override public void putDate(int index, long value) { - values[index] = value; + values[4 * index] = value; } @Override public void putDouble(int index, double value) { - values[index] = Double.doubleToLongBits(value); + values[4 * index] = Double.doubleToLongBits(value); } @Override public void addDouble(int index, double value) { - final double d = Double.longBitsToDouble(values[index]); - values[index] = Double.doubleToLongBits(value + d); + final double d = Double.longBitsToDouble(values[4 * index]); + values[4 * index] = Double.doubleToLongBits(value + d); } @Override public void putFloat(int index, float value) { - values[index] = Float.floatToIntBits(value); + values[4 * index] = Float.floatToIntBits(value); } @Override public void addFloat(int index, float value) { - final float d = Float.intBitsToFloat((int) values[index]); - values[index] = Float.floatToIntBits(value + d); + final float d = Float.intBitsToFloat((int) values[4 * index]); + values[4 * index] = Float.floatToIntBits(value + d); } @Override public void putInt(int index, int value) { - values[index] = value; + values[4 * index] = value; } @Override public void addInt(int index, int value) { - values[index] += value; + values[4 * index] += value; } @Override public void putLong(int index, long value) { - values[index] = value; + values[4 * index] = value; } @Override public void addLong(int index, long value) { - values[index] += value; + values[4 * index] += value; } @Override public void putShort(int index, short value) { - values[index] = value; + values[4 * index] = value; } @Override public void addShort(int index, short value) { - values[index] += value; + values[4 * index] += value; } @Override public void putChar(int index, char value) { - values[index] = value; + values[4 * index] = value; } @Override public void putTimestamp(int index, long value) { - values[index] = value; + values[4 * index] = value; + } + + @Override + public void addLong256(int index, Long256 value) { + Long256 acc = getLong256A(index); + Long256Util.add(acc, value); + final int idx = 4 * index; + values[idx] = acc.getLong0(); + values[idx + 1] = acc.getLong1(); + values[idx + 2] = acc.getLong2(); + values[idx + 3] = acc.getLong3(); + } + + @Override + public void putLong256(int index, Long256 value) { + final int idx = 4 * index; + values[idx] = value.getLong0(); + values[idx + 1] = value.getLong1(); + values[idx + 2] = value.getLong2(); + values[idx + 3] = value.getLong3(); + } + + @Override + public Long256 getLong256A(int index) { + final int idx = 4 * index; + long256.setAll(values[idx], values[idx + 1], values[idx + 2], values[idx + 3]); + return long256; + } + + @Override + public byte getGeoByte(int col) { + return (byte) values[4 * col]; } @Override public void setMapRecordHere() { throw new UnsupportedOperationException(); } + + @Override + public short getGeoShort(int col) { + return (short) values[4 * col]; + } + + @Override + public int getGeoInt(int col) { + return (int) values[4 * col]; + } + + @Override + public long getGeoLong(int col) { + return values[4 * col]; + } } diff --git a/core/src/main/java/io/questdb/griffin/engine/groupby/vect/GroupByRecordCursorFactory.java b/core/src/main/java/io/questdb/griffin/engine/groupby/vect/GroupByRecordCursorFactory.java index 87f8fa493eb5c85f29cff8523e1f96ac168c66e5..69e6351a3b28d9ab70a3480754f73d78f859c7fa 100644 --- a/core/src/main/java/io/questdb/griffin/engine/groupby/vect/GroupByRecordCursorFactory.java +++ b/core/src/main/java/io/questdb/griffin/engine/groupby/vect/GroupByRecordCursorFactory.java @@ -28,8 +28,8 @@ import io.questdb.MessageBus; import io.questdb.cairo.CairoConfiguration; import io.questdb.cairo.ColumnType; import io.questdb.cairo.ColumnTypes; -import io.questdb.cairo.sql.*; import io.questdb.cairo.sql.Record; +import io.questdb.cairo.sql.*; import io.questdb.griffin.SqlException; import io.questdb.griffin.SqlExecutionContext; import io.questdb.log.Log; @@ -378,6 +378,9 @@ public class GroupByRecordCursorFactory implements RecordCursorFactory { private class RostiRecord implements Record { private long pRow; + private final Long256Impl long256A = new Long256Impl(); + private final Long256Impl long256B = new Long256Impl(); + public void of(long pRow) { this.pRow = pRow; } @@ -438,17 +441,28 @@ public class GroupByRecordCursorFactory implements RecordCursorFactory { @Override public void getLong256(int col, CharSink sink) { - + Long256Impl v = (Long256Impl) getLong256A(col); + v.toSink(sink); } @Override public Long256 getLong256A(int col) { - return null; + return getLong256Value(long256A, col); } @Override public Long256 getLong256B(int col) { - return null; + return getLong256Value(long256B, col); + } + + public Long256 getLong256Value(Long256 dst, int col) { + final long offset = getValueOffset(col); + final long l0 = Unsafe.getUnsafe().getLong(offset); + final long l1 = Unsafe.getUnsafe().getLong(offset + Long.BYTES); + final long l2 = Unsafe.getUnsafe().getLong(offset + 2 * Long.BYTES); + final long l3 = Unsafe.getUnsafe().getLong(offset + 3 + Long.BYTES); + dst.setAll(l0, l1, l2, l3); + return dst; } @Override diff --git a/core/src/main/java/io/questdb/griffin/engine/groupby/vect/SumLong256VectorAggregateFunction.java b/core/src/main/java/io/questdb/griffin/engine/groupby/vect/SumLong256VectorAggregateFunction.java new file mode 100644 index 0000000000000000000000000000000000000000..f9f5208c4ac249c4a6572ab920a8658c91a8bcf5 --- /dev/null +++ b/core/src/main/java/io/questdb/griffin/engine/groupby/vect/SumLong256VectorAggregateFunction.java @@ -0,0 +1,177 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin.engine.groupby.vect; + +import io.questdb.cairo.ArrayColumnTypes; +import io.questdb.cairo.ColumnType; +import io.questdb.cairo.sql.Record; +import io.questdb.griffin.engine.functions.Long256Function; +import io.questdb.mp.SimpleSpinLock; +import io.questdb.std.ThreadLocal; +import io.questdb.std.*; +import io.questdb.std.str.CharSink; + +import java.util.concurrent.atomic.LongAdder; + +import static io.questdb.griffin.SqlCodeGenerator.GKK_HOUR_INT; + +public class SumLong256VectorAggregateFunction extends Long256Function implements VectorAggregateFunction { + private static final ThreadLocal partialSums = new ThreadLocal<>(Long256Impl::new); + private final SimpleSpinLock lock = new SimpleSpinLock(); + private final Long256Impl sumA = new Long256Impl(); + private final Long256Impl sumB = new Long256Impl(); + private final LongAdder count = new LongAdder(); + private final int columnIndex; + private final DistinctFunc distinctFunc; + private final KeyValueFunc keyValueFunc; + private int valueOffset; + + public SumLong256VectorAggregateFunction(int keyKind, int columnIndex, int workerCount) { + this.columnIndex = columnIndex; + if (keyKind == GKK_HOUR_INT) { + distinctFunc = Rosti::keyedHourDistinct; + keyValueFunc = Rosti::keyedHourSumLong256; + } else { + distinctFunc = Rosti::keyedIntDistinct; + keyValueFunc = Rosti::keyedIntSumLong256; + } + } + + @Override + public void aggregate(long address, long addressSize, int columnSizeHint, int workerId) { + if (address != 0) { + final long count = addressSize / (Long.BYTES * 4); + Long256Impl value = sumLong256(partialSums.get(), address, count); + if (value != Long256Impl.NULL_LONG256) { + lock.lock(); + try { + Long256Util.add(sumA, value); + this.count.increment(); + } finally { + lock.unlock(); + } + } + } + } + + @Override + public void aggregate(long pRosti, long keyAddress, long valueAddress, long valueAddressSize, int columnSizeShr, int workerId) { + if (valueAddress == 0) { + distinctFunc.run(pRosti, keyAddress, valueAddressSize / (4 * Long.BYTES)); + } else { + keyValueFunc.run(pRosti, keyAddress, valueAddress, valueAddressSize / (4 * Long.BYTES), valueOffset); + } + } + + @Override + public int getColumnIndex() { + return columnIndex; + } + + @Override + public int getValueOffset() { + return valueOffset; + } + + @Override + public void initRosti(long pRosti) { + Unsafe.getUnsafe().putLong(Rosti.getInitialValueSlot(pRosti, valueOffset), 0); + Unsafe.getUnsafe().putLong(Rosti.getInitialValueSlot(pRosti, valueOffset) + Long.BYTES, 0); + Unsafe.getUnsafe().putLong(Rosti.getInitialValueSlot(pRosti, valueOffset) + 2 * Long.BYTES, 0); + Unsafe.getUnsafe().putLong(Rosti.getInitialValueSlot(pRosti, valueOffset) + 3 * Long.BYTES, 0); + Unsafe.getUnsafe().putLong(Rosti.getInitialValueSlot(pRosti, valueOffset + 1), 0); + } + + @Override + public void merge(long pRostiA, long pRostiB) { + Rosti.keyedIntSumLong256Merge(pRostiA, pRostiB, valueOffset); + } + + @Override + public void pushValueTypes(ArrayColumnTypes types) { + this.valueOffset = types.getColumnCount(); + types.add(ColumnType.LONG256); + types.add(ColumnType.LONG); + } + + @Override + public void wrapUp(long pRosti) { + Rosti.keyedIntSumLong256WrapUp(pRosti, valueOffset, sumA.getLong0(), sumA.getLong1(), sumA.getLong2(), sumA.getLong3(), count.sum()); + } + + @Override + public void clear() { + sumA.setAll(0, 0, 0, 0); + sumB.setAll(0, 0, 0, 0); + count.reset(); + } + + @Override + public void getLong256(Record rec, CharSink sink) { + Long256Impl v = (Long256Impl) getLong256A(rec); + v.toSink(sink); + } + + @Override + public Long256 getLong256A(Record rec) { + if (count.sum() > 0) { + return sumA; + } + return Long256Impl.NULL_LONG256; + } + + @Override + public Long256 getLong256B(Record rec) { + if (count.sum() > 0) { + sumB.copyFrom(sumA); + return sumB; + } + return Long256Impl.NULL_LONG256; + } + + private Long256Impl sumLong256(Long256Impl sum, long address, long count) { + boolean hasData = false; + long offset = 0; + sum.setAll(0, 0, 0, 0); + for (long i = 0; i < count; i++) { + final long l0 = Unsafe.getUnsafe().getLong(address + offset); + final long l1 = Unsafe.getUnsafe().getLong(address + offset + Long.BYTES); + final long l2 = Unsafe.getUnsafe().getLong(address + offset + Long.BYTES * 2); + final long l3 = Unsafe.getUnsafe().getLong(address + offset + Long.BYTES * 3); + + boolean isNull = l0 == Numbers.LONG_NaN && + l1 == Numbers.LONG_NaN && + l2 == Numbers.LONG_NaN && + l3 == Numbers.LONG_NaN; + + if (!isNull) { + Long256Util.add(sum, l0, l1, l2, l3); + hasData = true; + } + offset += 4 * Long.BYTES; + } + return hasData ? sum : Long256Impl.NULL_LONG256; + } +} diff --git a/core/src/main/java/io/questdb/mp/SimpleSpinLock.java b/core/src/main/java/io/questdb/mp/SimpleSpinLock.java new file mode 100644 index 0000000000000000000000000000000000000000..49325b2b404f244621e130e30461de867fc51cfd --- /dev/null +++ b/core/src/main/java/io/questdb/mp/SimpleSpinLock.java @@ -0,0 +1,48 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.mp; + +import java.util.concurrent.atomic.AtomicBoolean; + +// Simple, non-reentrant and unfair lock implementation. +// Don't try to use it for complex cases! +public class SimpleSpinLock { + AtomicBoolean lock = new AtomicBoolean(false); + + public void lock() { + while (true) { + while (lock.get()) { + // do nothing + } + if (!lock.getAndSet(true)) { + return; + } + } + } + + public void unlock() { + lock.set(false); + } +} diff --git a/core/src/main/java/io/questdb/std/Long256.java b/core/src/main/java/io/questdb/std/Long256.java index ffabd207a980d48ba409b6cda7dfd7822b314c28..3d52e65061137c293f5a7f5c44aff45117567173 100644 --- a/core/src/main/java/io/questdb/std/Long256.java +++ b/core/src/main/java/io/questdb/std/Long256.java @@ -28,7 +28,7 @@ package io.questdb.std; * A 256 bit hash with string representation up to 64 hex digits following a prefix '0x'. * (e.g. 0xaba86bf575ba7fde98b6673bb7d85bf489fd71a619cddaecba5de0378e3d22ed) */ -public interface Long256 { +public interface Long256 extends Long256Acceptor { int BYTES = 32; long getLong0(); diff --git a/core/src/main/java/io/questdb/std/Long256Impl.java b/core/src/main/java/io/questdb/std/Long256Impl.java index 3e2416a11afc2a6b1f16d5f1bdecfd1b7683e777..69578b5a8d03ffedcf29acfe014a405f54789252 100644 --- a/core/src/main/java/io/questdb/std/Long256Impl.java +++ b/core/src/main/java/io/questdb/std/Long256Impl.java @@ -26,9 +26,11 @@ package io.questdb.std; import io.questdb.std.str.CharSink; -public class Long256Impl implements Long256, Sinkable, Long256Acceptor { +public class Long256Impl implements Long256, Sinkable { public static final Long256Impl NULL_LONG256 = new Long256Impl(); + public static final Long256Impl ZERO_LONG256 = new Long256Impl(); + private long l0; private long l1; private long l2; @@ -97,6 +99,15 @@ public class Long256Impl implements Long256, Sinkable, Long256Acceptor { Numbers.appendLong256(l0, l1, l2, l3, sink); } + public static Long256Impl add(final Long256Impl sum, final Long256 x, final Long256 y) { + if (x.equals(Long256Impl.NULL_LONG256) || y.equals(Long256Impl.NULL_LONG256)) { + return Long256Impl.NULL_LONG256; + } + sum.copyFrom(x); + Long256Util.add(sum, y); + return sum; + } + static { NULL_LONG256.setAll( Numbers.LONG_NaN, @@ -104,5 +115,6 @@ public class Long256Impl implements Long256, Sinkable, Long256Acceptor { Numbers.LONG_NaN, Numbers.LONG_NaN ); + ZERO_LONG256.setAll(0, 0, 0, 0); } } diff --git a/core/src/main/java/io/questdb/std/Long256Util.java b/core/src/main/java/io/questdb/std/Long256Util.java index 5d242c640e2e0fcbf9e62487c6aa438a62943d24..04c8b81065df867032de805f71cf8ed5a71a2e67 100644 --- a/core/src/main/java/io/questdb/std/Long256Util.java +++ b/core/src/main/java/io/questdb/std/Long256Util.java @@ -55,4 +55,40 @@ public class Long256Util { return Long.compare(a.getLong0(), b.getLong0()); } + + public static void add(Long256 dst, long v0, long v1, long v2, long v3) { + boolean isNull = v0 == Numbers.LONG_NaN && + v1 == Numbers.LONG_NaN && + v2 == Numbers.LONG_NaN && + v3 == Numbers.LONG_NaN; + + if (isNull) { + dst.setAll(Numbers.LONG_NaN, + Numbers.LONG_NaN, + Numbers.LONG_NaN, + Numbers.LONG_NaN); + } else { + // The sum will overflow if both top bits are set (x & y) or if one of them + // is (x | y), and a carry from the lower place happened. If such a carry + // happens, the top bit will be 1 + 0 + 1 = 0 (& ~sum). + long carry = 0; + final long l0 = v0 + dst.getLong0() + carry; + carry = ((v0 & dst.getLong0()) | ((v0 | dst.getLong0()) & ~l0)) >>> 63; + + final long l1 = v1 + dst.getLong1() + carry; + carry = ((v1 & dst.getLong1()) | ((v1 | dst.getLong1()) & ~l1)) >>> 63; + + final long l2 = v2 + dst.getLong2() + carry; + carry = ((v2 & dst.getLong2()) | ((v2 | dst.getLong2()) & ~l2)) >>> 63; + + final long l3 = v3 + dst.getLong3() + carry; + //carry = ((v3 & dst.getLong3()) | ((v3 | dst.getLong3()) & ~l3)) >>> 63; + + dst.setAll(l0, l1, l2, l3); + } + } + + public static void add(Long256 acc, Long256 incr) { + add(acc, incr.getLong0(), incr.getLong1(), incr.getLong2(), incr.getLong3()); + } } diff --git a/core/src/main/java/io/questdb/std/Numbers.java b/core/src/main/java/io/questdb/std/Numbers.java index 418a4988d507a5a7492bd854ad89092a2045a884..fe6f85308541a49c294a8fcf3537be5504afd34c 100644 --- a/core/src/main/java/io/questdb/std/Numbers.java +++ b/core/src/main/java/io/questdb/std/Numbers.java @@ -323,7 +323,7 @@ public final class Numbers { sink.put("NaN"); return; } - int bit = value == 0 ? 0 : 64 - Long.numberOfLeadingZeros(value - 1); + int bit = value == 0 ? 0 : 64 - Long.numberOfLeadingZeros(value); LongHexAppender[] array = pad ? longHexAppenderPad64 : longHexAppender; array[bit].append(sink, value); } diff --git a/core/src/main/java/io/questdb/std/Rosti.java b/core/src/main/java/io/questdb/std/Rosti.java index 333f3b741d75dc77e585641c1ebe3a2279b8d9d4..ba307963fdf0247531d91d6ccaf0633d354c370d 100644 --- a/core/src/main/java/io/questdb/std/Rosti.java +++ b/core/src/main/java/io/questdb/std/Rosti.java @@ -141,14 +141,15 @@ public final class Rosti { public static native void keyedIntMaxIntWrapUp(long pRosti, int valueOffset, int valueAtNull); - // sum long - public static native void keyedIntSumLong(long pRosti, long pKeys, long pDouble, long count, int valueOffset); + public static native void keyedHourSumLong(long pRosti, long pKeys, long pLong, long count, int valueOffset); - public static native void keyedIntSumLongLong(long pRosti, long pKeys, long pDouble, long count, int valueOffset); + // sum long256 + public static native void keyedHourSumLong256(long pRosti, long pKeys, long pLong256, long count, int valueOffset); - public static native void keyedHourSumLong(long pRosti, long pKeys, long pDouble, long count, int valueOffset); + public static native void keyedHourSumLongLong(long pRosti, long pKeys, long pLong, long count, int valueOffset); - public static native void keyedHourSumLongLong(long pRosti, long pKeys, long pDouble, long count, int valueOffset); + // sum long + public static native void keyedIntSumLong(long pRosti, long pKeys, long pLong, long count, int valueOffset); public static native void keyedIntSumLongMerge(long pRostiA, long pRostiB, int valueOffset); @@ -158,6 +159,14 @@ public final class Rosti { public static native void keyedIntSumLongLongWrapUp(long pRosti, int valueOffset, long valueAtNull, long valueAtNullCount); + public static native void keyedIntSumLong256(long pRosti, long pKeys, long pLong, long count, int valueOffset); + + public static native void keyedIntSumLong256Merge(long pRostiA, long pRostiB, int valueOffset); + + public static native void keyedIntSumLong256WrapUp(long pRosti, int valueOffset, long v0, long v1, long v2, long v3, long valueAtNullCount); + + public static native void keyedIntSumLongLong(long pRosti, long pKeys, long pLong, long count, int valueOffset); + // avg long public static native void keyedIntAvgLongWrapUp(long pRosti, int valueOffset, double valueAtNull, long valueAtNullCount); diff --git a/core/src/main/java/module-info.java b/core/src/main/java/module-info.java index 2cd89867f0d667d299168290e866891952dfa40d..e08efc9c98783ee56f36a6829eb68661c61f541e 100644 --- a/core/src/main/java/module-info.java +++ b/core/src/main/java/module-info.java @@ -144,6 +144,7 @@ open module io.questdb { io.questdb.griffin.engine.functions.math.AddLongFunctionFactory, io.questdb.griffin.engine.functions.math.AddFloatFunctionFactory, io.questdb.griffin.engine.functions.math.AddDoubleFunctionFactory, + io.questdb.griffin.engine.functions.math.AddLong256FunctionFactory, io.questdb.griffin.engine.functions.date.AddLongToTimestampFunctionFactory, // # '-' operator, io.questdb.griffin.engine.functions.math.NegIntFunctionFactory, @@ -456,6 +457,7 @@ open module io.questdb { io.questdb.griffin.engine.functions.groupby.SumFloatGroupByFunctionFactory, io.questdb.griffin.engine.functions.groupby.SumIntGroupByFunctionFactory, io.questdb.griffin.engine.functions.groupby.SumLongGroupByFunctionFactory, + io.questdb.griffin.engine.functions.groupby.SumLong256GroupByFunctionFactory, io.questdb.griffin.engine.functions.groupby.KSumDoubleGroupByFunctionFactory, io.questdb.griffin.engine.functions.groupby.NSumDoubleGroupByFunctionFactory, // 'last' group by function diff --git a/core/src/main/resources/META-INF/services/io.questdb.griffin.FunctionFactory b/core/src/main/resources/META-INF/services/io.questdb.griffin.FunctionFactory index a8e1bda0dadd4e3349ecadf069a49fe4c79c1c20..e3b6def45b252f3b6871b6e444f07acdb7bf54a4 100644 --- a/core/src/main/resources/META-INF/services/io.questdb.griffin.FunctionFactory +++ b/core/src/main/resources/META-INF/services/io.questdb.griffin.FunctionFactory @@ -77,6 +77,7 @@ io.questdb.griffin.engine.functions.math.AddLongFunctionFactory io.questdb.griffin.engine.functions.math.AddFloatFunctionFactory io.questdb.griffin.engine.functions.math.AddDoubleFunctionFactory io.questdb.griffin.engine.functions.date.AddLongToTimestampFunctionFactory +io.questdb.griffin.engine.functions.math.AddLong256FunctionFactory # '-' operator io.questdb.griffin.engine.functions.math.NegIntFunctionFactory @@ -426,6 +427,7 @@ io.questdb.griffin.engine.functions.groupby.SumDoubleGroupByFunctionFactory io.questdb.griffin.engine.functions.groupby.SumFloatGroupByFunctionFactory io.questdb.griffin.engine.functions.groupby.SumIntGroupByFunctionFactory io.questdb.griffin.engine.functions.groupby.SumLongGroupByFunctionFactory +io.questdb.griffin.engine.functions.groupby.SumLong256GroupByFunctionFactory io.questdb.griffin.engine.functions.groupby.KSumDoubleGroupByFunctionFactory io.questdb.griffin.engine.functions.groupby.NSumDoubleGroupByFunctionFactory diff --git a/core/src/main/resources/io/questdb/bin/armlinux/libquestdb.so b/core/src/main/resources/io/questdb/bin/armlinux/libquestdb.so index 8dad555471ac3736ab27b00e65fffc42a48152af..155609eec107fbae777cbc1821f7007d03ec57e8 100644 Binary files a/core/src/main/resources/io/questdb/bin/armlinux/libquestdb.so and b/core/src/main/resources/io/questdb/bin/armlinux/libquestdb.so differ diff --git a/core/src/main/resources/io/questdb/bin/armosx/libquestdb.dylib b/core/src/main/resources/io/questdb/bin/armosx/libquestdb.dylib index ab940b4eb5c9706c97c01277f1b73dad5046eb8f..52306f4682fcd2e2ed4c82bdba2c6998c18ecaf0 100755 Binary files a/core/src/main/resources/io/questdb/bin/armosx/libquestdb.dylib and b/core/src/main/resources/io/questdb/bin/armosx/libquestdb.dylib differ diff --git a/core/src/main/resources/io/questdb/bin/linux/libquestdb.so b/core/src/main/resources/io/questdb/bin/linux/libquestdb.so index 2d7a075e057ff9b79aade6e93891ede78c958134..dfef80aa2a3b39615f2fed00d189ea9ecec30e52 100644 Binary files a/core/src/main/resources/io/questdb/bin/linux/libquestdb.so and b/core/src/main/resources/io/questdb/bin/linux/libquestdb.so differ diff --git a/core/src/main/resources/io/questdb/bin/osx/libquestdb.dylib b/core/src/main/resources/io/questdb/bin/osx/libquestdb.dylib index e7fdb673312a598e6fe218b32a27f46cc68f2da2..d389f0a8ddadba41f0af1fa16c1e2f86c0a76815 100644 Binary files a/core/src/main/resources/io/questdb/bin/osx/libquestdb.dylib and b/core/src/main/resources/io/questdb/bin/osx/libquestdb.dylib differ diff --git a/core/src/main/resources/io/questdb/bin/windows/libquestdb.dll b/core/src/main/resources/io/questdb/bin/windows/libquestdb.dll index 2ceaa85e1d343268987458e848c1c7732165a204..6784edd8bd883be14894215ef4bdc48794c83955 100644 Binary files a/core/src/main/resources/io/questdb/bin/windows/libquestdb.dll and b/core/src/main/resources/io/questdb/bin/windows/libquestdb.dll differ diff --git a/core/src/test/java/io/questdb/griffin/KeyedAggregationTest.java b/core/src/test/java/io/questdb/griffin/KeyedAggregationTest.java index bcb8dd920d166f1290a77cb1dfcb253d33a491f6..0cd5ce0bd7478b4a3fbb4d1a658d31e9b4f7afe8 100644 --- a/core/src/test/java/io/questdb/griffin/KeyedAggregationTest.java +++ b/core/src/test/java/io/questdb/griffin/KeyedAggregationTest.java @@ -89,6 +89,19 @@ public class KeyedAggregationTest extends AbstractGriffinTest { ); } + @Test + public void testHourLong256() throws Exception { + assertQuery( + "hour\tcount\tsum\n" + + "0\t36000\t0x464fffffffffffff7360\n" + + "1\t36000\t0x464fffffffffffff7360\n" + + "2\t28000\t0x36afffffffffffff92a0\n", + "select hour(ts), count(), sum(val) from tab order by 1", + "create table tab as (select timestamp_sequence(0, 100000) ts, cast(9223372036854775807 as long256) val from long_sequence(100000))", + null, true, true, true + ); + } + @Test public void testHourLongMissingFunctions() throws Exception { assertQuery( diff --git a/core/src/test/java/io/questdb/griffin/engine/AbstractFunctionFactoryTest.java b/core/src/test/java/io/questdb/griffin/engine/AbstractFunctionFactoryTest.java index 1bf563fb4afed75daccf9ebf15fc6b3f52c9c1da..18743d2fc94baee442799a19aafc612fd60cd8d0 100644 --- a/core/src/test/java/io/questdb/griffin/engine/AbstractFunctionFactoryTest.java +++ b/core/src/test/java/io/questdb/griffin/engine/AbstractFunctionFactoryTest.java @@ -511,6 +511,12 @@ public abstract class AbstractFunctionFactoryTest extends BaseFunctionFactoryTes closeFunctions(); } + public void andAssertLong256(Long256 expected) { + Assert.assertTrue(expected.equals(function1.getLong256A(record))); + Assert.assertTrue(expected.equals(function2.getLong256A(record))); + closeFunctions(); + } + public Record getRecord() { return record; } diff --git a/core/src/test/java/io/questdb/griffin/engine/functions/groupby/SumLong256GroupByFunctionFactoryTest.java b/core/src/test/java/io/questdb/griffin/engine/functions/groupby/SumLong256GroupByFunctionFactoryTest.java new file mode 100644 index 0000000000000000000000000000000000000000..05ee221d278cbff279659ed81ef41e6a17bd8000 --- /dev/null +++ b/core/src/test/java/io/questdb/griffin/engine/functions/groupby/SumLong256GroupByFunctionFactoryTest.java @@ -0,0 +1,60 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin.engine.functions.groupby; + +import io.questdb.griffin.AbstractGriffinTest; +import org.junit.Test; + +public class SumLong256GroupByFunctionFactoryTest extends AbstractGriffinTest { + + @Test + public void testSumAllNull() throws Exception { + assertMemoryLeak(() -> assertSql( + "select sum(x) from (select cast(null as long256) x from long_sequence(100000))", + "sum\n\n")); + } + + @Test + public void testSumOverUnionAll() throws Exception { + assertQuery("sm\n0x06\n", + "select * from ( select sum(x) as sm from (select * from test union all select * from test ) )", + "create table test as (select cast(x as long256) x from long_sequence(2))", + null, false, true, true); + } + + @Test + public void testSumSeq() throws Exception { + assertMemoryLeak(() -> assertSql( + "select sum(x), sum(y) from (select cast(x as long256) x, x as y from long_sequence(100000))", + "sum\tsum1\n0x012a06b550\t5000050000\n")); + } + + @Test + public void testSumSeqWithFilter() throws Exception { + assertMemoryLeak(() -> assertSql( + "select sum(x), sum(y) from (select cast(x as long256) x, x as y from long_sequence(100000) where x > 10000)", + "sum\tsum1\n0x01270bb148\t4950045000\n")); + } +} \ No newline at end of file diff --git a/core/src/test/java/io/questdb/griffin/engine/functions/groupby/SumLong256VecGroupByFunctionFactoryTest.java b/core/src/test/java/io/questdb/griffin/engine/functions/groupby/SumLong256VecGroupByFunctionFactoryTest.java new file mode 100644 index 0000000000000000000000000000000000000000..3629c3d521693683d6374f553aed5132669be2cc --- /dev/null +++ b/core/src/test/java/io/questdb/griffin/engine/functions/groupby/SumLong256VecGroupByFunctionFactoryTest.java @@ -0,0 +1,74 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin.engine.functions.groupby; + +import io.questdb.griffin.AbstractGriffinTest; +import org.junit.Test; + +public class SumLong256VecGroupByFunctionFactoryTest extends AbstractGriffinTest { + + @Test + public void testAllNullThenOne() throws Exception { + assertQuery( + "sum\n\n", + "select sum(f) from tab", + "create table tab as (select cast(null as long256) f from long_sequence(33))", + null, + "insert into tab select cast(123L as long256) from long_sequence(1)", + "sum\n0x7b\n", + false, + true, + true + ); + } + + @Test + public void testSimple() throws Exception { + assertQuery( + "sum\tsum1\n5050\t0x13ba\n", + "select sum(x), sum(y) from tab", + "create table tab as (select x, cast(x as long256) y from long_sequence(100))\n", + null, + false, + true, + true + ); + } + + @Test + public void testSumBigFive() throws Exception { + assertMemoryLeak(() -> { + compiler.compile("create table tab (x long256)", sqlExecutionContext); + executeInsert("insert into tab values (0xb00ee5505bd95e51dd18889bae1dee3404d446e61d5293f55ff29ba4a01ab073)"); + executeInsert("insert into tab values (0x6f64ae42c48c96a19e099d7a980099af601f70d614b709804ea60bf902c30e3e)"); + executeInsert("insert into tab values (0x7c6ec2b2ffd4a89ec87dd041359f34661ce5fa64b58567a438c725aa47e609dd)"); + executeInsert("insert into tab values (0x15ed3484045af1d460b09ed4a3984890458c09608a4ce455731bed64a1545c05)"); + executeInsert("insert into tab values (0xb6292e820db4d91ba9a74c8c459676d127590af55a4eccba93a826db814c49c6)"); + String query = "select sum(x) from tab"; + String ex = "sum\n0x67f8b94c324a68824df7e1b864ec7baaeebec676cc2ab629ee23e1880d646e59\n"; + printSqlResult(ex, query, null, false, true); + }); + } +} \ No newline at end of file diff --git a/core/src/test/java/io/questdb/griffin/engine/functions/math/AddLong256FunctionFactoryTest.java b/core/src/test/java/io/questdb/griffin/engine/functions/math/AddLong256FunctionFactoryTest.java new file mode 100644 index 0000000000000000000000000000000000000000..3e5b29f1304010b1e70f315093e977eb4caf5b4e --- /dev/null +++ b/core/src/test/java/io/questdb/griffin/engine/functions/math/AddLong256FunctionFactoryTest.java @@ -0,0 +1,126 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2022 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.griffin.engine.functions.math; + +import io.questdb.griffin.FunctionFactory; +import io.questdb.griffin.SqlException; +import io.questdb.griffin.engine.AbstractFunctionFactoryTest; +import io.questdb.std.Long256; +import io.questdb.std.Long256Impl; +import io.questdb.std.Numbers; +import org.junit.Test; + +import java.math.BigInteger; +import java.util.Random; + +public class AddLong256FunctionFactoryTest extends AbstractFunctionFactoryTest { + + @Test + public void testAddNoCarry() throws SqlException { + CharSequence tok1 = "0x7fffffffffffffff7fffffffffffffff7fffffffffffffff7fffffffffffffff"; + CharSequence tok2 = "0x7000000000000000800000000000000080000000000000008000000000000000"; + CharSequence tok3 = "0xefffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"; + Long256 l1 = Numbers.parseLong256(tok1, tok1.length(), new Long256Impl()); + Long256 l2 = Numbers.parseLong256(tok2, tok2.length(), new Long256Impl()); + Long256 l3 = Numbers.parseLong256(tok3, tok3.length(), new Long256Impl()); + callBySignature("+(HH)", l1, l2).andAssertLong256(l3); + } + + @Test + public void testAddSimple() throws SqlException { + CharSequence tok1 = "0x01"; + CharSequence tok2 = "0x02"; + CharSequence tok3 = "0x03"; + Long256 l1 = Numbers.parseLong256(tok1, tok1.length(), new Long256Impl()); + Long256 l2 = Numbers.parseLong256(tok2, tok2.length(), new Long256Impl()); + Long256 l3 = Numbers.parseLong256(tok3, tok3.length(), new Long256Impl()); + callBySignature("+(HH)", l1, l2).andAssertLong256(l3); + } + + @Test + public void testAddWithCarry() throws SqlException { + CharSequence tok1 = "0x7ffffffffffffff7fffffffffffffff7fffffffffffffff7ffffffffffffffff"; + CharSequence tok2 = "0x1000000000000000100000000000000010000000000000001000000000000000"; + CharSequence tok3 = "0x8ffffffffffffff80ffffffffffffff80ffffffffffffff80fffffffffffffff"; + Long256 l1 = Numbers.parseLong256(tok1, tok1.length(), new Long256Impl()); + Long256 l2 = Numbers.parseLong256(tok2, tok2.length(), new Long256Impl()); + Long256 l3 = Numbers.parseLong256(tok3, tok3.length(), new Long256Impl()); + callBySignature("+(HH)", l1, l2).andAssertLong256(l3); + } + + @Test + public void testAddWithCarry1() throws SqlException { + CharSequence tok1 = "fffffffffffffff0fffffffffffffff0fffffffffffffff0ffffffffffffffff"; + CharSequence tok2 = "0000000000000000100000000000000010000000000000001000000000000000"; + BigInteger b1 = new BigInteger(tok1.toString(), 16); + BigInteger b2 = new BigInteger(tok2.toString(), 16); + //CharSequence tok2 = "0x01"; + CharSequence tok3 = b1.add(b2).toString(16); + Long256 l1 = Numbers.parseLong256(tok1, tok1.length(), new Long256Impl()); + Long256 l2 = Numbers.parseLong256(tok2, tok2.length(), new Long256Impl()); + Long256 l3 = Numbers.parseLong256(tok3, tok3.length(), new Long256Impl()); + callBySignature("+(HH)", l1, l2).andAssertLong256(l3); + } + + @Test + public void testAddWithNull() throws SqlException { + CharSequence tok1 = "0x01"; + Long256 l1 = Numbers.parseLong256(tok1, tok1.length(), new Long256Impl()); + callBySignature("+(HH)", l1, Long256Impl.NULL_LONG256).andAssertLong256(Long256Impl.NULL_LONG256); + callBySignature("+(HH)", Long256Impl.NULL_LONG256, l1).andAssertLong256(Long256Impl.NULL_LONG256); + } + + @Test + public void testAddWithOverflow() throws SqlException { + CharSequence tok1 = "0x7fffffffffffffff7fffffffffffffff7fffffffffffffff7fffffffffffffff"; + CharSequence tok2 = "0x8000000000000000800000000000000080000000000000008000000000000000"; + Long256 l1 = Numbers.parseLong256(tok1, tok1.length(), new Long256Impl()); + Long256 l2 = Numbers.parseLong256(tok2, tok2.length(), new Long256Impl()); + callBySignature("+(HH)", l1, l2).andAssertLong256(Long256Impl.NULL_LONG256); + } + + @Test + public void testRandom() throws SqlException { + Random rnd = new Random(); + for (int i = 0; i < 1000; i++) { + BigInteger b1 = new BigInteger(256, rnd); + BigInteger b2 = new BigInteger(256, rnd); + CharSequence expTok = b1.add(b2).toString(16); + Long256 expected = Numbers.parseLong256(expTok, expTok.length(), new Long256Impl()); + + CharSequence tok1 = b1.toString(16); + CharSequence tok2 = b2.toString(16); + Long256 l1 = Numbers.parseLong256(tok1, tok1.length(), new Long256Impl()); + Long256 l2 = Numbers.parseLong256(tok2, tok2.length(), new Long256Impl()); + + callBySignature("+(HH)", l1, l2).andAssertLong256(expected); + } + } + + @Override + protected FunctionFactory getFunctionFactory() { + return new AddLong256FunctionFactory(); + } +} \ No newline at end of file