提交 9c59bca4 编写于 作者: wmmhello's avatar wmmhello

add hyperloglog function

上级 7503ded0
......@@ -2819,7 +2819,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_ELAPSED:
case TSDB_FUNC_MODE:
case TSDB_FUNC_STATE_COUNT:
case TSDB_FUNC_STATE_DURATION:{
case TSDB_FUNC_STATE_DURATION:
case TSDB_FUNC_HYPERLOGLOG:{
// 1. valid the number of parameters
int32_t numOfParams =
(pItem->pNode->Expr.paramList == NULL) ? 0 : (int32_t)taosArrayGetSize(pItem->pNode->Expr.paramList);
......@@ -2890,7 +2891,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (functionId == TSDB_FUNC_MODE && pColumnSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
pColumnSchema->type == TSDB_DATA_TYPE_TIMESTAMP){
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29);
} else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED) && (functionId != TSDB_FUNC_MODE)) {
} else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED) &&
(functionId != TSDB_FUNC_MODE) && (functionId != TSDB_FUNC_HYPERLOGLOG)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) &&
(functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
......
......@@ -86,8 +86,9 @@ extern "C" {
#define TSDB_FUNC_WSTART 44
#define TSDB_FUNC_WSTOP 45
#define TSDB_FUNC_WDURATION 46
#define TSDB_FUNC_HYPERLOGLOG 47
#define TSDB_FUNC_MAX_NUM 47
#define TSDB_FUNC_MAX_NUM 48
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......
......@@ -29,6 +29,7 @@
#include "queryLog.h"
#include "qUdf.h"
#include "tcompare.h"
#include "hashfunc.h"
#include <string.h>
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
......@@ -256,11 +257,160 @@ typedef struct {
char data[];
} TailUnit;
typedef struct STailInfo {
typedef struct {
int32_t num;
TailUnit **res;
} STailInfo;
/*
 * Pick the buffer a function should accumulate its result into.
 *
 * A super-table query that is not in the merge stage gets the final output
 * buffer (pCtx->pOutput); every other case gets the intermediate buffer that
 * belongs to the context's result cell.
 */
static void *getOutputInfo(SQLFunctionCtx *pCtx) {
  SResultRowCellInfo *pCell = GET_RES_INFO(pCtx);

  if (!pCtx->stableQuery || pCtx->currentStage == MERGE_STAGE) {
    // normal table query, or super table at the secondary stage: use the intermediate buffer
    return GET_ROWCELL_INTERBUF(pCell);
  }

  // only the first-stage merge writes directly into the final output buffer
  return pCtx->pOutput;
}
/* hyperloglog start */
#define HLL_BUCKET_BITS 14 // number of low hash bits that select a bucket
#define HLL_DATA_BITS (64-HLL_BUCKET_BITS) // hash bits left over once the bucket index is removed
#define HLL_BUCKETS (1<<HLL_BUCKET_BITS) // total number of buckets (one byte each)
#define HLL_BUCKET_MASK (HLL_BUCKETS-1) // mask extracting the bucket index from a hash
#define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2)

/* Sketch state: one 8-bit register per bucket. */
typedef struct {
  uint8_t buckets[HLL_BUCKETS]; // Data bytes.
} SHLLInfo;

/*
 * Add the distribution of register values to a histogram.
 *
 * For every one of the HLL_BUCKETS registers, bucketHisto[value] is
 * incremented. Counts are added to whatever bucketHisto already holds, so the
 * caller is responsible for zeroing it first, and it must be large enough to
 * be indexed by every register value present.
 *
 * Registers are scanned eight at a time so that runs of empty registers can be
 * accounted for with a single comparison. The 8-byte group is loaded with
 * memcpy rather than through a uint64_t pointer: `buckets` is a byte array
 * with no alignment guarantee, and dereferencing a cast pointer would be
 * undefined behaviour on a misaligned address.
 *
 * @param buckets      register array of HLL_BUCKETS bytes
 * @param bucketHisto  histogram to accumulate into
 */
static void hllBucketHisto(uint8_t *buckets, int* bucketHisto) {
  for (int j = 0; j < HLL_BUCKETS; j += 8) {
    const uint8_t *group = buckets + j;
    uint64_t word;
    memcpy(&word, group, sizeof(word));

    if (word == 0) {
      // all eight registers in this group are empty
      bucketHisto[0] += 8;
      continue;
    }

    for (int k = 0; k < 8; k++) {
      bucketHisto[group[k]]++;
    }
  }
}
static double hllTau(double x) {
if (x == 0. || x == 1.) return 0.;
double zPrime;
double y = 1.0;
double z = 1 - x;
do {
x = sqrt(x);
zPrime = z;
y *= 0.5;
z -= pow(1 - x, 2)*y;
} while(zPrime != z);
return z / 3;
}
/*
 * Helper series used by the cardinality estimate.
 *
 * Returns INFINITY for x == 1. Otherwise x is squared repeatedly and each
 * square, weighted by a factor that doubles every round, is added to an
 * accumulator that starts at x. Iteration stops when a round no longer
 * changes the accumulator.
 */
static double hllSigma(double x) {
  if (x == 1.0) {
    return INFINITY;
  }

  double weight = 1.0;
  double acc = x;

  for (;;) {
    const double before = acc;

    x *= x;
    acc += x * weight;
    weight += weight;

    if (acc == before) {
      break;  // converged: the last term was below double precision
    }
  }

  return acc;
}
/*
 * Turn a register array into a cardinality estimate.
 *
 * Builds the histogram of register values, folds it from the highest value
 * down to 1 (adding the count and halving at each step), seeds and finishes
 * the sum with the hllTau / hllSigma terms for the saturated and the empty
 * registers, and returns HLL_ALPHA_INF * m^2 / z rounded to the nearest
 * integer, where m is the number of buckets.
 *
 * @param buckets  register array of HLL_BUCKETS bytes
 * @return estimated number of distinct values
 */
static uint64_t hllCountCnt(uint8_t *buckets) {
  int histo[64] = {0};
  hllBucketHisto(buckets, histo);

  double m = HLL_BUCKETS;

  // contribution of registers holding the largest possible value
  double z = m * hllTau((m - histo[HLL_DATA_BITS + 1]) / (double)m);

  int level = HLL_DATA_BITS;
  while (level >= 1) {
    z += histo[level];
    z *= 0.5;
    --level;
  }

  // contribution of registers that were never touched
  z += m * hllSigma(histo[0] / (double)m);

  double estimate = llroundl(HLL_ALPHA_INF * m * m / z);
  return (uint64_t)estimate;
}
/*
 * Hash one element and derive its bucket index and rank.
 *
 * The low HLL_BUCKET_BITS bits of the 64-bit hash select the bucket. The
 * remaining bits are scanned from the least significant end; the rank is the
 * 1-based position of the first set bit. A guard bit is set at position
 * HLL_DATA_BITS so the scan always terminates.
 *
 * @param ele      pointer to the element bytes
 * @param elesize  number of bytes to hash
 * @param buk      out: bucket index for this element
 * @return rank of the element, starting at 1
 */
static int hllCountNum(void *ele, int32_t elesize, int32_t *buk) {
  uint64_t digest = MurmurHash3_64(ele, elesize);
  uint64_t slot = digest & HLL_BUCKET_MASK;

  digest >>= HLL_BUCKET_BITS;
  digest |= ((uint64_t)1 << HLL_DATA_BITS);  // guard bit bounds the scan below

  int rank = 1;
  for (uint64_t probe = 1; (digest & probe) == 0; probe <<= 1) {
    rank++;
  }

  *buk = (int32_t)slot;
  return rank;
}
/*
 * Fold the rows of the current input block into the sketch.
 *
 * NULL rows are skipped. For variable-length types only the payload
 * (varDataVal / varDataLen) is hashed; for all other types the full
 * inputBytes of the value are hashed. A bucket is only ever raised, never
 * lowered. The result count of the context is set to 1.
 */
static void hll_function(SQLFunctionCtx *pCtx) {
  SHLLInfo *pSketch = getOutputInfo(pCtx);

  for (int32_t row = 0; row < pCtx->size; ++row) {
    char *pValue = GET_INPUT_DATA(pCtx, row);
    if (isNull(pValue, pCtx->inputType)) {
      continue;
    }

    int32_t nBytes = pCtx->inputBytes;
    if (IS_VAR_DATA_TYPE(pCtx->inputType)) {
      // length must be read before the pointer is advanced past the header
      nBytes = varDataLen(pValue);
      pValue = varDataVal(pValue);
    }

    int32_t slot;
    uint8_t rank = hllCountNum(pValue, nBytes, &slot);
    uint8_t current = pSketch->buckets[slot];
    if (rank > current) {
      pSketch->buckets[slot] = rank;
    }
  }

  GET_RES_INFO(pCtx)->numOfRes = 1;
}
/*
 * Merge an incoming sketch into the one kept in the intermediate buffer.
 *
 * The input data is interpreted as a single SHLLInfo; each bucket of the
 * intermediate sketch is raised to the incoming value when that is larger.
 */
static void hll_func_merge(SQLFunctionCtx *pCtx) {
  SResultRowCellInfo *pCell = GET_RES_INFO(pCtx);
  SHLLInfo *pDst = (SHLLInfo *)GET_ROWCELL_INTERBUF(pCell);
  SHLLInfo *pSrc = (SHLLInfo *)GET_INPUT_DATA_LIST(pCtx);

  for (int slot = 0; slot < HLL_BUCKETS; slot++) {
    if (pSrc->buckets[slot] > pDst->buckets[slot]) {
      pDst->buckets[slot] = pSrc->buckets[slot];
    }
  }
}
/*
 * Produce the final result: read the sketch from the intermediate buffer,
 * mark one result row, store the estimated cardinality as a uint64_t in the
 * output buffer, then run the common finalizer.
 */
static void hll_func_finalizer(SQLFunctionCtx *pCtx) {
  SHLLInfo *pSketch = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  GET_RES_INFO(pCtx)->numOfRes = 1;

  uint64_t *pResult = (uint64_t *)(pCtx->pOutput);
  *pResult = hllCountCnt(pSketch->buckets);

  doFinalizer(pCtx);
}
/* hyperloglog end */
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
if (!isValidDataType(dataType)) {
......@@ -334,6 +484,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_HYPERLOGLOG) {
*type = TSDB_DATA_TYPE_UBIGINT;
*bytes = sizeof(uint64_t);
*interBytes = sizeof(SHLLInfo);
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_CSUM) {
if (IS_SIGNED_NUMERIC_TYPE(dataType)) {
*type = TSDB_DATA_TYPE_BIGINT;
......@@ -2403,18 +2560,6 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
tfree(pData);
}
/*
 * Return the buffer that receives a function's working result: the final
 * output buffer for a super-table query outside the merge stage, otherwise
 * the intermediate buffer of the context's result cell.
 */
static void *getOutputInfo(SQLFunctionCtx *pCtx) {
  SResultRowCellInfo *pCell = GET_RES_INFO(pCtx);

  // only the first-stage merge writes directly into the final output buffer
  if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
    return pCtx->pOutput;
  }

  // normal table query, or super table at the secondary stage
  return GET_ROWCELL_INTERBUF(pCell);
}
/*
* keep the intermediate results during scan data blocks in the format of:
* +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+
......@@ -5815,6 +5960,7 @@ static void wduration_function(SQLFunctionCtx *pCtx) {
}
*(int64_t *)(pCtx->pOutput) = duration;
}
/////////////////////////////////////////////////////////////////////////////////////////////
/*
* function compatible list.
......@@ -5835,8 +5981,8 @@ int32_t functionCompatList[] = {
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample, block_info, elapsed, histogram, unique, mode, tail
6, 8, -1, -1, -1, 7, 1, -1, -1, 1, -1,
// stateCount, stateDuration, wstart, wstop, wduration,
1, 1, 1, 1, 1,
// stateCount, stateDuration, wstart, wstop, wduration, hyperloglog
1, 1, 1, 1, 1, 1
};
SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
......@@ -6405,5 +6551,17 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 47
"hyperloglog",
TSDB_FUNC_HYPERLOGLOG,
TSDB_FUNC_HYPERLOGLOG,
TSDB_BASE_FUNC_SO,
function_setup,
hll_function,
hll_func_finalizer,
hll_func_merge,
dataBlockRequired,
}
};
......@@ -33,7 +33,8 @@ typedef void (*_hash_free_fn_t)(void *param);
*/
uint32_t MurmurHash3_32(const char *key, uint32_t len);
/**
uint64_t MurmurHash3_64(const void *key, uint32_t len);
/**
*
* @param key
* @param len
......
......@@ -78,6 +78,42 @@ uint32_t MurmurHash3_32(const char *key, uint32_t len) {
return h1;
}
/**
 * Compute a 64-bit hash of an arbitrary byte sequence using a fixed seed.
 *
 * The input is consumed in 8-byte blocks followed by a tail of up to 7 bytes.
 * Both the blocks and the tail are interpreted in little-endian byte order,
 * so the result depends only on the bytes, not on the byte order of the host
 * or on the alignment of `key`. Each block is assembled byte by byte instead
 * of being read through a cast uint64_t pointer, which would be undefined
 * behaviour for a misaligned `key`.
 *
 * @param key  pointer to the bytes to hash
 * @param len  number of bytes to hash
 * @return 64-bit hash value
 */
uint64_t MurmurHash3_64(const void *key, uint32_t len) {
  const uint64_t m = 0x87c37b91114253d5;
  const int r = 47;
  const uint32_t seed = 0x12345678;

  uint64_t h = seed ^ (len * m);

  const uint8_t *data = (const uint8_t *)key;
  const uint8_t *end = data + (len - (len & 7));

  while (data != end) {
    // little-endian load of the next 8 bytes, safe for any alignment
    uint64_t k = 0;
    for (int i = 7; i >= 0; --i) {
      k = (k << 8) | data[i];
    }

    k *= m;
    k ^= k >> r;
    k *= m;
    h ^= k;
    h *= m;
    data += 8;
  }

  // fold in the remaining 1..7 bytes; nothing to do when len is a multiple of 8
  switch (len & 7) {
    case 7: h ^= (uint64_t)data[6] << 48; /* fall-thru */
    case 6: h ^= (uint64_t)data[5] << 40; /* fall-thru */
    case 5: h ^= (uint64_t)data[4] << 32; /* fall-thru */
    case 4: h ^= (uint64_t)data[3] << 24; /* fall-thru */
    case 3: h ^= (uint64_t)data[2] << 16; /* fall-thru */
    case 2: h ^= (uint64_t)data[1] << 8;  /* fall-thru */
    case 1: h ^= (uint64_t)data[0];
            h *= m;
            break;
    default:
            break;
  }

  // final avalanche
  h ^= h >> r;
  h *= m;
  h ^= h >> r;
  return h;
}
/* Identity hash: the first 4 bytes of key, read as a uint32_t. The length is ignored. */
uint32_t taosIntHash_32(const char *key, uint32_t UNUSED_PARAM(len)) {
  const uint32_t *pValue = (const uint32_t *)key;
  return *pValue;
}

/* Identity hash: the first 2 bytes of key, read as a uint16_t and widened. The length is ignored. */
uint32_t taosIntHash_16(const char *key, uint32_t UNUSED_PARAM(len)) {
  const uint16_t *pValue = (const uint16_t *)key;
  return *pValue;
}

/* Identity hash: the first byte of key, read as a uint8_t and widened. The length is ignored. */
uint32_t taosIntHash_8(const char *key, uint32_t UNUSED_PARAM(len)) {
  const uint8_t *pValue = (const uint8_t *)key;
  return *pValue;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册