From 9dcf9248d7ce534ccb95aacf020b618e00ae68ff Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 16 May 2022 22:43:06 +0800 Subject: [PATCH] feat(query): add HYPERLOGLOG function --- source/libs/function/src/builtinsimpl.c | 147 +++++++++++++++++++++++- 1 file changed, 146 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 479a11ca4a..556015a2ac 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -26,7 +26,13 @@ #define MAVG_MAX_POINTS_NUM 1000 #define SAMPLE_MAX_POINTS_NUM 1000 #define TAIL_MAX_POINTS_NUM 100 -#define TAIL_MAX_OFFSET 100 +#define TAIL_MAX_OFFSET 10 + +#define HLL_BUCKET_BITS 14 // The bits of the bucket +#define HLL_DATA_BITS (64-HLL_BUCKET_BITS) +#define HLL_BUCKETS (1<numOfRes; } +bool getHLLFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SHLLInfo); + return true; +} + +static uint8_t hllCountNum(void* data, int32_t bytes, int32_t *buk) { + uint64_t hash = MurmurHash3_64(data, bytes); + int32_t index = hash & HLL_BUCKET_MASK; + hash >>= HLL_BUCKET_BITS; + hash |= ((uint64_t)1 << HLL_DATA_BITS); + uint64_t bit = 1; + uint8_t count = 1; + while((hash & bit) == 0) { + count++; + bit <<= 1; + } + *buk = index; + return count; +} + +static void hllBucketHisto(uint8_t *buckets, int32_t* bucketHisto) { + uint64_t *word = (uint64_t*) buckets; + uint8_t *bytes; + + for (int32_t j = 0; j < HLL_BUCKETS>>3; j++) { + if (*word == 0) { + bucketHisto[0] += 8; + } else { + bytes = (uint8_t*) word; + bucketHisto[bytes[0]]++; + bucketHisto[bytes[1]]++; + bucketHisto[bytes[2]]++; + bucketHisto[bytes[3]]++; + bucketHisto[bytes[4]]++; + bucketHisto[bytes[5]]++; + bucketHisto[bytes[6]]++; + bucketHisto[bytes[7]]++; + } + word++; + } +} +static double hllTau(double x) { + if (x == 0. || x == 1.) return 0.; + double zPrime; + double y = 1.0; + double z = 1 - x; + do { + x = sqrt(x); + zPrime = z; + y *= 0.5; + z -= pow(1 - x, 2)*y; + } while(zPrime != z); + return z / 3; +} + +static double hllSigma(double x) { + if (x == 1.0) return INFINITY; + double zPrime; + double y = 1; + double z = x; + do { + x *= x; + zPrime = z; + z += x * y; + y += y; + } while(zPrime != z); + return z; +} + +// estimate the cardinality, the algorithm refer this paper: "New cardinality estimation algorithms for HyperLogLog sketches" +static uint64_t hllCountCnt(uint8_t *buckets) { + double m = HLL_BUCKETS; + int32_t buckethisto[64] = {0}; + hllBucketHisto(buckets,buckethisto); + + double z = m * hllTau((m-buckethisto[HLL_DATA_BITS+1])/(double)m); + for (int j = HLL_DATA_BITS; j >= 1; --j) { + z += buckethisto[j]; + z *= 0.5; + } + z += m * hllSigma(buckethisto[0]/(double)m); + double E = (double)llroundl(HLL_ALPHA_INF*m*m/z); + + return (uint64_t) E; +} + + +int32_t hllFunction(SqlFunctionCtx *pCtx) { + SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t type = pCol->info.type; + int32_t bytes = pCol->info.bytes; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + int32_t numOfElems = 0; + for (int32_t i = start; i < numOfRows + start; ++i) { + if (pCol->hasNull && colDataIsNull_s(pCol, i)) { + continue; + } + + numOfElems++; + + char* data = colDataGetData(pCol, i); + if (IS_VAR_DATA_TYPE(type)) { + data = varDataVal(data); + bytes -= VARSTR_HEADER_SIZE; + } + + int32_t index = 0; + uint8_t count = hllCountNum(data, bytes, &index); + uint8_t oldcount = pInfo->buckets[index]; + if (count > oldcount) { + pInfo->buckets[index] = count; + } + + } + + SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); + return TSDB_CODE_SUCCESS; +} + +int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + pInfo->result = hllCountCnt(pInfo->buckets); + + return functionFinalize(pCtx, pBlock); +} + bool getStateFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SStateInfo); return true; -- GitLab