未验证 提交 10bbc756 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #7860 from taosdata/tdigest

Tdigest
...@@ -2440,6 +2440,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2440,6 +2440,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const char* msg10 = "derivative duration should be greater than 1 Second"; const char* msg10 = "derivative duration should be greater than 1 Second";
const char* msg11 = "third parameter in derivative should be 0 or 1"; const char* msg11 = "third parameter in derivative should be 0 or 1";
const char* msg12 = "parameter is out of range [1, 100]"; const char* msg12 = "parameter is out of range [1, 100]";
const char* msg13 = "third parameter algorithm must be 'default' or 't-digest'";
switch (functionId) { switch (functionId) {
case TSDB_FUNC_COUNT: { case TSDB_FUNC_COUNT: {
...@@ -2783,8 +2784,16 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2783,8 +2784,16 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_PERCT: case TSDB_FUNC_PERCT:
case TSDB_FUNC_APERCT: { case TSDB_FUNC_APERCT: {
// 1. valid the number of parameters // 1. valid the number of parameters
if (pItem->pNode->Expr.paramList == NULL || taosArrayGetSize(pItem->pNode->Expr.paramList) != 2) { bool valid = true;
/* no parameters or more than one parameter for function */ if(pItem->pNode->Expr.paramList == NULL) {
valid = false;
} else if(functionId == TSDB_FUNC_APERCT) {
size_t cnt = taosArrayGetSize(pItem->pNode->Expr.paramList);
if(cnt != 2 && cnt !=3) valid = false;
} else {
if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 2) valid = false;
}
if(!valid) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
...@@ -2830,6 +2839,10 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2830,6 +2839,10 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SExprInfo* pExpr = NULL; SExprInfo* pExpr = NULL;
if (functionId == TSDB_FUNC_PERCT || functionId == TSDB_FUNC_APERCT) { if (functionId == TSDB_FUNC_PERCT || functionId == TSDB_FUNC_APERCT) {
// param1 double
if(pVariant->nType != TSDB_DATA_TYPE_DOUBLE && pVariant->nType != TSDB_DATA_TYPE_BIGINT){
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
tVariantDump(pVariant, val, TSDB_DATA_TYPE_DOUBLE, true); tVariantDump(pVariant, val, TSDB_DATA_TYPE_DOUBLE, true);
double dp = GET_DOUBLE_VAL(val); double dp = GET_DOUBLE_VAL(val);
...@@ -2850,6 +2863,29 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2850,6 +2863,29 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false); pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
// param2 int32
if (taosArrayGetSize(pItem->pNode->Expr.paramList) == 3) {
if (pParamElem[2].pNode != NULL) {
pVariant = &pParamElem[2].pNode->value;
// check type must string
if(pVariant->nType != TSDB_DATA_TYPE_BINARY || pVariant->pz == NULL){
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg13);
}
char* pzAlgo = pVariant->pz;
int32_t algo = 0;
if(strcasecmp(pzAlgo, "t-digest") == 0) {
algo = 1;
} else if(strcasecmp(pzAlgo, "default") == 0){
algo = 0;
} else {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg13);
}
// append algo int32_t
tscExprAddParams(&pExpr->base, (char*)&algo, TSDB_DATA_TYPE_INT, sizeof(int32_t));
}
}
} else if (functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE) { } else if (functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE) {
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
......
...@@ -112,6 +112,10 @@ extern "C" { ...@@ -112,6 +112,10 @@ extern "C" {
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results #define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define TOP_BOTTOM_QUERY_LIMIT 100 #define TOP_BOTTOM_QUERY_LIMIT 100
// apercentile(arg1,agr2,arg3) param arg3 value is below:
#define ALGO_DEFAULT 0
#define ALGO_TDIGEST 1
enum { enum {
MASTER_SCAN = 0x0u, MASTER_SCAN = 0x0u,
REVERSE_SCAN = 0x1u, REVERSE_SCAN = 0x1u,
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* include/tdigest.c
*
* Copyright (c) 2016, Usman Masood <usmanm at fastmail dot fm>
*/
#ifndef TDIGEST_H
#define TDIGEST_H
#ifndef M_PI
#define M_PI 3.14159265358979323846264338327950288 /* pi */
#endif
#define DOUBLE_MAX 1.79e+308
#define ADDITION_CENTROID_NUM 2
#define COMPRESSION 400
#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + ADDITION_CENTROID_NUM)
#define GET_THRESHOLD(compression) (7.5 + 0.37 * compression - 2e-4 * pow(compression, 2))
#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(SCentroid)*GET_CENTROID(compression) + sizeof(SPt)*GET_THRESHOLD(compression))
typedef struct SCentroid {
double mean;
int64_t weight;
}SCentroid;
typedef struct SPt {
double value;
int64_t weight;
}SPt;
typedef struct TDigest {
double compression;
int32_t threshold;
int64_t size;
int64_t total_weight;
double min;
double max;
int32_t num_buffered_pts;
SPt *buffered_pts;
int32_t num_centroids;
SCentroid *centroids;
}TDigest;
TDigest *tdigestNewFrom(void* pBuf, int32_t compression);
void tdigestAdd(TDigest *t, double x, int64_t w);
void tdigestMerge(TDigest *t1, TDigest *t2);
double tdigestQuantile(TDigest *t, double q);
void tdigestCompress(TDigest *t);
void tdigestFreeFrom(TDigest *t);
void tdigestAutoFill(TDigest* t, int32_t compression);
#endif /* TDIGEST_H */
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "texpr.h" #include "texpr.h"
#include "tdigest.h"
#include "ttype.h" #include "ttype.h"
#include "tsdb.h" #include "tsdb.h"
#include "tglobal.h" #include "tglobal.h"
...@@ -145,6 +146,7 @@ typedef struct SLeastsquaresInfo { ...@@ -145,6 +146,7 @@ typedef struct SLeastsquaresInfo {
typedef struct SAPercentileInfo { typedef struct SAPercentileInfo {
SHistogramInfo *pHisto; SHistogramInfo *pHisto;
TDigest* pTDigest;
} SAPercentileInfo; } SAPercentileInfo;
typedef struct STSCompInfo { typedef struct STSCompInfo {
...@@ -337,7 +339,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -337,7 +339,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_APERCT) { } else if (functionId == TSDB_FUNC_APERCT) {
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo); int16_t bytesHist = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo);
int16_t bytesDigest = (int16_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
*bytes = MAX(bytesHist, bytesDigest);
*interBytes = *bytes; *interBytes = *bytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -370,8 +374,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -370,8 +374,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
} else if (functionId == TSDB_FUNC_APERCT) { } else if (functionId == TSDB_FUNC_APERCT) {
*type = TSDB_DATA_TYPE_DOUBLE; *type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double); *bytes = sizeof(double);
*interBytes = int16_t bytesHist = sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1);
sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1); int16_t bytesDigest = (int16_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
*interBytes = MAX(bytesHist, bytesDigest);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TWA) { } else if (functionId == TSDB_FUNC_TWA) {
*type = TSDB_DATA_TYPE_DOUBLE; *type = TSDB_DATA_TYPE_DOUBLE;
...@@ -2490,17 +2495,135 @@ static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) { ...@@ -2490,17 +2495,135 @@ static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
} else { } else {
pInfo = GET_ROWCELL_INTERBUF(pResInfo); pInfo = GET_ROWCELL_INTERBUF(pResInfo);
} }
buildHistogramInfo(pInfo);
return pInfo; return pInfo;
} }
//
// ----------------- tdigest -------------------
//
//////////////////////////////////////////////////////////////////////////////////
static bool tdigest_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo) {
if (!function_setup(pCtx, pResultInfo)) {
return false;
}
// new TDigest
SAPercentileInfo *pInfo = getAPerctInfo(pCtx);
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
pInfo->pTDigest = tdigestNewFrom(tmp, COMPRESSION);
return true;
}
static void tdigest_do(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo * pAPerc = getAPerctInfo(pCtx);
assert(pAPerc->pTDigest != NULL);
if(pAPerc->pTDigest == NULL) {
qError("tdigest_do tdigest is null.");
return ;
}
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue;
}
notNullElems += 1;
double v = 0; // value
long long w = 1; // weigth
GET_TYPED_DATA(v, double, pCtx->inputType, data);
tdigestAdd(pAPerc->pTDigest, v, w);
}
if (!pCtx->hasNull) {
assert(pCtx->size == notNullElems);
}
SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) {
pResInfo->hasResult = DATA_SET_FLAG;
}
}
static void tdigest_merge(SQLFunctionCtx *pCtx) {
SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx);
assert(pInput->pTDigest);
pInput->pTDigest = (TDigest*)((char*)pInput + sizeof(SAPercentileInfo));
tdigestAutoFill(pInput->pTDigest, COMPRESSION);
// input merge no elements , no need merge
if(pInput->pTDigest->num_centroids == 0 && pInput->pTDigest->num_buffered_pts == 0) {
return ;
}
SAPercentileInfo *pOutput = getAPerctInfo(pCtx);
if(pOutput->pTDigest->num_centroids == 0) {
memcpy(pOutput->pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
tdigestAutoFill(pOutput->pTDigest, COMPRESSION);
} else {
tdigestMerge(pOutput->pTDigest, pInput->pTDigest);
}
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->hasResult = DATA_SET_FLAG;
SET_VAL(pCtx, 1, 1);
}
static void tdigest_finalizer(SQLFunctionCtx *pCtx) {
double q = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i64 : pCtx->param[0].dKey;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo * pAPerc = getAPerctInfo(pCtx);
if (pCtx->currentStage == MERGE_STAGE) {
if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null
double res = tdigestQuantile(pAPerc->pTDigest, q/100);
memcpy(pCtx->pOutput, &res, sizeof(double));
} else {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
return;
}
} else {
if (pAPerc->pTDigest->size > 0) {
double res = tdigestQuantile(pAPerc->pTDigest, q/100);
memcpy(pCtx->pOutput, &res, sizeof(double));
} else { // no need to free
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
return;
}
}
pAPerc->pTDigest = NULL;
doFinalizer(pCtx);
}
//////////////////////////////////////////////////////////////////////////////////
int32_t getAlgo(SQLFunctionCtx * pCtx) {
if(pCtx->numOfParams != 2){
return ALGO_DEFAULT;
}
if(pCtx->param[1].nType != TSDB_DATA_TYPE_INT) {
return ALGO_DEFAULT;
}
return (int32_t)pCtx->param[1].i64;
}
static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) { static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
if (getAlgo(pCtx) == ALGO_TDIGEST) {
return tdigest_setup(pCtx, pResultInfo);
}
if (!function_setup(pCtx, pResultInfo)) { if (!function_setup(pCtx, pResultInfo)) {
return false; return false;
} }
SAPercentileInfo *pInfo = getAPerctInfo(pCtx); SAPercentileInfo *pInfo = getAPerctInfo(pCtx);
buildHistogramInfo(pInfo);
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo); char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN); pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
...@@ -2508,10 +2631,16 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* ...@@ -2508,10 +2631,16 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
} }
static void apercentile_function(SQLFunctionCtx *pCtx) { static void apercentile_function(SQLFunctionCtx *pCtx) {
if (getAlgo(pCtx) == ALGO_TDIGEST) {
tdigest_do(pCtx);
return;
}
int32_t notNullElems = 0; int32_t notNullElems = 0;
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo *pInfo = getAPerctInfo(pCtx); SAPercentileInfo *pInfo = getAPerctInfo(pCtx);
buildHistogramInfo(pInfo);
assert(pInfo->pHisto->elems != NULL); assert(pInfo->pHisto->elems != NULL);
...@@ -2540,6 +2669,11 @@ static void apercentile_function(SQLFunctionCtx *pCtx) { ...@@ -2540,6 +2669,11 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
} }
static void apercentile_func_merge(SQLFunctionCtx *pCtx) { static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
if (getAlgo(pCtx) == ALGO_TDIGEST) {
tdigest_merge(pCtx);
return;
}
SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx); SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx);
pInput->pHisto = (SHistogramInfo*) ((char *)pInput + sizeof(SAPercentileInfo)); pInput->pHisto = (SHistogramInfo*) ((char *)pInput + sizeof(SAPercentileInfo));
...@@ -2550,6 +2684,7 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) { ...@@ -2550,6 +2684,7 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
} }
SAPercentileInfo *pOutput = getAPerctInfo(pCtx); SAPercentileInfo *pOutput = getAPerctInfo(pCtx);
buildHistogramInfo(pOutput);
SHistogramInfo *pHisto = pOutput->pHisto; SHistogramInfo *pHisto = pOutput->pHisto;
if (pHisto->numOfElems <= 0) { if (pHisto->numOfElems <= 0) {
...@@ -2570,6 +2705,11 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) { ...@@ -2570,6 +2705,11 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
} }
static void apercentile_finalizer(SQLFunctionCtx *pCtx) { static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
if (getAlgo(pCtx) == ALGO_TDIGEST) {
tdigest_finalizer(pCtx);
return;
}
double v = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i64 : pCtx->param[0].dKey; double v = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i64 : pCtx->param[0].dKey;
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
......
...@@ -1926,7 +1926,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx ...@@ -1926,7 +1926,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
// in the reverse table scan, only the following functions need to be executed // in the reverse table scan, only the following functions need to be executed
if (IS_REVERSE_SCAN(pRuntimeEnv) || if (IS_REVERSE_SCAN(pRuntimeEnv) ||
(pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != TSDB_FUNC_STDDEV && functionId != TSDB_FUNC_PERCT)) { (pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != TSDB_FUNC_STDDEV && functionId != TSDB_FUNC_PERCT && functionId != TSDB_FUNC_APERCT)) {
return false; return false;
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* src/tdigest.c
*
* Implementation of the t-digest data structure used to compute accurate percentiles.
*
* It is based on the MergingDigest implementation found at:
* https://github.com/tdunning/t-digest/blob/master/src/main/java/com/tdunning/math/stats/MergingDigest.java
*
* Copyright (c) 2016, Usman Masood <usmanm at fastmail dot fm>
*/
#include "os.h"
#include "osMath.h"
#include "tdigest.h"
#define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0)))
//#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (q) - 1) + M_PI / 2) / M_PI)
#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (double)(q) - 1)/M_PI + (double)1/2))
#define FLOAT_EQ(f1, f2) (fabs((f1) - (f2)) <= FLT_EPSILON)
typedef struct SMergeArgs {
TDigest *t;
SCentroid *centroids;
int32_t idx;
double weight_so_far;
double k1;
double min;
double max;
}SMergeArgs;
void tdigestAutoFill(TDigest* t, int32_t compression) {
t->centroids = (SCentroid*)((char*)t + sizeof(TDigest));
t->buffered_pts = (SPt*) ((char*)t + sizeof(TDigest) + sizeof(SCentroid) * (int32_t)GET_CENTROID(compression));
}
TDigest *tdigestNewFrom(void* pBuf, int32_t compression) {
memset(pBuf, 0, (size_t)TDIGEST_SIZE(compression));
TDigest* t = (TDigest*)pBuf;
tdigestAutoFill(t, compression);
t->compression = compression;
t->size = (int64_t)GET_CENTROID(compression);
t->threshold = (int32_t)GET_THRESHOLD(compression);
t->min = DOUBLE_MAX;
t->max = -DOUBLE_MAX;
return t;
}
static int32_t cmpCentroid(const void *a, const void *b) {
SCentroid *c1 = (SCentroid *) a;
SCentroid *c2 = (SCentroid *) b;
if (c1->mean < c2->mean)
return -1;
if (c1->mean > c2->mean)
return 1;
return 0;
}
static void mergeCentroid(SMergeArgs *args, SCentroid *merge) {
double k2;
SCentroid *c = &args->centroids[args->idx];
args->weight_so_far += merge->weight;
k2 = INTEGRATED_LOCATION(args->t->size,
args->weight_so_far / args->t->total_weight);
//idx++
if(k2 - args->k1 > 1 && c->weight > 0) {
if(args->idx + 1 < args->t->size
&& merge->mean != args->centroids[args->idx].mean) {
args->idx++;
}
args->k1 = k2;
}
c = &args->centroids[args->idx];
if(c->mean == merge->mean) {
c->weight += merge->weight;
} else {
c->weight += merge->weight;
c->mean += (merge->mean - c->mean) * merge->weight / c->weight;
if (merge->weight > 0) {
args->min = MIN(merge->mean, args->min);
args->max = MAX(merge->mean, args->max);
}
}
}
void tdigestCompress(TDigest *t) {
SCentroid *unmerged_centroids;
int64_t unmerged_weight = 0;
int32_t num_unmerged = t->num_buffered_pts;
int32_t i, j;
SMergeArgs args;
if (t->num_buffered_pts <= 0)
return;
unmerged_centroids = (SCentroid*)malloc(sizeof(SCentroid) * t->num_buffered_pts);
for (i = 0; i < num_unmerged; i++) {
SPt *p = t->buffered_pts + i;
SCentroid *c = &unmerged_centroids[i];
c->mean = p->value;
c->weight = p->weight;
unmerged_weight += c->weight;
}
t->num_buffered_pts = 0;
t->total_weight += unmerged_weight;
qsort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid);
memset(&args, 0, sizeof(SMergeArgs));
args.centroids = (SCentroid*)malloc((size_t)(sizeof(SCentroid) * t->size));
memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size));
args.t = t;
args.min = DOUBLE_MAX;
args.max = -DOUBLE_MAX;
i = 0;
j = 0;
while (i < num_unmerged && j < t->num_centroids) {
SCentroid *a = &unmerged_centroids[i];
SCentroid *b = &t->centroids[j];
if (a->mean <= b->mean) {
mergeCentroid(&args, a);
assert(args.idx < t->size);
i++;
} else {
mergeCentroid(&args, b);
assert(args.idx < t->size);
j++;
}
}
while (i < num_unmerged) {
mergeCentroid(&args, &unmerged_centroids[i++]);
assert(args.idx < t->size);
}
free((void*)unmerged_centroids);
while (j < t->num_centroids) {
mergeCentroid(&args, &t->centroids[j++]);
assert(args.idx < t->size);
}
if (t->total_weight > 0) {
t->min = MIN(t->min, args.min);
if (args.centroids[args.idx].weight <= 0) {
args.idx--;
}
t->num_centroids = args.idx + 1;
t->max = MAX(t->max, args.max);
}
memcpy(t->centroids, args.centroids, sizeof(SCentroid) * t->num_centroids);
free((void*)args.centroids);
}
void tdigestAdd(TDigest* t, double x, int64_t w) {
if (w == 0)
return;
int32_t i = t->num_buffered_pts;
if(i > 0 && t->buffered_pts[i-1].value == x ) {
t->buffered_pts[i].weight = w;
} else {
t->buffered_pts[i].value = x;
t->buffered_pts[i].weight = w;
t->num_buffered_pts++;
}
if (t->num_buffered_pts >= t->threshold)
tdigestCompress(t);
}
double tdigestCDF(TDigest *t, double x) {
if (t == NULL)
return 0;
int32_t i;
double left, right;
int64_t weight_so_far;
SCentroid *a, *b, tmp;
tdigestCompress(t);
if (t->num_centroids == 0)
return NAN;
if (x < t->min)
return 0;
if (x > t->max)
return 1;
if (t->num_centroids == 1) {
if (FLOAT_EQ(t->max, t->min))
return 0.5;
return INTERPOLATE(x, t->min, t->max);
}
weight_so_far = 0;
a = b = &tmp;
b->mean = t->min;
b->weight = 0;
right = 0;
for (i = 0; i < t->num_centroids; i++) {
SCentroid *c = &t->centroids[i];
left = b->mean - (a->mean + right);
a = b;
b = c;
right = (b->mean - a->mean) * a->weight / (a->weight + b->weight);
if (x < a->mean + right) {
double cdf = (weight_so_far
+ a->weight
* INTERPOLATE(x, a->mean - left, a->mean + right))
/ t->total_weight;
return MAX(cdf, 0.0);
}
weight_so_far += a->weight;
}
left = b->mean - (a->mean + right);
a = b;
right = t->max - a->mean;
if (x < a->mean + right) {
return (weight_so_far + a->weight * INTERPOLATE(x, a->mean - left, a->mean + right))
/ t->total_weight;
}
return 1;
}
double tdigestQuantile(TDigest *t, double q) {
if (t == NULL)
return 0;
int32_t i;
double left, right, idx;
int64_t weight_so_far;
SCentroid *a, *b, tmp;
tdigestCompress(t);
if (t->num_centroids == 0)
return NAN;
if (t->num_centroids == 1)
return t->centroids[0].mean;
if (FLOAT_EQ(q, 0.0))
return t->min;
if (FLOAT_EQ(q, 1.0))
return t->max;
idx = q * t->total_weight;
weight_so_far = 0;
b = &tmp;
b->mean = t->min;
b->weight = 0;
right = t->min;
for (i = 0; i < t->num_centroids; i++) {
SCentroid *c = &t->centroids[i];
a = b;
left = right;
b = c;
right = (b->weight * a->mean + a->weight * b->mean)/ (a->weight + b->weight);
if (idx < weight_so_far + a->weight) {
double p = (idx - weight_so_far) / a->weight;
return left * (1 - p) + right * p;
}
weight_so_far += a->weight;
}
left = right;
a = b;
right = t->max;
if (idx < weight_so_far + a->weight) {
double p = (idx - weight_so_far) / a->weight;
return left * (1 - p) + right * p;
}
return t->max;
}
void tdigestMerge(TDigest *t1, TDigest *t2) {
// SPoints
int32_t num_pts = t2->num_buffered_pts;
for(int32_t i = num_pts - 1; i >= 0; i--) {
SPt* p = t2->buffered_pts + i;
tdigestAdd(t1, p->value, p->weight);
t2->num_buffered_pts --;
}
// centroids
for (int32_t i = 0; i < t2->num_centroids; i++) {
tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight);
}
}
...@@ -24,6 +24,7 @@ ENDIF() ...@@ -24,6 +24,7 @@ ENDIF()
SET_SOURCE_FILES_PROPERTIES(./astTest.cpp PROPERTIES COMPILE_FLAGS -w) SET_SOURCE_FILES_PROPERTIES(./astTest.cpp PROPERTIES COMPILE_FLAGS -w)
SET_SOURCE_FILES_PROPERTIES(./histogramTest.cpp PROPERTIES COMPILE_FLAGS -w) SET_SOURCE_FILES_PROPERTIES(./histogramTest.cpp PROPERTIES COMPILE_FLAGS -w)
SET_SOURCE_FILES_PROPERTIES(./percentileTest.cpp PROPERTIES COMPILE_FLAGS -w) SET_SOURCE_FILES_PROPERTIES(./percentileTest.cpp PROPERTIES COMPILE_FLAGS -w)
SET_SOURCE_FILES_PROPERTIES(./apercentileTest.cpp PROPERTIES COMPILE_FLAGS -w)
SET_SOURCE_FILES_PROPERTIES(./resultBufferTest.cpp PROPERTIES COMPILE_FLAGS -w) SET_SOURCE_FILES_PROPERTIES(./resultBufferTest.cpp PROPERTIES COMPILE_FLAGS -w)
SET_SOURCE_FILES_PROPERTIES(./tsBufTest.cpp PROPERTIES COMPILE_FLAGS -w) SET_SOURCE_FILES_PROPERTIES(./tsBufTest.cpp PROPERTIES COMPILE_FLAGS -w)
SET_SOURCE_FILES_PROPERTIES(./unitTest.cpp PROPERTIES COMPILE_FLAGS -w) SET_SOURCE_FILES_PROPERTIES(./unitTest.cpp PROPERTIES COMPILE_FLAGS -w)
......
#include <gtest/gtest.h>
#include <iostream>
#include "qResultbuf.h"
#include "taos.h"
#include "taosdef.h"
#include "assert.h"
#include "qHistogram.h"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
extern "C" {
#include "tdigest.h"
#include "qHistogram.h"
}
namespace {
enum {
TEST_DATA_TYPE_INT = 0,
TEST_DATA_TYPE_BIGINT,
TEST_DATA_TYPE_FLOAT,
TEST_DATA_TYPE_DOUBLE
};
enum {
TEST_DATA_MODE_SEQ = 0,
TEST_DATA_MODE_DSEQ,
TEST_DATA_MODE_RAND_PER,
TEST_DATA_MODE_RAND_LIMIT,
};
void tdigest_init(TDigest **pTDigest) {
void *tmp = calloc(1, (size_t)(TDIGEST_SIZE(COMPRESSION)));
*pTDigest = tdigestNewFrom(tmp, COMPRESSION);
}
void thistogram_init(SHistogramInfo **pHisto) {
void *tmp = calloc(1, (int16_t)(sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo)));
*pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
}
static FORCE_INLINE int64_t testGetTimestampUs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec;
}
double * thistogram_end(SHistogramInfo* pHisto, double* ratio, int32_t num){
assert(pHisto->numOfElems > 0);
double ratio2 = *ratio * 100;
return tHistogramUniform(pHisto, &ratio2, 1);
}
void setTestData(void *data, int64_t idx, int32_t type, int64_t value) {
switch (type) {
case TEST_DATA_TYPE_INT:
*((int32_t*)data + idx) = (int32_t)value;
break;
case TEST_DATA_TYPE_BIGINT:
*((int64_t*)data + idx) = (int64_t)value;
break;
case TEST_DATA_TYPE_FLOAT:
*((float*)data + idx) = (float)value;
break;
case TEST_DATA_TYPE_DOUBLE:
*((double*)data + idx) = (double)value;
break;
default:
assert(0);
}
}
void addDTestData(void *data, int64_t idx, int32_t type, TDigest* pTDigest) {
switch (type) {
case TEST_DATA_TYPE_INT:
tdigestAdd(pTDigest, (double)*((int32_t*)data + idx), 1);
break;
case TEST_DATA_TYPE_BIGINT:
tdigestAdd(pTDigest, (double)*((int64_t*)data + idx), 1);
break;
case TEST_DATA_TYPE_FLOAT:
tdigestAdd(pTDigest, (double)*((float*)data + idx), 1);
break;
case TEST_DATA_TYPE_DOUBLE:
tdigestAdd(pTDigest, (double)*((double*)data + idx), 1);
break;
default:
assert(0);
}
}
void addHTestData(void *data, int64_t idx, int32_t type, SHistogramInfo *pHisto) {
switch (type) {
case TEST_DATA_TYPE_INT:
tHistogramAdd(&pHisto, (double)*((int32_t*)data + idx));
break;
case TEST_DATA_TYPE_BIGINT:
tHistogramAdd(&pHisto, (double)*((int64_t*)data + idx));
break;
case TEST_DATA_TYPE_FLOAT:
tHistogramAdd(&pHisto, (double)*((float*)data + idx));
break;
case TEST_DATA_TYPE_DOUBLE:
tHistogramAdd(&pHisto, (double)*((double*)data + idx));
break;
default:
assert(0);
}
}
void initTestData(void **data, int32_t type, int64_t num, int32_t mode, int32_t randPar) {
int32_t tsize[] = {4, 8, 4, 8};
*data = malloc(num * tsize[type]);
switch (mode) {
case TEST_DATA_MODE_SEQ:
for (int64_t i = 0; i < num; ++i) {
setTestData(*data, i, type, i);
}
break;
case TEST_DATA_MODE_DSEQ:
for (int64_t i = 0; i < num; ++i) {
setTestData(*data, i, type, num - i);
}
break;
case TEST_DATA_MODE_RAND_PER: {
srand(time(NULL));
int64_t randMax = num * randPar / 100;
if (randMax == 0) {
for (int64_t i = 0; i < num; ++i) {
setTestData(*data, i, type, rand());
}
} else {
for (int64_t i = 0; i < num; ++i) {
setTestData(*data, i, type, rand() % randMax);
}
}
}
break;
case TEST_DATA_MODE_RAND_LIMIT:
srand(time(NULL));
for (int64_t i = 0; i < num; ++i) {
setTestData(*data, i, type, rand() % randPar);
}
break;
default:
assert(0);
}
}
void tdigestTest() {
printf("running %s\n", __FUNCTION__);
TDigest *pTDigest = NULL;
void *data = NULL;
SHistogramInfo *pHisto = NULL;
double ratio = 0.5;
int64_t totalNum[] = {100,10000,10000000};
int32_t numTimes = sizeof(totalNum)/sizeof(totalNum[0]);
int64_t biggestNum = totalNum[numTimes - 1];
int32_t unitNum[] = {1,10,100,1000,5000,10000,100000};
int32_t unitTimes = sizeof(unitNum)/sizeof(unitNum[0]);
int32_t dataMode[] = {TEST_DATA_MODE_SEQ, TEST_DATA_MODE_DSEQ, TEST_DATA_MODE_RAND_PER, TEST_DATA_MODE_RAND_LIMIT};
int32_t modeTimes = sizeof(dataMode)/sizeof(dataMode[0]);
int32_t dataTypes[] = {TEST_DATA_TYPE_INT, TEST_DATA_TYPE_BIGINT, TEST_DATA_TYPE_FLOAT, TEST_DATA_TYPE_DOUBLE};
int32_t typeTimes = sizeof(dataTypes)/sizeof(dataTypes[0]);
int32_t randPers[] = {0, 1, 10, 50, 90};
int32_t randPTimes = sizeof(randPers)/sizeof(randPers[0]);
int32_t randLimits[] = {10, 50, 100, 1000, 10000};
int32_t randLTimes = sizeof(randLimits)/sizeof(randLimits[0]);
double useTime[2][10][10][10][10] = {0.0};
for (int32_t i = 0; i < modeTimes; ++i) {
if (dataMode[i] == TEST_DATA_MODE_RAND_PER) {
for (int32_t p = 0; p < randPTimes; ++p) {
for (int32_t j = 0; j < typeTimes; ++j) {
initTestData(&data, dataTypes[j], biggestNum, dataMode[i], randPers[p]);
for (int32_t m = 0; m < numTimes; ++m) {
int64_t startu = testGetTimestampUs();
tdigest_init(&pTDigest);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addDTestData(data, n, dataTypes[j], pTDigest);
}
double res = tdigestQuantile(pTDigest, ratio);
free(pTDigest);
useTime[0][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("DMode:%d,Type:%d,Num:%"PRId64",randP:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randPers[p], useTime[0][i][j][m][p], res);
startu = testGetTimestampUs();
thistogram_init(&pHisto);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addHTestData(data, n, dataTypes[j], pHisto);
}
double *res2 = thistogram_end(pHisto, &ratio, 1);
free(pHisto);
useTime[1][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("HMode:%d,Type:%d,Num:%"PRId64",randP:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randPers[p], useTime[1][i][j][m][p], *res2);
}
free(data);
}
}
} else if (dataMode[i] == TEST_DATA_MODE_RAND_LIMIT) {
for (int32_t p = 0; p < randLTimes; ++p) {
for (int32_t j = 0; j < typeTimes; ++j) {
initTestData(&data, dataTypes[j], biggestNum, dataMode[i], randLimits[p]);
for (int64_t m = 0; m < numTimes; ++m) {
int64_t startu = testGetTimestampUs();
tdigest_init(&pTDigest);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addDTestData(data, m, dataTypes[j], pTDigest);
}
double res = tdigestQuantile(pTDigest, ratio);
free(pTDigest);
useTime[0][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("DMode:%d,Type:%d,Num:%"PRId64",randL:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randLimits[p], useTime[0][i][j][m][p], res);
startu = testGetTimestampUs();
thistogram_init(&pHisto);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addHTestData(data, n, dataTypes[j], pHisto);
}
double* res2 = thistogram_end(pHisto, &ratio, 1);
free(pHisto);
useTime[1][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("HMode:%d,Type:%d,Num:%"PRId64",randL:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randLimits[p], useTime[1][i][j][m][p], *res2);
}
free(data);
}
}
} else {
for (int32_t j = 0; j < typeTimes; ++j) {
initTestData(&data, dataTypes[j], biggestNum, dataMode[i], 0);
for (int64_t m = 0; m < numTimes; ++m) {
int64_t startu = testGetTimestampUs();
tdigest_init(&pTDigest);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addDTestData(data, n, dataTypes[j], pTDigest);
}
double res = tdigestQuantile(pTDigest, ratio);
free(pTDigest);
useTime[0][i][j][m][0] = ((double)(testGetTimestampUs() - startu))/1000;
printf("DMode:%d,Type:%d,Num:%"PRId64",Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], useTime[0][i][j][m][0], res);
startu = testGetTimestampUs();
thistogram_init(&pHisto);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addHTestData(data, n, dataTypes[j], pHisto);
}
double* res2 = thistogram_end(pHisto, &ratio, 1);
free(pHisto);
useTime[1][i][j][m][0] = ((double)(testGetTimestampUs() - startu))/1000;
printf("HMode:%d,Type:%d,Num:%"PRId64",Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], useTime[1][i][j][m][0], *res2);
}
free(data);
}
}
}
printf("\n\n");
for (int32_t i = 0; i < modeTimes; ++i) {
if (dataMode[i] == TEST_DATA_MODE_RAND_PER) {
for (int32_t p = 0; p < randPTimes; ++p) {
for (int32_t j = 0; j < typeTimes; ++j) {
printf("DMode:%d,Type:%d,randP:%d -", dataMode[i], dataTypes[j], randPers[p]);
for (int32_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[0][i][j][m][p]);
}
printf("\n");
printf("HMode:%d,Type:%d,randP:%d -", dataMode[i], dataTypes[j], randPers[p]);
for (int32_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[1][i][j][m][p]);
}
printf("\n");
}
}
} else if (dataMode[i] == TEST_DATA_MODE_RAND_LIMIT) {
for (int32_t p = 0; p < randLTimes; ++p) {
for (int32_t j = 0; j < typeTimes; ++j) {
printf("DMode:%d,Type:%d,randL:%d -", dataMode[i], dataTypes[j], randLimits[p]);
for (int64_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[0][i][j][m][p]);
}
printf("\n");
printf("HMode:%d,Type:%d,randL:%d -", dataMode[i], dataTypes[j], randLimits[p]);
for (int64_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[1][i][j][m][p]);
}
printf("\n");
}
}
} else {
for (int32_t j = 0; j < typeTimes; ++j) {
printf("DMode:%d,Type:%d -", dataMode[i], dataTypes[j]);
for (int64_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[0][i][j][m][0]);
}
printf("\n");
printf("HMode:%d,Type:%d -", dataMode[i], dataTypes[j]);
for (int64_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[1][i][j][m][0]);
}
printf("\n");
}
}
}
}
} // namespace
TEST(testCase, apercentileTest) {
tdigestTest();
}
################################################################### ###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc. # Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved. # All rights reserved.
# #
...@@ -184,7 +184,11 @@ class TDSql: ...@@ -184,7 +184,11 @@ class TDSql:
if self.queryResult[row][col] != data: if self.queryResult[row][col] != data:
if self.cursor.istype(col, "TIMESTAMP"): if self.cursor.istype(col, "TIMESTAMP"):
# suppose user want to check nanosecond timestamp if a longer data passed # suppose user want to check nanosecond timestamp if a longer data passed
if (len(data) >= 28): if isinstance(data, int) or isinstance(data, float):
if pd.to_datetime(self.queryResult[row][col]) == pd.to_datetime(data):
tdLog.info("sql:%s, row:%d col:%d data:%d == expect:%s" %
(self.sql, row, col, self.queryResult[row][col], data))
elif (len(data) >= 28):
if pd.to_datetime(self.queryResult[row][col]) == pd.to_datetime(data): if pd.to_datetime(self.queryResult[row][col]) == pd.to_datetime(data):
tdLog.info("sql:%s, row:%d col:%d data:%d == expect:%s" % tdLog.info("sql:%s, row:%d col:%d data:%d == expect:%s" %
(self.sql, row, col, self.queryResult[row][col], data)) (self.sql, row, col, self.queryResult[row][col], data))
...@@ -223,6 +227,43 @@ class TDSql: ...@@ -223,6 +227,43 @@ class TDSql:
tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%d" % tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%d" %
(self.sql, row, col, self.queryResult[row][col], data)) (self.sql, row, col, self.queryResult[row][col], data))
def checkDeviaRation(self, row, col, data, deviation=0.001):
self.checkRowCol(row, col)
if data is None:
self.checkData(row, col, None)
return
caller = inspect.getframeinfo(inspect.stack()[1][0])
if data is not None and len(self.queryResult)==0:
tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{self.sql}, data:{data}, "
f"expect result is not None but it is")
args = (
caller.filename, caller.lineno, self.sql, data, type(data),
deviation, type(deviation), self.queryResult[row][col], type(self.queryResult[row][col])
)
if not(isinstance(data,int) or isinstance(data, float)):
tdLog.exit(f"{args[0]}({args[1]}) failed: sql:{args[2]}, data:{args[3]}, "
f"expect type: int or float, actual type: {args[4]}")
if not(isinstance(deviation,int) or isinstance(deviation, float)) or type(data)==type(True):
tdLog.exit(f"{args[0]}({args[1]}) failed: sql:{args[2]}, deviation:{args[5]}, "
f"expect type: int or float, actual type: {args[6]}")
if not(isinstance(self.queryResult[row][col], int) or isinstance(self.queryResult[row][col], float)):
tdLog.exit(f"{args[0]}({args[1]}) failed: sql:{args[2]}, result:{args[7]}, "
f"expect type: int or float, actual type: {args[8]}")
if data == 0:
devia = abs(self.queryResult[row][col])
else:
devia = abs((data - self.queryResult[row][col])/data)
if devia <= deviation:
tdLog.info(f"sql:{args[2]}, row:{row}, col:{col}, result data:{args[7]}, expect data:{args[3]}, "
f"actual deviation:{devia} <= expect deviation:{args[5]}")
else:
tdLog.exit(f"{args[0]}({args[1]}) failed: sql:{args[2]}, row:{row}, col:{col}, "
f"result data:{args[7]}, expect data:{args[3]},"
f"actual deviation:{devia} > expect deviation:{args[5]}")
pass
def getData(self, row, col): def getData(self, row, col):
self.checkRowCol(row, col) self.checkRowCol(row, col)
return self.queryResult[row][col] return self.queryResult[row][col]
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c maxtablespervnode -v 4
system sh/cfg.sh -n dnode1 -c cache -v 1
system sh/exec.sh -n dnode1 -s start
sleep 100
sql connect
sql drop database if exists cdb
sql create database if not exists cdb
sql use cdb
sql create table stb4 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9),c10 binary(16300)) TAGS(t1 int, t2 binary(10), t3 double)
sql create table tb4_0 using stb4 tags(0,'0',0.0)
sql create table tb4_1 using stb4 tags(1,'1',1.0)
sql create table tb4_2 using stb4 tags(2,'2',2.0)
sql create table tb4_3 using stb4 tags(3,'3',3.0)
sql create table tb4_4 using stb4 tags(4,'4',4.0)
$i = 0
$ts0 = 1625850000000
$blockNum = 5
$delta = 0
$tbname0 = tb4_
$a = 0
$b = 200
$c = 400
while $i < $blockNum
$x = 0
$rowNum = 200
while $x < $rowNum
$ts = $ts0 + $x
$a = $a + 1
$b = $b + 1
$c = $c + 1
$d = $x / 10
$tin = $rowNum
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
$tbname = 'tb4_ . $i
$tbname = $tbname . '
sql insert into $tbname values ( $ts , $a , $b , $c , $d , $d , $c , true, $binary , $nchar , $binary )
$x = $x + 1
endw
$i = $i + 1
$ts0 = $ts0 + 259200000
endw
sleep 100
sql connect
sql use cdb;
sql_error select apercentile(c1,101,1) from stb4 group by tbname;
sql_error select apercentile(c1,100,2) from stb4 group by tbname;
sql_error select apercentile(c1,52.111111111111,1,1) from stb4 group by tbname ;
sql select apercentile(c1,90,0) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @180.000000000@ then
return -1
endi
if $data10 != @380.000000000@ then
return -1
endi
if $data20 != @580.000000000@ then
return -1
endi
if $data30 != @780.000000000@ then
return -1
endi
if $data40 != @980.000000000@ then
return -1
endi
sql select apercentile(c1,90,1) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @180.500000000@ then
return -1
endi
if $data10 != @380.500000000@ then
return -1
endi
if $data20 != @580.500000000@ then
return -1
endi
if $data30 != @780.500000000@ then
return -1
endi
if $data40 != @980.500000000@ then
return -1
endi
sql select apercentile(c1,1,0) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @2.000000000@ then
return -1
endi
if $data10 != @202.000000000@ then
return -1
endi
if $data20 != @402.000000000@ then
return -1
endi
if $data30 != @602.000000000@ then
return -1
endi
if $data40 != @802.000000000@ then
return -1
endi
sql select apercentile(c1,1,1) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @2.500000000@ then
return -1
endi
if $data10 != @202.500000000@ then
return -1
endi
if $data20 != @402.500000000@ then
return -1
endi
if $data30 != @602.500000000@ then
return -1
endi
if $data40 != @802.500000000@ then
return -1
endi
sql select apercentile(c1,52.111111111111,0) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @104.222222222@ then
return -1
endi
if $data10 != @304.222222222@ then
return -1
endi
if $data20 != @504.222222222@ then
return -1
endi
if $data30 != @704.222222222@ then
return -1
endi
if $data40 != @904.222222222@ then
return -1
endi
sql select apercentile(c1,52.111111111111) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @104.222222222@ then
return -1
endi
if $data10 != @304.222222222@ then
return -1
endi
if $data20 != @504.222222222@ then
return -1
endi
if $data30 != @704.222222222@ then
return -1
endi
if $data40 != @904.222222222@ then
return -1
endi
sql select apercentile(c1,52.111111111111,1) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @104.722222222@ then
return -1
endi
if $data10 != @304.722222222@ then
return -1
endi
if $data20 != @504.722222222@ then
return -1
endi
if $data30 != @704.722222222@ then
return -1
endi
if $data40 != @904.722222222@ then
return -1
endi
sql select apercentile(c1,52.111111111111,1) from tb4_0;
if $rows != 1 then
return -1
endi
if $data00 != @104.722222222@ then
return -1
endi
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册