NOTE(review): reflowed from a whitespace-collapsed copy of this patch. Line breaks,
indentation and blank context lines are a best-effort reconstruction.
In src/query/src/qAggMain.c, hunk "@@ -256,11 +257,160 @@", two spans were already
missing when this text was reviewed: one after "#define HLL_BUCKETS (1<" and one after
"hash |= ((uint64_t)1<". Those two lines are kept exactly as received, so that hunk
holds fewer lines than its header states and cannot be applied as-is.
The src/util/inc/hashfunc.h and src/util/src/thashutil.c hunks were edited in review;
their hunk line counts are updated, their "index" blob ids are not.

diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c
index 1a3817d7a02523a3d8316dc54bfffbf07c1abe77..26903ed428443ec6edb13726e85231d31a0bb9f2 100644
--- a/src/client/src/tscSQLParser.c
+++ b/src/client/src/tscSQLParser.c
@@ -2819,7 +2819,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
     case TSDB_FUNC_ELAPSED:
     case TSDB_FUNC_MODE:
     case TSDB_FUNC_STATE_COUNT:
-    case TSDB_FUNC_STATE_DURATION:{
+    case TSDB_FUNC_STATE_DURATION:
+    case TSDB_FUNC_HYPERLOGLOG:{
       // 1. valid the number of parameters
       int32_t numOfParams = (pItem->pNode->Expr.paramList == NULL) ? 0 : (int32_t)taosArrayGetSize(pItem->pNode->Expr.paramList);
 
@@ -2890,7 +2891,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
       if (functionId == TSDB_FUNC_MODE && pColumnSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
           pColumnSchema->type == TSDB_DATA_TYPE_TIMESTAMP){
         return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29);
-      } else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED) && (functionId != TSDB_FUNC_MODE)) {
+      } else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED) &&
+                 (functionId != TSDB_FUNC_MODE) && (functionId != TSDB_FUNC_HYPERLOGLOG)) {
         return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
       } else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && (functionId == TSDB_FUNC_DIFF ||
                  functionId == TSDB_FUNC_DERIVATIVE)) {
diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h
index 0b87c546d570e8bf4dfc723ae9dc380442009280..1c07505b09b4bb1872c39414afd677456657d7c7 100644
--- a/src/query/inc/qAggMain.h
+++ b/src/query/inc/qAggMain.h
@@ -86,8 +86,9 @@ extern "C" {
 #define TSDB_FUNC_WSTART       44
 #define TSDB_FUNC_WSTOP        45
 #define TSDB_FUNC_WDURATION    46
+#define TSDB_FUNC_HYPERLOGLOG  47
 
-#define TSDB_FUNC_MAX_NUM      47
+#define TSDB_FUNC_MAX_NUM      48
 
 #define TSDB_FUNCSTATE_SO           0x1u    // single output
 #define TSDB_FUNCSTATE_MO           0x2u    // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c
index bc403b8b51e10de2081bb9b280c1be2a0a95f896..821ca88d1ad06250ac5c2443e031252a735d4023 100644
--- a/src/query/src/qAggMain.c
+++ b/src/query/src/qAggMain.c
@@ -29,6 +29,7 @@
 #include "queryLog.h"
 #include "qUdf.h"
 #include "tcompare.h"
+#include "hashfunc.h"
 
 #define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
 #define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
@@ -256,11 +257,160 @@ typedef struct {
   char data[];
 } TailUnit;
 
-typedef struct STailInfo {
+typedef struct {
   int32_t num;
   TailUnit **res;
 } STailInfo;
 
+static void *getOutputInfo(SQLFunctionCtx *pCtx) {
+  SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
+
+  // only the first_stage_merge is directly written data into final output buffer
+  if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
+    return pCtx->pOutput;
+  } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
+    return GET_ROWCELL_INTERBUF(pResInfo);
+  }
+}
+
+/* hyperloglog start */
+#define HLL_BUCKET_BITS 14 // The bits of the bucket
+#define HLL_DATA_BITS (64-HLL_BUCKET_BITS)
+#define HLL_BUCKETS (1<= 1; --j) {
+    z += buckethisto[j];
+    z *= 0.5;
+  }
+  z += m * hllSigma(buckethisto[0]/(double)m);
+  double E = llroundl(HLL_ALPHA_INF*m*m/z);
+
+  return (uint64_t) E;
+}
+
+static int hllCountNum(void *ele, int32_t elesize, int32_t *buk) {
+  uint64_t hash, bit, index;
+  int count;
+
+  hash = MurmurHash3_64(ele,elesize);
+  index = hash & HLL_BUCKET_MASK;
+  hash >>= HLL_BUCKET_BITS;
+  hash |= ((uint64_t)1<size; ++i) {
+    char *val = GET_INPUT_DATA(pCtx, i);
+    if (isNull(val, pCtx->inputType)) {
+      continue;
+    }
+    int32_t elesize = pCtx->inputBytes;
+    if(IS_VAR_DATA_TYPE(pCtx->inputType)) {
+      elesize = varDataLen(val);
+      val = varDataVal(val);
+    }
+    int32_t index;
+    uint8_t count = hllCountNum(val,elesize,&index);
+    uint8_t oldcount = pHLLInfo->buckets[index];
+    if (count > oldcount) {
+      pHLLInfo->buckets[index] = count;
+    }
+  }
+  GET_RES_INFO(pCtx)->numOfRes = 1;
+}
+
+static void hll_func_merge(SQLFunctionCtx *pCtx) {
+  SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
+  SHLLInfo *pHLLInfo = (SHLLInfo *)GET_ROWCELL_INTERBUF(pResInfo);
+
+  SHLLInfo *pData = (SHLLInfo *)GET_INPUT_DATA_LIST(pCtx);
+  for (int i = 0; i < HLL_BUCKETS; i++) {
+    if (pData->buckets[i] > pHLLInfo->buckets[i]) {
+      pHLLInfo->buckets[i] = pData->buckets[i];
+    }
+  }
+}
+
+static void hll_func_finalizer(SQLFunctionCtx *pCtx) {
+  SHLLInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
+
+  GET_RES_INFO(pCtx)->numOfRes = 1;
+  *(uint64_t *)(pCtx->pOutput) = hllCountCnt(pInfo->buckets);
+  doFinalizer(pCtx);
+}
+/* hyperloglog end */
+
 int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param,
                           int16_t *type, int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
   if (!isValidDataType(dataType)) {
@@ -334,6 +484,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
     return TSDB_CODE_SUCCESS;
   }
 
+  if (functionId == TSDB_FUNC_HYPERLOGLOG) {
+    *type = TSDB_DATA_TYPE_UBIGINT;
+    *bytes = sizeof(uint64_t);
+    *interBytes = sizeof(SHLLInfo);
+    return TSDB_CODE_SUCCESS;
+  }
+
   if (functionId == TSDB_FUNC_CSUM) {
     if (IS_SIGNED_NUMERIC_TYPE(dataType)) {
       *type = TSDB_DATA_TYPE_BIGINT;
@@ -2403,18 +2560,6 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
   tfree(pData);
 }
 
-static void *getOutputInfo(SQLFunctionCtx *pCtx) {
-  SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
-
-  // only the first_stage_merge is directly written data into final output buffer
-  if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
-    return pCtx->pOutput;
-  } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
-    return GET_ROWCELL_INTERBUF(pResInfo);
-  }
-}
-
-
 /*
  * keep the intermediate results during scan data blocks in the format of:
  * +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+
@@ -5815,6 +5960,7 @@ static void wduration_function(SQLFunctionCtx *pCtx) {
   }
   *(int64_t *)(pCtx->pOutput) = duration;
 }
+
 /////////////////////////////////////////////////////////////////////////////////////////////
 /*
  * function compatible list.
@@ -5835,8 +5981,8 @@ int32_t functionCompatList[] = {
     1,          1,        1,         1,       -1,      1,          1,        1,          5,      1,    1,
     // tid_tag, deriv,    csum,      mavg,    sample,  block_info, elapsed,  histogram,  unique, mode, tail
     6,          8,        -1,        -1,      -1,      7,          1,        -1,         -1,     1,    -1,
-    // stateCount, stateDuration, wstart, wstop, wduration,
-    1,             1,             1,      1,     1,
+    // stateCount, stateDuration, wstart, wstop, wduration, hyperloglog
+    1,             1,             1,      1,     1,         1
 };
 
 SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
@@ -6405,5 +6551,17 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
         doFinalizer,
         copy_function,
         dataBlockRequired,
+    },
+    {
+        // 47
+        "hyperloglog",
+        TSDB_FUNC_HYPERLOGLOG,
+        TSDB_FUNC_HYPERLOGLOG,
+        TSDB_BASE_FUNC_SO,
+        function_setup,
+        hll_function,
+        hll_func_finalizer,
+        hll_func_merge,
+        dataBlockRequired,
     }
 };
diff --git a/src/util/inc/hashfunc.h b/src/util/inc/hashfunc.h
index 529188849846088106f6576242ef76e2894ccb1b..55835a60c92dc641cdef0f9bb75aa631aac4fddd 100644
--- a/src/util/inc/hashfunc.h
+++ b/src/util/inc/hashfunc.h
@@ -33,7 +33,15 @@ typedef void (*_hash_free_fn_t)(void *param);
  */
 uint32_t MurmurHash3_32(const char *key, uint32_t len);
 
+/**
+ * 64-bit hash of a byte buffer.
+ * @param key  start of the bytes to hash
+ * @param len  number of bytes to read from key
+ * @return     the 64-bit hash value
+ */
+uint64_t MurmurHash3_64(const void *key, uint32_t len);
+
 /**
  *
  * @param key
  * @param len
diff --git a/src/util/src/thashutil.c b/src/util/src/thashutil.c
index 4a0208a3d0bf22f21b5f6a05513f435664e746af..545a91daf5f6ebd3721f6d005750f9cf235939cf 100644
--- a/src/util/src/thashutil.c
+++ b/src/util/src/thashutil.c
@@ -78,6 +78,54 @@ uint32_t MurmurHash3_32(const char *key, uint32_t len) {
   return h1;
 }
 
+/*
+ * 64-bit hash of the len bytes starting at key, using a fixed seed (0x12345678).
+ * Whole 8-byte words are mixed in first; the remaining 1..7 bytes are folded in
+ * by the switch, which falls through from the highest remaining byte down to byte 0.
+ */
+uint64_t MurmurHash3_64(const void *key, uint32_t len) {
+  const uint64_t m = 0x87c37b91114253d5;
+  const int r = 47;
+  uint32_t seed = 0x12345678;
+  uint64_t h = seed ^ (len * m);
+  const uint8_t *data = (const uint8_t *)key;
+  const uint8_t *end = data + (len-(len&7));
+
+  while(data != end) {
+    // Build the word one byte at a time, least significant byte first (the same
+    // order the tail switch uses). The previous *((uint64_t*)data) load dropped
+    // const and assumed 8-byte alignment that a const void *key does not
+    // guarantee; this form also no longer depends on host byte order.
+    uint64_t k = 0;
+    for (int i = 0; i < 8; ++i) {
+      k |= (uint64_t)data[i] << (8 * i);
+    }
+
+    k *= m;
+    k ^= k >> r;
+    k *= m;
+    h ^= k;
+    h *= m;
+    data += 8;
+  }
+
+  switch(len & 7) {
+    case 7: h ^= (uint64_t)data[6] << 48; /* fall-thru */
+    case 6: h ^= (uint64_t)data[5] << 40; /* fall-thru */
+    case 5: h ^= (uint64_t)data[4] << 32; /* fall-thru */
+    case 4: h ^= (uint64_t)data[3] << 24; /* fall-thru */
+    case 3: h ^= (uint64_t)data[2] << 16; /* fall-thru */
+    case 2: h ^= (uint64_t)data[1] << 8;  /* fall-thru */
+    case 1: h ^= (uint64_t)data[0];
+      h *= m;
+  }
+
+  h ^= h >> r;
+  h *= m;
+  h ^= h >> r;
+  return h;
+}
+
 uint32_t taosIntHash_32(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint32_t *)key; }
 uint32_t taosIntHash_16(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint16_t *)key; }
 uint32_t taosIntHash_8(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint8_t *)key; }