未验证 提交 60a8ad5f 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #10743 from taosdata/feature/TD-13893

Feature/td 13893 hyperloglog function
......@@ -2819,7 +2819,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_ELAPSED:
case TSDB_FUNC_MODE:
case TSDB_FUNC_STATE_COUNT:
case TSDB_FUNC_STATE_DURATION:{
case TSDB_FUNC_STATE_DURATION:
case TSDB_FUNC_HYPERLOGLOG:{
// 1. valid the number of parameters
int32_t numOfParams =
(pItem->pNode->Expr.paramList == NULL) ? 0 : (int32_t)taosArrayGetSize(pItem->pNode->Expr.paramList);
......@@ -2890,7 +2891,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (functionId == TSDB_FUNC_MODE && pColumnSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
pColumnSchema->type == TSDB_DATA_TYPE_TIMESTAMP){
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29);
} else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED) && (functionId != TSDB_FUNC_MODE)) {
} else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED) &&
(functionId != TSDB_FUNC_MODE) && (functionId != TSDB_FUNC_HYPERLOGLOG)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) &&
(functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
......@@ -4152,7 +4154,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
(functionId == TSDB_FUNC_HISTOGRAM) ||
(functionId == TSDB_FUNC_UNIQUE) ||
(functionId == TSDB_FUNC_MODE) ||
(functionId == TSDB_FUNC_TAIL)) {
(functionId == TSDB_FUNC_TAIL) ||
(functionId == TSDB_FUNC_HYPERLOGLOG)) {
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes,
&interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......
......@@ -86,8 +86,9 @@ extern "C" {
#define TSDB_FUNC_WSTART 44
#define TSDB_FUNC_WSTOP 45
#define TSDB_FUNC_WDURATION 46
#define TSDB_FUNC_HYPERLOGLOG 47
#define TSDB_FUNC_MAX_NUM 47
#define TSDB_FUNC_MAX_NUM 48
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......
......@@ -29,6 +29,7 @@
#include "queryLog.h"
#include "qUdf.h"
#include "tcompare.h"
#include "hashfunc.h"
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
......@@ -256,11 +257,157 @@ typedef struct {
char data[];
} TailUnit;
typedef struct STailInfo {
typedef struct {
int32_t num;
TailUnit **res;
} STailInfo;
static void *getOutputInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
// only the first_stage_merge is directly written data into final output buffer
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
return pCtx->pOutput;
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return GET_ROWCELL_INTERBUF(pResInfo);
}
}
/* hyperloglog start */
#define HLL_BUCKET_BITS 14 // The bits of the bucket
#define HLL_DATA_BITS (64-HLL_BUCKET_BITS)
#define HLL_BUCKETS (1<<HLL_BUCKET_BITS)
#define HLL_BUCKET_MASK (HLL_BUCKETS-1)
#define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2)
typedef struct {
uint8_t buckets[HLL_BUCKETS]; // Data bytes.
} SHLLInfo;
static void hllBucketHisto(uint8_t *buckets, int32_t* bucketHisto) {
uint64_t *word = (uint64_t*) buckets;
uint8_t *bytes;
for (int32_t j = 0; j < HLL_BUCKETS>>3; j++) {
if (*word == 0) {
bucketHisto[0] += 8;
} else {
bytes = (uint8_t*) word;
bucketHisto[bytes[0]]++;
bucketHisto[bytes[1]]++;
bucketHisto[bytes[2]]++;
bucketHisto[bytes[3]]++;
bucketHisto[bytes[4]]++;
bucketHisto[bytes[5]]++;
bucketHisto[bytes[6]]++;
bucketHisto[bytes[7]]++;
}
word++;
}
}
static double hllTau(double x) {
if (x == 0. || x == 1.) return 0.;
double zPrime;
double y = 1.0;
double z = 1 - x;
do {
x = sqrt(x);
zPrime = z;
y *= 0.5;
z -= pow(1 - x, 2)*y;
} while(zPrime != z);
return z / 3;
}
static double hllSigma(double x) {
if (x == 1.0) return INFINITY;
double zPrime;
double y = 1;
double z = x;
do {
x *= x;
zPrime = z;
z += x * y;
y += y;
} while(zPrime != z);
return z;
}
// estimate the cardinality, the algorithm refer this paper: "New cardinality estimation algorithms for HyperLogLog sketches"
static uint64_t hllCountCnt(uint8_t *buckets) {
double m = HLL_BUCKETS;
int32_t buckethisto[64] = {0};
hllBucketHisto(buckets,buckethisto);
double z = m * hllTau((m-buckethisto[HLL_DATA_BITS+1])/(double)m);
for (int j = HLL_DATA_BITS; j >= 1; --j) {
z += buckethisto[j];
z *= 0.5;
}
z += m * hllSigma(buckethisto[0]/(double)m);
double E = llroundl(HLL_ALPHA_INF*m*m/z);
return (uint64_t) E;
}
static uint8_t hllCountNum(void *ele, int32_t elesize, int32_t *buk) {
uint64_t hash = MurmurHash3_64(ele,elesize);
int32_t index = hash & HLL_BUCKET_MASK;
hash >>= HLL_BUCKET_BITS;
hash |= ((uint64_t)1<<HLL_DATA_BITS);
uint64_t bit = 1;
uint8_t count = 1;
while((hash & bit) == 0) {
count++;
bit <<= 1;
}
*buk = index;
return count;
}
static void hll_function(SQLFunctionCtx *pCtx) {
SHLLInfo *pHLLInfo = getOutputInfo(pCtx);
for (int32_t i = 0; i < pCtx->size; ++i) {
char *val = GET_INPUT_DATA(pCtx, i);
if (isNull(val, pCtx->inputType)) {
continue;
}
int32_t elesize = pCtx->inputBytes;
if(IS_VAR_DATA_TYPE(pCtx->inputType)) {
elesize = varDataLen(val);
val = varDataVal(val);
}
int32_t index = 0;
uint8_t count = hllCountNum(val,elesize,&index);
uint8_t oldcount = pHLLInfo->buckets[index];
if (count > oldcount) {
pHLLInfo->buckets[index] = count;
}
}
GET_RES_INFO(pCtx)->numOfRes = 1;
}
static void hll_func_merge(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SHLLInfo *pHLLInfo = (SHLLInfo *)GET_ROWCELL_INTERBUF(pResInfo);
SHLLInfo *pData = (SHLLInfo *)GET_INPUT_DATA_LIST(pCtx);
for (int i = 0; i < HLL_BUCKETS; i++) {
if (pData->buckets[i] > pHLLInfo->buckets[i]) {
pHLLInfo->buckets[i] = pData->buckets[i];
}
}
}
static void hll_func_finalizer(SQLFunctionCtx *pCtx) {
SHLLInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
GET_RES_INFO(pCtx)->numOfRes = 1;
*(uint64_t *)(pCtx->pOutput) = hllCountCnt(pInfo->buckets);
doFinalizer(pCtx);
}
/* hyperloglog end */
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
if (!isValidDataType(dataType)) {
......@@ -428,6 +575,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*bytes = (sizeof(STailInfo) + (sizeof(TailUnit) + dataBytes + POINTER_BYTES + extLength) * param);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_HYPERLOGLOG) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SHLLInfo);
*interBytes = sizeof(SHLLInfo);
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SAMPLE) {
*type = TSDB_DATA_TYPE_BINARY;
......@@ -584,6 +736,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
// the output column may be larger than sizeof(STopBotInfo)
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_HYPERLOGLOG) {
*type = TSDB_DATA_TYPE_UBIGINT;
*bytes = sizeof(uint64_t);
*interBytes = sizeof(SHLLInfo);
} else if (functionId == TSDB_FUNC_SAMPLE) {
*type = (int16_t)dataType;
*bytes = dataBytes;
......@@ -2403,18 +2559,6 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
tfree(pData);
}
static void *getOutputInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
// only the first_stage_merge is directly written data into final output buffer
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
return pCtx->pOutput;
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return GET_ROWCELL_INTERBUF(pResInfo);
}
}
/*
* keep the intermediate results during scan data blocks in the format of:
* +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+
......@@ -5815,6 +5959,7 @@ static void wduration_function(SQLFunctionCtx *pCtx) {
}
*(int64_t *)(pCtx->pOutput) = duration;
}
/////////////////////////////////////////////////////////////////////////////////////////////
/*
* function compatible list.
......@@ -5835,8 +5980,8 @@ int32_t functionCompatList[] = {
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample, block_info, elapsed, histogram, unique, mode, tail
6, 8, -1, -1, -1, 7, 1, -1, -1, 1, -1,
// stateCount, stateDuration, wstart, wstop, wduration,
1, 1, 1, 1, 1,
// stateCount, stateDuration, wstart, wstop, wduration, hyperloglog
1, 1, 1, 1, 1, 1
};
SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
......@@ -6405,5 +6550,17 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 47
"hyperloglog",
TSDB_FUNC_HYPERLOGLOG,
TSDB_FUNC_HYPERLOGLOG,
TSDB_BASE_FUNC_SO,
function_setup,
hll_function,
hll_func_finalizer,
hll_func_merge,
dataBlockRequired,
}
};
......@@ -33,7 +33,8 @@ typedef void (*_hash_free_fn_t)(void *param);
*/
uint32_t MurmurHash3_32(const char *key, uint32_t len);
/**
uint64_t MurmurHash3_64(const void *key, uint32_t len);
/**
*
* @param key
* @param len
......
......@@ -78,6 +78,42 @@ uint32_t MurmurHash3_32(const char *key, uint32_t len) {
return h1;
}
uint64_t MurmurHash3_64(const void *key, uint32_t len) {
const uint64_t m = 0x87c37b91114253d5;
const int r = 47;
uint32_t seed = 0x12345678;
uint64_t h = seed ^ (len * m);
const uint8_t *data = (const uint8_t *)key;
const uint8_t *end = data + (len-(len&7));
while(data != end) {
uint64_t k = *((uint64_t*)data);
k *= m;
k ^= k >> r;
k *= m;
h ^= k;
h *= m;
data += 8;
}
switch(len & 7) {
case 7: h ^= (uint64_t)data[6] << 48; /* fall-thru */
case 6: h ^= (uint64_t)data[5] << 40; /* fall-thru */
case 5: h ^= (uint64_t)data[4] << 32; /* fall-thru */
case 4: h ^= (uint64_t)data[3] << 24; /* fall-thru */
case 3: h ^= (uint64_t)data[2] << 16; /* fall-thru */
case 2: h ^= (uint64_t)data[1] << 8; /* fall-thru */
case 1: h ^= (uint64_t)data[0];
h *= m; /* fall-thru */
};
h ^= h >> r;
h *= m;
h ^= h >> r;
return h;
}
uint32_t taosIntHash_32(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint32_t *)key; }
uint32_t taosIntHash_16(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint16_t *)key; }
uint32_t taosIntHash_8(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint8_t *)key; }
......
###################################################################
# Copyright (c) 2021 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def caseDescription(self):
'''
case1<markwang>: [TD-13893] hyperloglog unique
'''
return
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self._conn = conn
def run(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists hll")
tdSql.execute("create database if not exists hll")
tdSql.execute('use hll')
tdSql.execute('create table shll (ts timestamp, dbig bigint, dsmall smallint, dbool bool, dtiny tinyint unsigned, dfloat float, ddouble double, dnchar nchar(4093), dbinary binary(64), dtime timestamp) tags (tbinary nchar(4093), tint int)')
tdSql.execute('create table hll1 using shll tags ("t1", 1)')
tdSql.execute('create table hll2 using shll tags ("t2", 2)')
tdSql.execute('insert into hll1 values("2021-10-17 00:31:31", 1, -3276, true, 253, 3.32333, 4.984392323, "你好", "sddd", 333) ("2022-01-24 00:31:32", 1, -32767, false, 254, NULL, 4.982392323, "你好吗", "sdf",2323)')
tdSql.execute('insert into hll2 values("2021-10-15 00:31:33", 1, NULL, true, 23, 3.4, 4.982392323, "你好吗", "sdf", 333) ("2021-12-24 00:31:34", 2, 32767, NULL, NULL, NULL, 4.982392323, NULL, "sddd", NULL) ("2022-01-01 08:00:05", 19, 3276, true, 2, 3.323222, 4.92323, "试试", "sddd", 1645434434000)')
tdSql.execute('insert into hll2 values("2021-10-17 00:31:31", NULL, 32767, true, 123, 3.323232333, 4.2, NULL, NULL, NULL) ("2022-01-01 08:00:06", NULL, NULL, NULL, 35, 3.323232333, NULL, "试试", NULL, 1645434434000) ("2022-01-01 08:00:07", 9, 54, true, 25, 3.32333, NULL, "试试", NULL, 1645434434001)')
## test normal table
tdSql.query('select hyperloglog(ts) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 6)
tdSql.query('select hyperloglog(dbig) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 4)
tdSql.query('select hyperloglog(dsmall) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 3)
tdSql.query('select hyperloglog(dbool) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.query('select hyperloglog(dtiny) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 5)
tdSql.query('select hyperloglog(dfloat) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 4)
tdSql.query('select hyperloglog(ddouble) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 3)
tdSql.query('select hyperloglog(dnchar) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 2)
tdSql.query('select hyperloglog(dbinary) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 2)
## test super table
tdSql.query('select hyperloglog(dnchar) from shll')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 3)
# test group by
#group by column
tdSql.query('select hyperloglog(dnchar) from shll group by dnchar')
tdSql.checkRows(4)
tdSql.checkData(0, 0, 0)
tdSql.checkData(1, 0, 1)
tdSql.checkData(2, 0, 1)
tdSql.checkData(3, 0, 1)
tdSql.query('select hyperloglog(dsmall) from shll group by dnchar')
tdSql.checkRows(4)
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 1)
tdSql.checkData(2, 0, 1)
tdSql.checkData(3, 0, 2)
tdSql.query('select hyperloglog(dsmall) from hll2 group by dnchar')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 0)
tdSql.checkData(2, 0, 2)
#group by tbname
tdSql.query('select hyperloglog(dsmall) from shll group by tbname')
tdSql.checkRows(2)
tdSql.checkData(0, 0, 2)
tdSql.checkData(1, 0, 3)
#group by tag
tdSql.query('select hyperloglog(dnchar) from shll group by tint')
tdSql.checkRows(2)
tdSql.checkData(0, 0, 2)
tdSql.checkData(1, 0, 2)
#test order by
#order by column [desc]
tdSql.query('select hyperloglog(dnchar) from shll group by dnchar order by dnchar desc')
tdSql.checkRows(4)
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 1)
tdSql.checkData(2, 0, 1)
tdSql.checkData(3, 0, 0)
#order by tag
tdSql.query('select hyperloglog(dsmall) from shll group by tint order by tint desc')
tdSql.checkRows(2)
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 2)
# error
tdSql.error("select hyperloglog(ts,1) from shll")
#interval
tdSql.query('select hyperloglog(dnchar) from shll interval(1s)')
tdSql.checkRows(7)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 1)
tdSql.checkData(2, 0, "2021-12-24 00:31:34")
tdSql.checkData(2, 1, 0)
tdSql.checkData(3, 1, 1)
tdSql.checkData(4, 1, 1)
tdSql.checkData(5, 1, 1)
tdSql.checkData(6, 1, 1)
tdSql.query('select hyperloglog(dnchar) from shll interval(1w)')
tdSql.checkRows(4)
tdSql.checkData(0, 1, 2)
tdSql.checkData(1, 1, 0)
tdSql.checkData(2, 1, 1)
tdSql.checkData(3, 1, 1)
#state_window
tdSql.query('select hyperloglog(dnchar) from hll2 state_window(dsmall)')
tdSql.checkRows(5)
#session
tdSql.query('select hyperloglog(dbinary) from hll2 session(ts,2w)')
tdSql.checkRows(2)
tdSql.checkData(0, 0, "2021-10-15 00:31:33")
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 0, "2021-12-24 00:31:34")
tdSql.checkData(1, 1, 1)
#where
tdSql.query('select hyperloglog(dbinary) from shll where dnchar="试试"')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.query('select hyperloglog(dbinary) from shll where ts <= "2022-01-01 08:00:05"')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 2)
#slimit/soffset
tdSql.query('select hyperloglog(dsmall) from shll group by dnchar slimit 2 soffset 2')
tdSql.checkRows(2)
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 2)
#limit/offset
tdSql.query('select hyperloglog(dnchar) from shll interval(1s) limit 1,3')
tdSql.checkRows(3)
tdSql.checkData(0, 0, "2021-10-17 00:31:31")
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 0)
tdSql.checkData(2, 1, 1)
#having
tdSql.query('select hyperloglog(dsmall) from shll group by dnchar having hyperloglog(dsmall)>1')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 2)
#subquery
tdSql.query('select hyperloglog(dbinary) from (select dbinary from shll where dnchar = "试试")')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
#union
tdSql.query('select hyperloglog(dtiny) from hll1 union all select hyperloglog(dtiny) from hll2')
tdSql.checkRows(2)
tdSql.checkData(0, 0, 2)
tdSql.checkData(1, 0, 5)
#join
tdSql.execute('create table shll1 (ts timestamp, dbig bigint, dsmall smallint, dbool bool, dtiny tinyint unsigned, dfloat float, ddouble double, dnchar nchar(4093), dbinary binary(64), dtime timestamp) tags (tbinary nchar(4093), tint int)')
tdSql.execute('create table hll11 using shll1 tags ("t1", 1)')
tdSql.execute('insert into hll11 values("2021-10-17 00:31:31", 1, -3276, true, 253, 3.32333, 4.984392323, "你好", "sddd", 333) ("2022-01-24 00:31:32", 1, -32767, false, 254, NULL, 4.982392323, "你好吗", "sdf",2323)')
tdSql.query('select hyperloglog(shll1.ddouble) from shll, shll1 where shll.ts=shll1.ts and shll.tint=shll1.tint')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 2)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册