提交 a820f1a1 编写于 作者: V Vlad Ilyushchenko

feat(cairo): null-aware sum(double), avg(double), min(double) and max(double)....

feat(cairo): null-aware sum(double), avg(double), min(double) and max(double). These functions run on entire column.
上级 05b0c7d1
......@@ -13,6 +13,8 @@ set(
set(
VCL_FILES2
src/main/c/share/vect_vanilla.h
src/main/c/share/vect_vanilla.cpp
src/main/c/share/vect.cpp
src/main/c/share/vcl/instrset_detect.cpp
)
......@@ -24,6 +26,7 @@ set(
src/main/c/share/net.h
src/main/c/share/zip.c
src/main/c/share/os.h
src/main/c/share/vect_vanilla.h
)
# JNI includes
......@@ -113,10 +116,10 @@ if (AARCH64)
questdb-aarch64
)
set_target_properties(
questdb PROPERTIES
COMPILE_FLAGS "-O2"
)
set_target_properties(
questdb PROPERTIES
COMPILE_FLAGS "-O2"
)
else ()
......@@ -159,12 +162,24 @@ else ()
)
set_target_properties(
questdb-sse2 PROPERTIES
COMPILE_FLAGS "-m64 -O2 -msse2 -fPIC -std=c++17"
COMPILE_FLAGS "-m64 -O2 -msse2 -fPIC -std=c++17 -Wno-attributes"
)
if (WIN32)
set_target_properties(
questdb-sse2 PROPERTIES
COMPILE_FLAGS "-m64 -O2 -msse2 -std=c++17 -Wno-attributes"
)
else ()
set_target_properties(
questdb-sse2 PROPERTIES
COMPILE_FLAGS "-m64 -O2 -msse2 -fPIC -std=c++17"
)
endif (WIN32)
set_target_properties(
questdb PROPERTIES
COMPILE_FLAGS "-m64 -O2"
COMPILE_FLAGS "-m64 -O2 "
)
target_link_libraries(
......
......@@ -134,14 +134,9 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<version>2.22.2</version>
<configuration>
<argLine>-Xmx512m</argLine>
<systemPropertyVariables>
<java.util.logging.SimpleFormatter.format>
<![CDATA[%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS %4$s %2$s %5$s%6$s%n]]>
</java.util.logging.SimpleFormatter.format>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
......
......@@ -24,23 +24,21 @@
#include <cstdio>
#include <jni.h>
#include "../share/vect_vanilla.h"
inline double sumDouble_Vanilla(double *d, long count) {
const double *ext = d + count;
double result = 0;
double *pd = d;
for (; pd < ext; pd++) {
result += *pd;
}
return result;
}
extern "C" {
JNIEXPORT jdouble
JNICALL Java_io_questdb_std_Vect_sumDouble(JNIEnv *env, jclass cl, jlong pDouble, jlong count) {
JNIEXPORT jdouble JNICALL Java_io_questdb_std_Vect_sumDouble(JNIEnv *env, jclass cl, jlong pDouble, jlong count) {
return sumDouble_Vanilla((double*) pDouble, count);
}
JNIEXPORT jdouble JNICALL Java_io_questdb_std_Vect_avgDouble(JNIEnv *env, jclass cl, jlong pDouble, jlong count) {
return avgDouble_Vanilla((double*) pDouble, count);
}
JNIEXPORT jdouble JNICALL Java_io_questdb_std_Vect_minDouble(JNIEnv *env, jclass cl, jlong pDouble, jlong count) {
return minDouble_Vanilla((double*) pDouble, count);
}
}
......@@ -22,35 +22,72 @@
*
******************************************************************************/
#include <cstdio>
#include "vcl/vectorclass.h"
#include <cfloat>
#include "vect.h"
#define MAX_VECTOR_SIZE 512
// Define function type
// Change this to fit your purpose. Should not contain vector types:
typedef double SumDoubleType(double *, long);
// Define function name depending on which instruction set we compile for
#if INSTRSET >= 10
// function prototypes for each version
SumDoubleType sumDouble_SSE2, sumDouble_SSE41, sumDouble_AVX2, sumDouble_AVX512, sumDouble_dispatch;
#define SUM_DOUBLE F_AVX512(sumDouble)
#define AVG_DOUBLE F_AVX512(avgDouble)
#define MIN_DOUBLE F_AVX512(minDouble)
#define MAX_DOUBLE F_AVX512(maxDouble)
#elif INSTRSET >= 8
#define SUM_DOUBLE F_AVX2(sumDouble)
#define AVG_DOUBLE F_AVX2(avgDouble)
#define MIN_DOUBLE F_AVX2(minDouble)
#define MAX_DOUBLE F_AVX2(maxDouble)
#elif INSTRSET >= 5
#define SUM_DOUBLE F_SSE41(sumDouble)
#define AVG_DOUBLE F_SSE41(avgDouble)
#define MIN_DOUBLE F_SSE41(minDouble)
#define MAX_DOUBLE F_SSE41(maxDouble)
// Define function name depending on which instruction set we compile for
#if INSTRSET >= 10 // AVX512VL
#define SUM_DOUBLE sumDouble_AVX512
#elif INSTRSET >= 8 // AVX2
#define SUM_DOUBLE sumDouble_AVX2
#elif INSTRSET >= 5 // SSE4.1
#define SUM_DOUBLE sumDouble_SSE41
#elif INSTRSET >= 2
#define SUM_DOUBLE sumDouble_SSE2 // SSE2
#define SUM_DOUBLE F_SSE2(sumDouble)
#define AVG_DOUBLE F_SSE2(avgDouble)
#define MIN_DOUBLE F_SSE2(minDouble)
#define MAX_DOUBLE F_SSE2(maxDouble)
#else
#endif
#ifdef SUM_DOUBLE
// Dispatched version of the function. Compile this once for each instruction set:
double SUM_DOUBLE(double *d, long count) {
const int step = 8;
const long remainder = count - (count / step) * step;
const double *vec_lim = d + count - remainder;
double *pd = d;
Vec8d vec;
double result = 0;
for (; pd < vec_lim; pd += step) {
vec.load(pd);
double s = horizontal_add(vec);
if (s != s) {
result += sum_nan_as_zero(pd, step);
} else {
result += s;
}
}
if (remainder > 0) {
result += sum_nan_as_zero(pd, remainder);
}
return result;
}
/*
double SUM_DOUBLE_NOT_NULL(double *d, long count) {
const int step = 8;
const long remainder = count - (count / step) * step;
const double *lim = d + count;
......@@ -71,63 +108,103 @@ double SUM_DOUBLE(double *d, long count) {
}
return result;
}
*/
#endif
#if INSTRSET < 4
double AVG_DOUBLE(double *d, long count) {
const int step = 8;
const long remainder = count - (count / step) * step;
const double *vec_lim = d + count - remainder;
double sumDouble_Vanilla(double *d, long count) {
const double *ext = d + count;
double result = 0;
double *pd = d;
for (; pd < ext; pd++) {
result += *pd;
Vec8d vec;
double sum = 0;
long sumCount = 0;
for (; pd < vec_lim; pd += step) {
vec.load(pd);
double s = horizontal_add(vec);
if (s != s) {
auto v = avg_skip_nan(pd, step);
sum += v.sum;
sumCount += v.count;
} else {
sum += s;
sumCount += step;
}
}
return result;
}
// make dispatcher in only the lowest of the compiled versions
// This function pointer initially points to the dispatcher.
// After the first call it points to the selected version:
if (remainder > 0) {
auto v = avg_skip_nan(pd, remainder);
sum += v.sum;
sumCount += v.count;
}
return sum / sumCount;
}
SumDoubleType * sumDouble_pointer = &sumDouble_dispatch;
double MIN_DOUBLE(double *d, long count) {
const int step = 8;
const long remainder = count - (count / step) * step;
const double *lim = d + count;
const double *vec_lim = lim - remainder;
// Dispatcher
double sumDouble_dispatch(double * d, long size) {
const int iset = instrset_detect(); // Detect supported instruction set
if (iset >= 10) {
sumDouble_pointer = &sumDouble_AVX512; // AVX512 version
}
else if (iset >= 8) {
sumDouble_pointer = &sumDouble_AVX2; // AVX2 version
}
else if (iset >= 5) {
sumDouble_pointer = &sumDouble_SSE41; // SSE4.1 version
}
else if (iset >= 2) {
sumDouble_pointer = &sumDouble_SSE2; // SSE2 version
}
else {
sumDouble_pointer = &sumDouble_Vanilla; // vanilla version
double *pd = d;
Vec8d vec;
double min = LDBL_MAX;
for (; pd < vec_lim; pd += step) {
vec.load(pd);
double x = horizontal_min1(vec);
if (x < min) {
min = x;
}
}
// continue in dispatched version of the function
return (*sumDouble_pointer)(d, size);
}
// Entry to dispatched function call
inline double sumDouble(double *d, long size) {
return (*sumDouble_pointer)(d, size);
if (pd < lim) {
for (; pd < lim; pd++) {
double x = *pd;
if (x < min) {
min = x;
}
}
}
return min;
}
extern "C" {
double MAX_DOUBLE(double *d, long count) {
const int step = 8;
const long remainder = count - (count / step) * step;
const double *lim = d + count;
const double *vec_lim = lim - remainder;
#include <jni.h>
double *pd = d;
Vec8d vec;
double max = LDBL_MIN;
for (; pd < vec_lim; pd += step) {
vec.load(pd);
double x = horizontal_min1(vec);
if (x > max) {
max = x;
}
}
JNIEXPORT jdouble JNICALL Java_io_questdb_std_Vect_sumDouble(JNIEnv *env, jclass cl, jlong pDouble, jlong size) {
return sumDouble((double *) pDouble, size);
if (pd < lim) {
for (; pd < lim; pd++) {
double x = *pd;
if (x > max) {
max = x;
}
}
}
return max;
}
}
#endif
#if INSTRSET < 4
// Dispatchers
DISPATCHER(sumDouble)
DISPATCHER(avgDouble)
DISPATCHER(minDouble)
DISPATCHER(maxDouble)
#endif // INSTRSET == 2
//
// Created by blues on 20/02/2020.
//
#ifndef VECT_H
#define VECT_H
#include <jni.h>
#include "vcl/vectorclass.h"
#include "vect_vanilla.h"
typedef double DoubleVectFuncType(double *, long);
#define POINTER_NAME(func) func ## _pointer
#define F_AVX512(func) func ## _AVX512
#define F_AVX2(func) func ## _AVX2
#define F_SSE41(func) func ## _SSE41
#define F_SSE2(func) func ## _SSE2
#define F_VANILLA(func) func ## _Vanilla
#define F_DISPATCH(func) func ## _dispatch
#define DISPATCHER(func) \
\
DoubleVectFuncType F_SSE2(func), F_SSE41(func), F_AVX2(func), F_AVX512(func), F_DISPATCH(func); \
\
DoubleVectFuncType *POINTER_NAME(func) = &func ## _dispatch; \
\
double F_DISPATCH(func) (double *d, long count) { \
const int iset = instrset_detect(); \
if (iset >= 10) { \
POINTER_NAME(func) = &F_AVX512(func); \
} else if (iset >= 8) { \
POINTER_NAME(func) = &F_AVX2(func); \
} else if (iset >= 5) { \
POINTER_NAME(func) = &F_SSE41(func); \
} else if (iset >= 2) { \
POINTER_NAME(func) = &F_SSE2(func); \
} else { \
POINTER_NAME(func) = &F_VANILLA(func); \
}\
return (*POINTER_NAME(func))(d, count); \
} \
\
inline double func(double *d, long count) { \
return (*POINTER_NAME(func))(d, count); \
}\
\
extern "C" { \
JNIEXPORT jdouble JNICALL Java_io_questdb_std_Vect_ ## func(JNIEnv *env, jclass cl, jlong pDouble, jlong size) { \
return func((double *) pDouble, size); \
}\
\
}
#endif //VECT_H
#include <cfloat>
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
#include "vect_vanilla.h"
double sumDouble_Vanilla(double *d, long count) {
return sum_nan_as_zero(d, count);
}
double avgDouble_Vanilla(double *d, long count) {
auto v = avg_skip_nan(d, count);
return v.sum / v.count;
}
double minDouble_Vanilla(double *d, long count) {
const double *ext = d + count;
double min = LDBL_MAX;
double *pd = d;
for (; pd < ext; pd++) {
double x = *pd;
if (x < min) {
min = x;
}
}
return min;
}
double maxDouble_Vanilla(double *d, long count) {
const double *ext = d + count;
double max = LDBL_MIN;
double *pd = d;
for (; pd < ext; pd++) {
double x = *pd;
if (x > max) {
max = x;
}
}
return max;
}
\ No newline at end of file
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2020 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
#ifndef VECT_VANILLA_H
#define VECT_VANILLA_H
static inline double sum_nan_as_zero(double *pd, long count) {
const double *lim = pd + count;
double sum = 0;
for (; pd < lim; pd++) {
const double d = *pd;
if (d == d) {
sum += d;
}
}
return sum;
}
typedef struct _AVG_DOUBLE {
double sum;
long count;
} AVG_DOUBLE;
static inline AVG_DOUBLE avg_skip_nan(double *pd, long count) {
const double *lim = pd + count;
double sum = 0;
long sumCount = 0;
for (; pd < lim; pd++) {
const double d = *pd;
if (d == d) {
sum += d;
sumCount++;
}
}
AVG_DOUBLE ad;
ad.sum = sum;
ad.count = sumCount;
return ad;
}
double sumDouble_Vanilla(double *d, long count);
double avgDouble_Vanilla(double *d, long count);
double minDouble_Vanilla(double *d, long count);
double maxDouble_Vanilla(double *d, long count);
#endif //VECT_VANILLA_H
......@@ -446,9 +446,7 @@ public class TableReader implements Closeable {
}
public double sumDouble(int columnIndex) {
double result = 0;
for (int i = 0; i < partitionCount; i++) {
openPartition(i);
final int base = getColumnBase(i);
......@@ -462,22 +460,67 @@ public class TableReader implements Closeable {
}
return result;
//
// if (partitionCount > 0) {
// openPartition(0);
// int base = getColumnBase(0);
// int index = getPrimaryColumnIndex(base, columnIndex);
// final ReadOnlyColumn column = columns.getQuick(index);
// if (column instanceof OnePageMemory) {
// OnePageMemory opm = (OnePageMemory) column;
// final long addr = opm.getPageAddress();
//
// long size = opm.size();
//
// return Vect.sumDouble(addr, size / Double.BYTES);
// }
// }
// return 0;
}
public double minDouble(int columnIndex) {
double min = Double.MAX_VALUE;
for (int i = 0; i < partitionCount; i++) {
openPartition(i);
final int base = getColumnBase(i);
final int index = getPrimaryColumnIndex(base, columnIndex);
final ReadOnlyColumn column = columns.getQuick(index);
for (int pageIndex = 0, pageCount = column.getPageCount(); pageIndex < pageCount; pageIndex++) {
long a = column.getPageAddress(pageIndex);
long count = column.getPageSize(pageIndex) / Double.BYTES;
double x = Vect.minDouble(a, count);
if (x < min) {
min = x;
}
}
}
return min;
}
public double maxDouble(int columnIndex) {
double max = Double.MIN_VALUE;
for (int i = 0; i < partitionCount; i++) {
openPartition(i);
final int base = getColumnBase(i);
final int index = getPrimaryColumnIndex(base, columnIndex);
final ReadOnlyColumn column = columns.getQuick(index);
for (int pageIndex = 0, pageCount = column.getPageCount(); pageIndex < pageCount; pageIndex++) {
long a = column.getPageAddress(pageIndex);
long count = column.getPageSize(pageIndex) / Double.BYTES;
double x = Vect.maxDouble(a, count);
if (x > max) {
max = x;
}
}
}
return max;
}
public double avgDouble(int columnIndex) {
double result = 0;
long countTotal = 0;
for (int i = 0; i < partitionCount; i++) {
openPartition(i);
final int base = getColumnBase(i);
final int index = getPrimaryColumnIndex(base, columnIndex);
final ReadOnlyColumn column = columns.getQuick(index);
for (int pageIndex = 0, pageCount = column.getPageCount(); pageIndex < pageCount; pageIndex++) {
final long a = column.getPageAddress(pageIndex);
final long count = column.getPageSize(pageIndex) / Double.BYTES;
result += Vect.avgDouble(a, count);
countTotal++;
}
}
if (countTotal == 0) {
return 0;
}
return result / countTotal;
}
private void closeRemovedPartitions() {
......
......@@ -26,4 +26,10 @@ package io.questdb.std;
public final class Vect {
public static native double sumDouble(long pDouble, long count);
public static native double avgDouble(long pDouble, long count);
public static native double minDouble(long pDouble, long count);
public static native double maxDouble(long pDouble, long count);
}
......@@ -1104,6 +1104,144 @@ public class SqlCodeGeneratorTest extends AbstractGriffinTest {
}
}
@Test
public void testMinDoubleColumn() throws Exception {
final String expected = "a\tk\n";
assertQuery(expected,
"x where 1 = 0",
"create table x as " +
"(" +
"select" +
" rnd_double(0)*100 a," +
" timestamp_sequence(0, 10000) k" +
" from" +
" long_sequence(1200000)" +
") timestamp(k) partition by DAY",
"k",
false
);
try (TableReader r = new TableReader(configuration, "x")) {
Assert.assertEquals(1.743072089888109E-4, r.minDouble(0), 0.00001);
}
}
@Test
public void testMaxDoubleColumn() throws Exception {
final String expected = "a\tk\n";
assertQuery(expected,
"x where 1 = 0",
"create table x as " +
"(" +
"select" +
" rnd_double(0)*100 a," +
" timestamp_sequence(0, 10000) k" +
" from" +
" long_sequence(1200000)" +
") timestamp(k) partition by DAY",
"k",
false
);
try (TableReader r = new TableReader(configuration, "x")) {
Assert.assertEquals(78.07372283716164, r.maxDouble(0), 0.00001);
}
}
@Test
public void testMinDoubleColumnWithNaNs() throws Exception {
final String expected = "a\tk\n";
assertQuery(expected,
"x where 1 = 0",
"create table x as " +
"(" +
"select" +
" rnd_double(2)*100 a," +
" timestamp_sequence(0, 10000) k" +
" from" +
" long_sequence(120)" +
") timestamp(k) partition by DAY",
"k",
false
);
try (TableReader r = new TableReader(configuration, "x")) {
Assert.assertEquals(0.11075361080621349, r.minDouble(0), 0.00001);
}
}
@Test
public void testMaxDoubleColumnWithNaNs() throws Exception {
final String expected = "a\tk\n";
assertQuery(expected,
"x where 1 = 0",
"create table x as " +
"(" +
"select" +
" rnd_double(2)*100 a," +
" timestamp_sequence(0, 10000) k" +
" from" +
" long_sequence(120)" +
") timestamp(k) partition by DAY",
"k",
false
);
try (TableReader r = new TableReader(configuration, "x")) {
Assert.assertEquals(72.03170014947307, r.maxDouble(0), 0.00001);
}
}
@Test
public void testAvgDoubleColumn() throws Exception {
final String expected = "a\tk\n";
assertQuery(expected,
"x where 1 = 0",
"create table x as " +
"(" +
"select" +
" rnd_double(0)*100 a," +
" timestamp_sequence(0, 10000) k" +
" from" +
" long_sequence(1200000)" +
") timestamp(k) partition by DAY",
"k",
false
);
try (TableReader r = new TableReader(configuration, "x")) {
Assert.assertEquals(50.03730496259993, r.avgDouble(0), 0.00001);
}
}
@Test
public void testAvgDoubleColumnWithNaNs() throws Exception {
final String expected = "a\tk\n";
assertQuery(expected,
"x where 1 = 0",
"create table x as " +
"(" +
"select" +
" rnd_double(2)*100 a," +
" timestamp_sequence(0, 10000) k" +
" from" +
" long_sequence(1200000)" +
") timestamp(k) partition by DAY",
"k",
false
);
try (TableReader r = new TableReader(configuration, "x")) {
Assert.assertEquals(49.99614105606191, r.avgDouble(0), 0.00001);
}
}
@Test
public void testSumDoubleColumnPartitionByNone() throws Exception {
final String expected = "a\tk\n";
......@@ -1127,6 +1265,52 @@ public class SqlCodeGeneratorTest extends AbstractGriffinTest {
}
}
@Test
public void testAvgDoubleColumnPartitionByNone() throws Exception {
final String expected = "a\tk\n";
assertQuery(expected,
"x where 1 = 0",
"create table x as " +
"(" +
"select" +
" rnd_double(0)*100 a," +
" timestamp_sequence(0, 10000) k" +
" from" +
" long_sequence(1200000)" +
") timestamp(k)",
"k",
false
);
try (TableReader r = new TableReader(configuration, "x")) {
Assert.assertEquals(50.03730496259993, r.avgDouble(0), 0.00001);
}
}
@Test
public void testAvgDoubleEmptyColumn() throws Exception {
final String expected = "a\tk\n";
assertQuery(expected,
"x where 1 = 0",
"create table x as " +
"(" +
"select" +
" rnd_double(0)*100 a," +
" timestamp_sequence(0, 10000) k" +
" from" +
" long_sequence(0)" +
") timestamp(k)",
"k",
false
);
try (TableReader r = new TableReader(configuration, "x")) {
Assert.assertEquals(0, r.avgDouble(0), 0.00001);
}
}
@Test
public void testFilterSubQueryAddSymbol() throws Exception {
// no index
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册