Unverified commit 766b1a3b, authored by Ganlin Zhao, committed by GitHub

Merge pull request #12575 from taosdata/feature/3.0_glzhao

feat(query): add HYPERLOGLOG function
@@ -41,6 +41,7 @@ typedef enum EFunctionType {
  FUNCTION_TYPE_SUM,
  FUNCTION_TYPE_TWA,
  FUNCTION_TYPE_HISTOGRAM,
  FUNCTION_TYPE_HYPERLOGLOG,
  // nonstandard SQL function
  FUNCTION_TYPE_BOTTOM = 500,
...
@@ -40,6 +40,7 @@ typedef void (*_hash_free_fn_t)(void *);
 */
uint32_t MurmurHash3_32(const char *key, uint32_t len);
uint64_t MurmurHash3_64(const char *key, uint32_t len);

/**
 *
 * @param key
...
@@ -90,6 +90,10 @@ bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultIn
int32_t histogramFunction(SqlFunctionCtx* pCtx);
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);

bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t hllFunction(SqlFunctionCtx* pCtx);
int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);

bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t stateCountFunction(SqlFunctionCtx* pCtx);
...
@@ -263,6 +263,21 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l
  return TSDB_CODE_SUCCESS;
}

static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
  if (1 != LIST_LENGTH(pFunc->pParameterList)) {
    return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
  }

  SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
  if (QUERY_NODE_COLUMN != nodeType(pPara)) {
    return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
                           "The input parameter of HYPERLOGLOG function can only be column");
  }

  pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_UBIGINT].bytes, .type = TSDB_DATA_TYPE_UBIGINT};
  return TSDB_CODE_SUCCESS;
}

static int32_t translateStateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
  if (3 != LIST_LENGTH(pFunc->pParameterList)) {
    return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
@@ -829,6 +844,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
    .processFunc = histogramFunction,
    .finalizeFunc = histogramFinalize
  },
  {
    .name = "hyperloglog",
    .type = FUNCTION_TYPE_HYPERLOGLOG,
    .classification = FUNC_MGT_AGG_FUNC,
    .translateFunc = translateHLL,
    .getEnvFunc = getHLLFuncEnv,
    .initFunc = functionSetup,
    .processFunc = hllFunction,
    .finalizeFunc = hllFinalize
  },
  {
    .name = "state_count",
    .type = FUNCTION_TYPE_STATE_COUNT,
...
@@ -28,6 +28,12 @@
#define TAIL_MAX_POINTS_NUM 100
#define TAIL_MAX_OFFSET 100

#define HLL_BUCKET_BITS 14 // The bits of the bucket
#define HLL_DATA_BITS   (64 - HLL_BUCKET_BITS)
#define HLL_BUCKETS     (1 << HLL_BUCKET_BITS)
#define HLL_BUCKET_MASK (HLL_BUCKETS - 1)
#define HLL_ALPHA_INF   0.721347520444481703680 // constant for 0.5/ln(2)

typedef struct SSumRes {
  union {
    int64_t isum;
...
@@ -129,6 +135,11 @@ typedef enum {
  LOG_BIN
} EHistoBinType;

typedef struct SHLLFuncInfo {
  uint64_t result;
  uint8_t  buckets[HLL_BUCKETS];
} SHLLInfo;
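As a side note on sizing (a standalone restatement of the layout above, not part of the patch): with HLL_BUCKET_BITS = 14 there are 2^14 = 16384 one-byte registers, so each SHLLInfo carries roughly 16 KB of register state, and 64 - 14 = 50 bits of the hash remain for rank detection. The checks below only restate these derived values.

#include <stdint.h>

/* Illustrative restatement of the patch's constants and struct (not part of the commit). */
#define HLL_BUCKET_BITS 14
#define HLL_DATA_BITS   (64 - HLL_BUCKET_BITS)
#define HLL_BUCKETS     (1 << HLL_BUCKET_BITS)
#define HLL_BUCKET_MASK (HLL_BUCKETS - 1)

typedef struct SHLLInfo {
  uint64_t result;
  uint8_t  buckets[HLL_BUCKETS];
} SHLLInfo;

_Static_assert(HLL_BUCKETS == 16384, "2^14 one-byte registers");
_Static_assert(HLL_DATA_BITS == 50, "50 hash bits remain for the rank pattern");
_Static_assert(HLL_BUCKET_MASK == 0x3FFF, "low 14 bits of the hash select the register");
_Static_assert(sizeof(SHLLInfo) >= HLL_BUCKETS + 8, "each sketch holds ~16 KB of registers");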
typedef struct SStateInfo {
  union {
    int64_t count;
...
@@ -2729,6 +2740,140 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  return pResInfo->numOfRes;
}

bool getHLLFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SHLLInfo);
  return true;
}
static uint8_t hllCountNum(void* data, int32_t bytes, int32_t *buk) {
  uint64_t hash = MurmurHash3_64(data, bytes);
  int32_t index = hash & HLL_BUCKET_MASK;
  hash >>= HLL_BUCKET_BITS;
  hash |= ((uint64_t)1 << HLL_DATA_BITS);
  uint64_t bit = 1;
  uint8_t count = 1;
  while ((hash & bit) == 0) {
    count++;
    bit <<= 1;
  }
  *buk = index;
  return count;
}
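To make the register update concrete, here is a minimal standalone sketch (not part of the patch; the helper name rank_of and the sample hash are made up) that splits a 64-bit hash the same way hllCountNum does: the low 14 bits pick the register, and one plus the position of the first set bit among the remaining 50 bits is the rank, of which each register keeps the maximum.

#include <stdint.h>
#include <stdio.h>

#define BUCKET_BITS 14                       /* same split as the patch's HLL_BUCKET_BITS */
#define DATA_BITS   (64 - BUCKET_BITS)
#define BUCKET_MASK ((1u << BUCKET_BITS) - 1)

/* Hypothetical helper mirroring hllCountNum, but taking a precomputed hash. */
static uint8_t rank_of(uint64_t hash, int32_t *bucket) {
  *bucket = (int32_t)(hash & BUCKET_MASK);   /* register index: 0 .. 16383 */
  hash >>= BUCKET_BITS;                      /* drop the index bits */
  hash |= (uint64_t)1 << DATA_BITS;          /* sentinel bit caps the rank at DATA_BITS + 1 */
  uint8_t count = 1;
  while ((hash & 1) == 0) {                  /* 1 + number of trailing zero bits */
    count++;
    hash >>= 1;
  }
  return count;
}

int main(void) {
  int32_t bucket = 0;
  uint8_t rank = rank_of(0x0123456789ABCDEFULL, &bucket);
  printf("bucket=%d rank=%u\n", bucket, rank);  /* registers[bucket] keeps the max rank seen */
  return 0;
}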
static void hllBucketHisto(uint8_t *buckets, int32_t* bucketHisto) {
  uint64_t *word = (uint64_t*)buckets;
  uint8_t *bytes;

  for (int32_t j = 0; j < HLL_BUCKETS >> 3; j++) {
    if (*word == 0) {
      bucketHisto[0] += 8;
    } else {
      bytes = (uint8_t*)word;
      bucketHisto[bytes[0]]++;
      bucketHisto[bytes[1]]++;
      bucketHisto[bytes[2]]++;
      bucketHisto[bytes[3]]++;
      bucketHisto[bytes[4]]++;
      bucketHisto[bytes[5]]++;
      bucketHisto[bytes[6]]++;
      bucketHisto[bytes[7]]++;
    }
    word++;
  }
}
static double hllTau(double x) {
  if (x == 0. || x == 1.) return 0.;
  double zPrime;
  double y = 1.0;
  double z = 1 - x;
  do {
    x = sqrt(x);
    zPrime = z;
    y *= 0.5;
    z -= pow(1 - x, 2) * y;
  } while (zPrime != z);
  return z / 3;
}

static double hllSigma(double x) {
  if (x == 1.0) return INFINITY;
  double zPrime;
  double y = 1;
  double z = x;
  do {
    x *= x;
    zPrime = z;
    z += x * y;
    y += y;
  } while (zPrime != z);
  return z;
}
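For reference, hllSigma and hllTau evaluate the two power series used by the estimator in the cited paper; in LaTeX notation (a restatement of the loops above, both of which iterate until z stops changing in double precision):

\sigma(x) = x + \sum_{k=1}^{\infty} x^{2^{k}} \, 2^{k-1}, \qquad \sigma(1) = \infty

\tau(x) = \frac{1}{3}\left(1 - x - \sum_{k=1}^{\infty} \left(1 - x^{2^{-k}}\right)^{2} 2^{-k}\right), \qquad \tau(0) = \tau(1) = 0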
// Estimate the cardinality; the algorithm follows the paper
// "New cardinality estimation algorithms for HyperLogLog sketches".
static uint64_t hllCountCnt(uint8_t *buckets) {
  double m = HLL_BUCKETS;
  int32_t buckethisto[64] = {0};
  hllBucketHisto(buckets, buckethisto);

  double z = m * hllTau((m - buckethisto[HLL_DATA_BITS + 1]) / (double)m);
  for (int j = HLL_DATA_BITS; j >= 1; --j) {
    z += buckethisto[j];
    z *= 0.5;
  }
  z += m * hllSigma(buckethisto[0] / (double)m);
  double E = (double)llroundl(HLL_ALPHA_INF * m * m / z);

  return (uint64_t)E;
}
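Putting the pieces together, hllCountCnt computes the improved raw estimate from that paper (a restatement of the loop above, not an independent claim): with m = 2^14 registers, q = HLL_DATA_BITS = 50, C_k the number of registers whose value is k, and \alpha_\infty = 1/(2\ln 2) \approx 0.7213475,

\hat{n} = \frac{\alpha_\infty \, m^{2}}{\, m\,\sigma\!\left(\frac{C_0}{m}\right) + \sum_{k=1}^{q} C_k \, 2^{-k} + m\,\tau\!\left(\frac{m - C_{q+1}}{m}\right) 2^{-q}}

llroundl then rounds the result to the nearest integer before it is stored as the final count.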
int32_t hllFunction(SqlFunctionCtx *pCtx) {
  SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData* pCol = pInput->pData[0];

  int32_t type = pCol->info.type;
  int32_t bytes = pCol->info.bytes;

  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;

  int32_t numOfElems = 0;
  for (int32_t i = start; i < numOfRows + start; ++i) {
    if (pCol->hasNull && colDataIsNull_s(pCol, i)) {
      continue;
    }

    numOfElems++;

    char* data = colDataGetData(pCol, i);
    if (IS_VAR_DATA_TYPE(type)) {
      bytes = varDataLen(data);
      data = varDataVal(data);
    }

    int32_t index = 0;
    uint8_t count = hllCountNum(data, bytes, &index);
    uint8_t oldcount = pInfo->buckets[index];
    if (count > oldcount) {
      pInfo->buckets[index] = count;
    }
  }

  SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
  return TSDB_CODE_SUCCESS;
}
int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  pInfo->result = hllCountCnt(pInfo->buckets);
  return functionFinalize(pCtx, pBlock);
}

bool getStateFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SStateInfo);
  return true;
...
@@ -30,7 +30,7 @@
  (h) ^= (h) >> 13; \
  (h) *= 0xc2b2ae35; \
  (h) ^= (h) >> 16; } while (0)

uint32_t MurmurHash3_32(const char *key, uint32_t len) {
  const uint8_t *data = (const uint8_t *)key;
  const int32_t nblocks = len >> 2u;
...
@@ -78,18 +78,54 @@ uint32_t MurmurHash3_32(const char *key, uint32_t len) {
  return h1;
}
uint64_t MurmurHash3_64(const char *key, uint32_t len) {
  const uint64_t m = 0x87c37b91114253d5;
  const int r = 47;
  uint32_t seed = 0x12345678;

  uint64_t h = seed ^ (len * m);
  const uint8_t *data = (const uint8_t *)key;
  const uint8_t *end = data + (len - (len & 7));

  while (data != end) {
    uint64_t k = *((uint64_t*)data);

    k *= m;
    k ^= k >> r;
    k *= m;

    h ^= k;
    h *= m;
    data += 8;
  }

  switch (len & 7) {
    case 7: h ^= (uint64_t)data[6] << 48; /* fall-thru */
    case 6: h ^= (uint64_t)data[5] << 40; /* fall-thru */
    case 5: h ^= (uint64_t)data[4] << 32; /* fall-thru */
    case 4: h ^= (uint64_t)data[3] << 24; /* fall-thru */
    case 3: h ^= (uint64_t)data[2] << 16; /* fall-thru */
    case 2: h ^= (uint64_t)data[1] << 8;  /* fall-thru */
    case 1: h ^= (uint64_t)data[0];
            h *= m;                       /* fall-thru */
  };

  h ^= h >> r;
  h *= m;
  h ^= h >> r;

  return h;
}
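A minimal usage sketch for the new hash (illustrative only; it assumes the program is linked against the util library that provides MurmurHash3_64, and the key string is arbitrary):

#include <inttypes.h>
#include <stdio.h>
#include <string.h>

uint64_t MurmurHash3_64(const char *key, uint32_t len);  /* prototype added to thash.h by this patch */

int main(void) {
  const char *key = "device-0001";
  uint64_t h = MurmurHash3_64(key, (uint32_t)strlen(key));
  /* The HLL code keeps the low 14 bits as the register index
   * and uses the remaining bits for rank detection. */
  printf("hash=%016" PRIx64 " bucket=%u\n", h, (unsigned)(h & 0x3FFF));
  return 0;
}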
uint32_t taosIntHash_32(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint32_t *)key; }
uint32_t taosIntHash_16(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint16_t *)key; }
uint32_t taosIntHash_8(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint8_t *)key; }

uint32_t taosFloatHash(const char *key, uint32_t UNUSED_PARAM(len)) {
  float f = GET_FLOAT_VAL(key);
  if (isnan(f)) {
    return 0x7fc00000;
  }

  if (FLT_EQUAL(f, 0.0)) {
    return 0;
  }
  if (fabs(f) < FLT_MAX/BASE - DLT) {
    int32_t t = (int32_t)(round(BASE * (f + DLT)));
    return (uint32_t)t;
@@ -98,27 +134,27 @@ uint32_t taosFloatHash(const char *key, uint32_t UNUSED_PARAM(len)) {
  }
}
uint32_t taosDoubleHash(const char *key, uint32_t UNUSED_PARAM(len)) {
  double f = GET_DOUBLE_VAL(key);
  if (isnan(f)) {
    return 0x7fc00000;
  }

  if (FLT_EQUAL(f, 0.0)) {
    return 0;
  }
  if (fabs(f) < DBL_MAX/BASE - DLT) {
    int32_t t = (int32_t)(round(BASE * (f + DLT)));
    return (uint32_t)t;
  } else {
    return 0x7fc00000;
  }
}

uint32_t taosIntHash_64(const char *key, uint32_t UNUSED_PARAM(len)) {
  uint64_t val = *(uint64_t *)key;
  uint64_t hash = val >> 16U;
  hash += (val & 0xFFFFU);
  return (uint32_t)hash;
}
@@ -127,39 +163,39 @@ _hash_fn_t taosGetDefaultHashFunction(int32_t type) {
  switch(type) {
    case TSDB_DATA_TYPE_TIMESTAMP:
    case TSDB_DATA_TYPE_UBIGINT:
    case TSDB_DATA_TYPE_BIGINT:
      fn = taosIntHash_64;
      break;
    case TSDB_DATA_TYPE_BINARY:
      fn = MurmurHash3_32;
      break;
    case TSDB_DATA_TYPE_NCHAR:
      fn = MurmurHash3_32;
      break;
    case TSDB_DATA_TYPE_UINT:
    case TSDB_DATA_TYPE_INT:
      fn = taosIntHash_32;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
    case TSDB_DATA_TYPE_USMALLINT:
      fn = taosIntHash_16;
      break;
    case TSDB_DATA_TYPE_BOOL:
    case TSDB_DATA_TYPE_UTINYINT:
    case TSDB_DATA_TYPE_TINYINT:
      fn = taosIntHash_8;
      break;
    case TSDB_DATA_TYPE_FLOAT:
      fn = taosFloatHash;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      fn = taosDoubleHash;
      break;
    default:
      fn = taosIntHash_32;
      break;
  }
  return fn;
}
...