Commit 8b0b351d, authored by Haojun Liao

refactor: do some internal refactoring.

Parent: c35f668c
@@ -134,9 +134,9 @@ ELSE ()
IF("${SIMD_SUPPORT}" MATCHES "true") IF("${SIMD_SUPPORT}" MATCHES "true")
ADD_DEFINITIONS("-mavx -mavx2") ADD_DEFINITIONS("-mavx -mavx2")
MESSAGE(STATUS "cpu simd instruction AVX/AVX2 supported") MESSAGE(STATUS "SIMD instructions (AVX/AVX2) is ACTIVATED")
ELSE() ELSE()
MESSAGE(STATUS "cpu simd instruction AVX/AVX2 NOT supported") MESSAGE(STATUS "SIMD instruction (AVX/AVX2)is NOT ACTIVATED")
ENDIF() ENDIF()
ENDIF () ENDIF ()
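For context on how this CMake block and the C code below connect: passing -mavx -mavx2 makes GCC/Clang predefine the __AVX__ and __AVX2__ macros, which is exactly what the #if __AVX__ / #if __AVX2__ guards in the avg functions test. A tiny standalone probe (not part of the commit; purely illustrative):

#include <stdio.h>

int main(void) {
#if defined(__AVX2__)
    printf("compiled with AVX2 support (e.g. -mavx2)\n");
#elif defined(__AVX__)
    printf("compiled with AVX support (e.g. -mavx)\n");
#else
    printf("no AVX flags: only the scalar code paths are compiled in\n");
#endif
    return 0;
}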
......
@@ -135,5 +135,5 @@ ENDIF ()
MESSAGE(STATUS "platform arch:" ${PLATFORM_ARCH_STR}) MESSAGE(STATUS "platform arch:" ${PLATFORM_ARCH_STR})
MESSAGE("C Compiler ID: ${CMAKE_C_COMPILER_ID}") MESSAGE("C Compiler: ${CMAKE_C_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_C_COMPILER_VERSION})")
MESSAGE("CXX Compiler ID: ${CMAKE_CXX_COMPILER_ID}") MESSAGE("CXX Compiler: ${CMAKE_CXX_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_CXX_COMPILER_VERSION})")
@@ -213,7 +213,7 @@ void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp);
*/ */
void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp); void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp);
int64_t taosHashGetCompTimes(SHashObj *pHashObj); //int64_t taosHashGetCompTimes(SHashObj *pHashObj);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
@@ -49,11 +49,14 @@ typedef struct SAvgRes {
} SAvgRes; } SAvgRes;
static void floatVectorSumAVX(const float* plist, int32_t numOfRows, SAvgRes* pRes) { static void floatVectorSumAVX(const float* plist, int32_t numOfRows, SAvgRes* pRes) {
const int32_t bitWidth = 256;
#if __AVX__ #if __AVX__
// find the start position that are aligned to 32bytes address in memory // find the start position that are aligned to 32bytes address in memory
int32_t bitWidth = 8; int32_t width = (bitWidth>>3u) / sizeof(float);
int32_t remainder = numOfRows % bitWidth;
int32_t rounds = numOfRows / bitWidth; int32_t remainder = numOfRows % width;
int32_t rounds = numOfRows / width;
const float* p = plist; const float* p = plist;
@@ -63,14 +66,14 @@ static void floatVectorSumAVX(const float* plist, int32_t numOfRows, SAvgRes* pR
for (int32_t i = 0; i < rounds; ++i) { for (int32_t i = 0; i < rounds; ++i) {
val = _mm256_loadu_ps(p); val = _mm256_loadu_ps(p);
sum = _mm256_add_ps(sum, val); sum = _mm256_add_ps(sum, val);
p += bitWidth; p += width;
} }
// let sum up the final results // let sum up the final results
const float* q = (const float*)&sum; const float* q = (const float*)&sum;
pRes->sum.dsum += q[0] + q[1] + q[2] + q[3] + q[4] + q[5] + q[6] + q[7]; pRes->sum.dsum += q[0] + q[1] + q[2] + q[3] + q[4] + q[5] + q[6] + q[7];
int32_t startIndex = rounds * bitWidth; int32_t startIndex = rounds * width;
for (int32_t j = 0; j < remainder; ++j) { for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.dsum += plist[j + startIndex]; pRes->sum.dsum += plist[j + startIndex];
} }
@@ -78,11 +81,14 @@ static void floatVectorSumAVX(const float* plist, int32_t numOfRows, SAvgRes* pR
} }
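floatVectorSumAVX follows the usual AVX accumulate-then-reduce pattern: sum eight floats per iteration into a __m256 accumulator, reduce the eight lanes once at the end, and add the remaining elements scalar-wise. A self-contained sketch of the same idea (not the TDengine code; the function name and variables are illustrative):

#include <immintrin.h>
#include <stddef.h>

static double avx_float_sum(const float *p, size_t n) {
    double total = 0.0;
    size_t i = 0;
#if defined(__AVX__)
    __m256 acc = _mm256_setzero_ps();
    for (; i + 8 <= n; i += 8) {
        acc = _mm256_add_ps(acc, _mm256_loadu_ps(p + i));  /* unaligned load of 8 floats */
    }
    float lanes[8];
    _mm256_storeu_ps(lanes, acc);                          /* horizontal reduction */
    for (int k = 0; k < 8; ++k) {
        total += lanes[k];
    }
#endif
    for (; i < n; ++i) {                                   /* tail (or full scalar path) */
        total += p[i];
    }
    return total;
}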
static void doubleVectorSumAVX(const double* plist, int32_t numOfRows, SAvgRes* pRes) { static void doubleVectorSumAVX(const double* plist, int32_t numOfRows, SAvgRes* pRes) {
const int32_t bitWidth = 256;
#if __AVX__ #if __AVX__
// find the start position that are aligned to 32bytes address in memory // find the start position that are aligned to 32bytes address in memory
int32_t bitWidth = 4; int32_t width = (bitWidth>>3u) / sizeof(int64_t);
int32_t remainder = numOfRows % bitWidth;
int32_t rounds = numOfRows / bitWidth; int32_t remainder = numOfRows % width;
int32_t rounds = numOfRows / width;
const double* p = plist; const double* p = plist;
@@ -92,70 +98,143 @@ static void doubleVectorSumAVX(const double* plist, int32_t numOfRows, SAvgRes*
for (int32_t i = 0; i < rounds; ++i) { for (int32_t i = 0; i < rounds; ++i) {
val = _mm256_loadu_pd(p); val = _mm256_loadu_pd(p);
sum = _mm256_add_pd(sum, val); sum = _mm256_add_pd(sum, val);
p += bitWidth; p += width;
} }
// let sum up the final results // let sum up the final results
const double* q = (const double*)&sum; const double* q = (const double*)&sum;
pRes->sum.dsum += q[0] + q[1] + q[2] + q[3]; pRes->sum.dsum += q[0] + q[1] + q[2] + q[3];
int32_t startIndex = rounds * bitWidth; int32_t startIndex = rounds * width;
for (int32_t j = 0; j < remainder; ++j) { for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.dsum += plist[j + startIndex]; pRes->sum.dsum += plist[j + startIndex];
} }
#endif #endif
} }
static void i8VectorSumAVX2(const int8_t* plist, int32_t numOfRows, SAvgRes* pRes) { static void i8VectorSumAVX2(const int8_t* plist, int32_t numOfRows, int32_t type, SAvgRes* pRes) {
const int32_t bitWidth = 256;
#if __AVX2__ #if __AVX2__
// find the start position that are aligned to 32bytes address in memory // find the start position that are aligned to 32bytes address in memory
int32_t bitWidth = 16; int32_t width = (bitWidth>>3u) / sizeof(int64_t);
int32_t remainder = numOfRows % bitWidth;
int32_t rounds = numOfRows / bitWidth; int32_t remainder = numOfRows % width;
int32_t rounds = numOfRows / width;
__m256i sum = _mm256_setzero_si256();
if (type == TSDB_DATA_TYPE_TINYINT) {
const int8_t* p = plist; const int8_t* p = plist;
for (int32_t i = 0; i < rounds; ++i) {
__m128i val = _mm_lddqu_si128((__m128i*)p);
__m256i extVal = _mm256_cvtepi8_epi64(val); // only four items will be converted into __m256i
sum = _mm256_add_epi64(sum, extVal);
p += width;
}
} else {
const uint8_t* p = (const uint8_t*)plist;
for(int32_t i = 0; i < rounds; ++i) {
__m128i val = _mm_lddqu_si128((__m128i*)p);
__m256i extVal = _mm256_cvtepu8_epi64(val); // only four items will be converted into __m256i
sum = _mm256_add_epi64(sum, extVal);
p += width;
}
}
// let sum up the final results
const int64_t* q = (const int64_t*)&sum;
pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
int32_t startIndex = rounds * width;
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + startIndex];
}
#endif
}
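The reworked integer paths widen each small integer to a 64-bit lane before accumulating (_mm256_cvtepi8_epi64 for signed input, _mm256_cvtepu8_epi64 for unsigned), so the running sum cannot overflow narrow lanes. A minimal standalone sketch of the same technique for signed 8-bit data (names are illustrative, and it loads exactly four bytes per step instead of a full 128-bit vector to avoid reading past the end of the array):

#include <immintrin.h>
#include <stdint.h>
#include <stddef.h>
#include <string.h>

static int64_t avx2_i8_sum(const int8_t *p, size_t n) {
    int64_t total = 0;
    size_t i = 0;
#if defined(__AVX2__)
    __m256i acc = _mm256_setzero_si256();
    for (; i + 4 <= n; i += 4) {
        int32_t four;
        memcpy(&four, p + i, sizeof(four));                 /* copy exactly 4 bytes */
        __m128i raw  = _mm_cvtsi32_si128(four);
        __m256i wide = _mm256_cvtepi8_epi64(raw);           /* 4 x int8 -> 4 x int64 */
        acc = _mm256_add_epi64(acc, wide);                  /* overflow-safe 64-bit lanes */
    }
    int64_t lanes[4];
    _mm256_storeu_si256((__m256i *)lanes, acc);
    total = lanes[0] + lanes[1] + lanes[2] + lanes[3];
#endif
    for (; i < n; ++i) {                                    /* tail, or full scalar fallback */
        total += p[i];
    }
    return total;
}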
static void i16VectorSumAVX2(const int16_t* plist, int32_t numOfRows, int32_t type, SAvgRes* pRes) {
const int32_t bitWidth = 256;
#if __AVX2__
// find the start position that are aligned to 32bytes address in memory
int32_t width = (bitWidth>>3u) / sizeof(int64_t);
int32_t remainder = numOfRows % width;
int32_t rounds = numOfRows / width;
__m256i sum = _mm256_setzero_si256(); __m256i sum = _mm256_setzero_si256();
if (type == TSDB_DATA_TYPE_SMALLINT) {
const int16_t* p = plist;
for (int32_t i = 0; i < rounds; ++i) { for (int32_t i = 0; i < rounds; ++i) {
__m256i val = _mm256_lddqu_si256((__m256i*)p); __m128i val = _mm_lddqu_si128((__m128i*)p);
// __m256i extVal = _mm256_cvtepi8_epi64(val); __m256i extVal = _mm256_cvtepi16_epi64(val); // only four items will be converted into __m256i
sum = _mm256_add_epi8(sum, val); sum = _mm256_add_epi64(sum, extVal);
p += bitWidth; p += width;
}
} else {
const uint8_t* p = (const uint8_t*)plist;
for(int32_t i = 0; i < rounds; ++i) {
__m128i val = _mm_lddqu_si128((__m128i*)p);
__m256i extVal = _mm256_cvtepu16_epi64(val); // only four items will be converted into __m256i
sum = _mm256_add_epi64(sum, extVal);
p += width;
}
} }
// let sum up the final results // let sum up the final results
const int8_t* q = (const int8_t*)&sum; const int64_t* q = (const int64_t*)&sum;
pRes->sum.isum += q[0] + q[1] + q[2] + q[3]; pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
int32_t startIndex = rounds * bitWidth; int32_t startIndex = rounds * width;
for (int32_t j = 0; j < remainder; ++j) { for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + startIndex]; pRes->sum.isum += plist[j + startIndex];
} }
#endif #endif
} }
static void i32VectorSumAVX2(const int32_t* plist, int32_t numOfRows, SAvgRes* pRes) { static void i32VectorSumAVX2(const int32_t* plist, int32_t numOfRows, int32_t type, SAvgRes* pRes) {
const int32_t bitWidth = 256;
#if __AVX2__ #if __AVX2__
// find the start position that are aligned to 32bytes address in memory // find the start position that are aligned to 32bytes address in memory
int32_t bitWidth = 8; int32_t width = (bitWidth>>3u) / sizeof(int64_t);
int32_t remainder = numOfRows % bitWidth;
int32_t rounds = numOfRows / bitWidth;
const int32_t* p = plist; int32_t remainder = numOfRows % width;
int32_t rounds = numOfRows / width;
__m256i sum = _mm256_setzero_si256(); __m256i sum = _mm256_setzero_si256();
if (type == TSDB_DATA_TYPE_INT) {
const int32_t* p = plist;
for (int32_t i = 0; i < rounds; ++i) { for (int32_t i = 0; i < rounds; ++i) {
__m256i val = _mm256_lddqu_si256((__m256i*)p); __m128i val = _mm_lddqu_si128((__m128i*)p);
sum = _mm256_add_epi32(sum, val); __m256i extVal = _mm256_cvtepi32_epi64(val); // only four items will be converted into __m256i
p += bitWidth; sum = _mm256_add_epi64(sum, extVal);
p += width;
}
} else {
const uint32_t* p = (const uint32_t*)plist;
for(int32_t i = 0; i < rounds; ++i) {
__m128i val = _mm_lddqu_si128((__m128i*)p);
__m256i extVal = _mm256_cvtepu32_epi64(val); // only four items will be converted into __m256i
sum = _mm256_add_epi64(sum, extVal);
p += width;
}
} }
// let sum up the final results // let sum up the final results
const int32_t* q = (const int32_t*)&sum; const int64_t* q = (const int64_t*)&sum;
pRes->sum.isum += q[0] + q[1] + q[2] + q[3] + q[4] + q[5] + q[6] + q[7]; pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
int32_t startIndex = rounds * bitWidth; int32_t startIndex = rounds * width;
for (int32_t j = 0; j < remainder; ++j) { for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + startIndex]; pRes->sum.isum += plist[j + startIndex];
} }
@@ -163,27 +242,30 @@ static void i32VectorSumAVX2(const int32_t* plist, int32_t numOfRows, SAvgRes* p
} }
static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* pRes) { static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* pRes) {
const int32_t bitWidth = 256;
#if __AVX2__ #if __AVX2__
// find the start position that are aligned to 32bytes address in memory // find the start position that are aligned to 32bytes address in memory
int32_t bitWidth = 4; int32_t width = (bitWidth>>3u) / sizeof(int64_t);
int32_t remainder = numOfRows % bitWidth;
int32_t rounds = numOfRows / bitWidth;
const int64_t* p = plist; int32_t remainder = numOfRows % width;
int32_t rounds = numOfRows / width;
__m256i sum = _mm256_setzero_si256(); __m256i sum = _mm256_setzero_si256();
const int64_t* p = plist;
for (int32_t i = 0; i < rounds; ++i) { for (int32_t i = 0; i < rounds; ++i) {
__m256i val = _mm256_lddqu_si256((__m256i*)p); __m256i val = _mm256_lddqu_si256((__m256i*)p);
sum = _mm256_add_epi64(sum, val); sum = _mm256_add_epi64(sum, val);
p += bitWidth; p += width;
} }
// let sum up the final results // let sum up the final results
const int64_t* q = (const int64_t*)&sum; const int64_t* q = (const int64_t*)&sum;
pRes->sum.isum += q[0] + q[1] + q[2] + q[3]; pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
int32_t startIndex = rounds * bitWidth; int32_t startIndex = rounds * width;
for (int32_t j = 0; j < remainder; ++j) { for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + startIndex]; pRes->sum.isum += plist[j + startIndex];
} }
@@ -256,182 +338,22 @@ static int32_t calculateAvgBySMAInfo(SAvgRes* pRes, int32_t numOfRows, int32_t t
return numOfElem; return numOfElem;
} }
int32_t avgFunction(SqlFunctionCtx* pCtx) { static int32_t doAddNumericVector(SColumnInfoData* pCol, int32_t type, SInputColumnInfoData *pInput, SAvgRes* pRes) {
int32_t numOfElem = 0;
const int32_t THRESHOLD_SIZE = 8;
SInputColumnInfoData* pInput = &pCtx->input;
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
int32_t type = pInput->pData[0]->info.type;
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pAvgRes->type = type;
// computing based on the true data block
SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows; int32_t numOfRows = pInput->numOfRows;
int32_t numOfElems = 0;
if (IS_NULL_TYPE(type)) {
numOfElem = 0;
goto _avg_over;
}
if (pInput->colDataSMAIsSet) { // try to use SMA if available
numOfElem = calculateAvgBySMAInfo(pAvgRes, numOfRows, type, pAgg);
} else if (!pCol->hasNull) { // try to employ the simd instructions to speed up the loop
numOfElem = pInput->numOfRows;
pAvgRes->count += pInput->numOfRows;
bool simdAvaiable = tsAVXEnable && tsSIMDEnable && (numOfRows > THRESHOLD_SIZE);
switch(type) {
case TSDB_DATA_TYPE_TINYINT: {
const int8_t* plist = (const int8_t*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvaiable) {
i8VectorSumAVX2(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.isum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
const double* plist = (const double*)pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvaiable) {
doubleVectorSumAVX(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.isum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_INT: {
const int32_t* plist = (const int32_t*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvaiable) {
i32VectorSumAVX2(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.isum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
const int64_t* plist = (const int64_t*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvaiable) {
i64VectorSumAVX2(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.isum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
const float* plist = (const float*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvaiable) {
floatVectorSumAVX(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.dsum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
const double* plist = (const double*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvaiable) {
doubleVectorSumAVX(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.dsum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
const double* plist = (const double*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvaiable) {
doubleVectorSumAVX(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.usum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
const double* plist = (const double*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvaiable) {
doubleVectorSumAVX(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.usum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_UINT: {
const double* plist = (const double*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvaiable) {
doubleVectorSumAVX(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.usum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
const double* plist = (const double*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvaiable) {
doubleVectorSumAVX(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.usum += plist[i];
}
}
break;
}
default:
ASSERT(0);
}
} else {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
int8_t* plist = (int8_t*)pCol->pData; int8_t* plist = (int8_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
numOfElem += 1; numOfElems += 1;
pAvgRes->count += 1; pRes->count += 1;
pAvgRes->sum.isum += plist[i]; pRes->sum.isum += plist[i];
} }
break; break;
@@ -439,28 +361,28 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
case TSDB_DATA_TYPE_SMALLINT: { case TSDB_DATA_TYPE_SMALLINT: {
int16_t* plist = (int16_t*)pCol->pData; int16_t* plist = (int16_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
numOfElem += 1; numOfElems += 1;
pAvgRes->count += 1; pRes->count += 1;
pAvgRes->sum.isum += plist[i]; pRes->sum.isum += plist[i];
} }
break; break;
} }
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
int32_t* plist = (int32_t*)pCol->pData; int32_t* plist = (int32_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
numOfElem += 1; numOfElems += 1;
pAvgRes->count += 1; pRes->count += 1;
pAvgRes->sum.isum += plist[i]; pRes->sum.isum += plist[i];
} }
break; break;
@@ -468,28 +390,28 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT: {
int64_t* plist = (int64_t*)pCol->pData; int64_t* plist = (int64_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
numOfElem += 1; numOfElems += 1;
pAvgRes->count += 1; pRes->count += 1;
pAvgRes->sum.isum += plist[i]; pRes->sum.isum += plist[i];
} }
break; break;
} }
case TSDB_DATA_TYPE_UTINYINT: { case TSDB_DATA_TYPE_UTINYINT: {
uint8_t* plist = (uint8_t*)pCol->pData; uint8_t* plist = (uint8_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
numOfElem += 1; numOfElems += 1;
pAvgRes->count += 1; pRes->count += 1;
pAvgRes->sum.usum += plist[i]; pRes->sum.usum += plist[i];
} }
break; break;
@@ -497,28 +419,28 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
case TSDB_DATA_TYPE_USMALLINT: { case TSDB_DATA_TYPE_USMALLINT: {
uint16_t* plist = (uint16_t*)pCol->pData; uint16_t* plist = (uint16_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
numOfElem += 1; numOfElems += 1;
pAvgRes->count += 1; pRes->count += 1;
pAvgRes->sum.usum += plist[i]; pRes->sum.usum += plist[i];
} }
break; break;
} }
case TSDB_DATA_TYPE_UINT: { case TSDB_DATA_TYPE_UINT: {
uint32_t* plist = (uint32_t*)pCol->pData; uint32_t* plist = (uint32_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
numOfElem += 1; numOfElems += 1;
pAvgRes->count += 1; pRes->count += 1;
pAvgRes->sum.usum += plist[i]; pRes->sum.usum += plist[i];
} }
break; break;
@@ -526,46 +448,42 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
case TSDB_DATA_TYPE_UBIGINT: { case TSDB_DATA_TYPE_UBIGINT: {
uint64_t* plist = (uint64_t*)pCol->pData; uint64_t* plist = (uint64_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
numOfElem += 1; numOfElems += 1;
pAvgRes->count += 1; pRes->count += 1;
pAvgRes->sum.usum += plist[i]; pRes->sum.usum += plist[i];
} }
break; break;
} }
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
#if 1
numOfElem = handleFloatCols(pCol, pInput, pAvgRes);
#else
float* plist = (float*)pCol->pData; float* plist = (float*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
numOfElem += 1; numOfElems += 1;
pAvgRes->count += 1; pRes->count += 1;
pAvgRes->sum.dsum += plist[i]; pRes->sum.dsum += plist[i];
} }
#endif
break; break;
} }
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
double* plist = (double*)pCol->pData; double* plist = (double*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
numOfElem += 1; numOfElems += 1;
pAvgRes->count += 1; pRes->count += 1;
pAvgRes->sum.dsum += plist[i]; pRes->sum.dsum += plist[i];
} }
break; break;
} }
@@ -573,9 +491,133 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
default: default:
break; break;
} }
return numOfElems;
}
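The new doAddNumericVector helper is the null-aware scalar fallback: it walks the rows, skips entries flagged in the column's null bitmap, and updates both the running sum and the element count. A generic sketch of that pattern (the bitmap layout and the helper names below are assumptions for illustration, not necessarily what colDataIsNull_f does):

#include <stdint.h>
#include <stddef.h>
#include <stdbool.h>

/* Assumed layout: one bit per row, MSB-first within each byte, 1 == NULL. */
static bool row_is_null(const uint8_t *bitmap, size_t row) {
    return (bitmap[row >> 3] >> (7 - (row & 7))) & 1;
}

static void sum_i32_skip_nulls(const int32_t *vals, const uint8_t *nullbitmap,
                               size_t start, size_t nrows,
                               int64_t *sum, int64_t *count) {
    for (size_t i = start; i < start + nrows; ++i) {
        if (nullbitmap != NULL && row_is_null(nullbitmap, i)) {
            continue;                                      /* NULL rows contribute nothing */
        }
        *count += 1;
        *sum += vals[i];
    }
}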
int32_t avgFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
const int32_t THRESHOLD_SIZE = 8;
SInputColumnInfoData* pInput = &pCtx->input;
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
int32_t type = pInput->pData[0]->info.type;
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pAvgRes->type = type;
// computing based on the true data block
SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
if (IS_NULL_TYPE(type)) {
goto _over;
}
if (pInput->colDataSMAIsSet) { // try to use SMA if available
numOfElem = calculateAvgBySMAInfo(pAvgRes, numOfRows, type, pAgg);
} else if (!pCol->hasNull) { // try to employ the simd instructions to speed up the loop
numOfElem = pInput->numOfRows;
pAvgRes->count += pInput->numOfRows;
bool simdAvailable = tsAVXEnable && tsSIMDEnable && (numOfRows > THRESHOLD_SIZE);
switch(type) {
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT: {
const int8_t* plist = (const int8_t*) &pCol->pData[start];
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable) {
i8VectorSumAVX2(plist, numOfRows, type, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.usum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT: {
const int16_t* plist = (const int16_t*)pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable) {
i16VectorSumAVX2(plist, numOfRows, type, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.isum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: {
const int32_t* plist = (const int32_t*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable) {
i32VectorSumAVX2(plist, numOfRows, type, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.isum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT: {
const int64_t* plist = (const int64_t*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable) {
i64VectorSumAVX2(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.isum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
const float* plist = (const float*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable) {
floatVectorSumAVX(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.dsum += plist[i];
}
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
const double* plist = (const double*)pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable) {
doubleVectorSumAVX(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.dsum += plist[i];
}
}
break;
}
default:
ASSERT(0);
}
} else {
numOfElem = doAddNumericVector(pCol, type, pInput, pAvgRes);
} }
_avg_over: _over:
// data in the check operation are all null, not output // data in the check operation are all null, not output
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
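Taken together, avgFunction now selects one of three paths: pre-computed SMA block statistics, a SIMD loop when the column has no nulls and the block exceeds the 8-row threshold, or the null-aware scalar helper. A compact sketch of that selection logic (tsSIMDEnable, tsAVXEnable and the threshold come from the diff; the enum and function names are illustrative):

#include <stdbool.h>
#include <stdint.h>

typedef enum { PATH_SMA, PATH_SIMD, PATH_PLAIN_LOOP, PATH_NULL_AWARE_LOOP } ESumPath;

static ESumPath chooseSumPath(bool smaIsSet, bool hasNull, int32_t numOfRows,
                              bool tsSIMDEnable, bool tsAVXEnable) {
    if (smaIsSet) {
        return PATH_SMA;                        /* use pre-aggregated block statistics */
    }
    if (!hasNull) {
        bool simdAvailable = tsAVXEnable && tsSIMDEnable && (numOfRows > 8);
        return simdAvailable ? PATH_SIMD : PATH_PLAIN_LOOP;
    }
    return PATH_NULL_AWARE_LOOP;                /* the doAddNumericVector fallback */
}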
......
@@ -67,7 +67,7 @@ struct SHashObj {
bool enableUpdate; // enable update bool enableUpdate; // enable update
SArray *pMemBlock; // memory block allocated for SHashEntry SArray *pMemBlock; // memory block allocated for SHashEntry
_hash_before_fn_t callbackFp; // function invoked before return the value to caller _hash_before_fn_t callbackFp; // function invoked before return the value to caller
int64_t compTimes; // int64_t compTimes;
}; };
/* /*
@@ -147,7 +147,7 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntr
uint32_t hashVal) { uint32_t hashVal) {
SHashNode *pNode = pe->next; SHashNode *pNode = pe->next;
while (pNode) { while (pNode) {
atomic_add_fetch_64(&pHashObj->compTimes, 1); // atomic_add_fetch_64(&pHashObj->compTimes, 1);
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
pNode->removed == 0) { pNode->removed == 0) {
assert(pNode->hashVal == hashVal); assert(pNode->hashVal == hashVal);
@@ -889,4 +889,4 @@ void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen) {
void taosHashRelease(SHashObj *pHashObj, void *p) { taosHashCancelIterate(pHashObj, p); } void taosHashRelease(SHashObj *pHashObj, void *p) { taosHashCancelIterate(pHashObj, p); }
int64_t taosHashGetCompTimes(SHashObj *pHashObj) { return atomic_load_64(&pHashObj->compTimes); } //int64_t taosHashGetCompTimes(SHashObj *pHashObj) { return atomic_load_64(&pHashObj->compTimes); }