You need to sign in or sign up before continuing.
提交 17d698c1 编写于 作者: A AlexDuan

first success runing ok

上级 606455a9
/*
* include/tdigest.c
*
* Copyright (c) 2016, Usman Masood <usmanm at fastmail dot fm>
*/
#ifndef TDIGEST_H
#define TDIGEST_H
#define DEFAULT_COMPRESSION 400
typedef struct Centroid {
long long weight;
double mean;
}Centroid;
typedef struct Point {
double value;
long long weight;
struct Point *next;
}Point;
typedef struct TDigest {
double compression;
int threshold;
long long size;
long long total_weight;
double min;
double max;
int num_buffered_pts;
Point *buffered_pts;
int num_centroids;
Centroid *centroids;
}TDigest;
extern struct TDigest *tdigestNew(int compression);
extern void tdigestAdd(struct TDigest *t, double x, long long w);
extern void tdigestMerge(struct TDigest *t1, struct TDigest *t2);
extern double tdigestCDF(struct TDigest *t, double x);
extern double tdigestQuantile(struct TDigest *t, double q);
extern void tdigestCompress(struct TDigest *t);
extern void tdigestFree(struct TDigest *t);
#endif /* TDIGEST_H */
...@@ -16,23 +16,24 @@ ...@@ -16,23 +16,24 @@
#include "os.h" #include "os.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tdigest.h"
#include "texpr.h" #include "texpr.h"
#include "ttype.h"
#include "tsdb.h"
#include "tglobal.h" #include "tglobal.h"
#include "tsdb.h"
#include "ttype.h"
#include "qAggMain.h" #include "qAggMain.h"
#include "qFill.h" #include "qFill.h"
#include "qHistogram.h" #include "qHistogram.h"
#include "qPercentile.h" #include "qPercentile.h"
#include "qTsbuf.h" #include "qTsbuf.h"
#include "queryLog.h"
#include "qUdf.h" #include "qUdf.h"
#include "queryLog.h"
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput)) #define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes) #define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList)) #define GET_TS_LIST(x) ((TSKEY *)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)]) #define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
#define GET_TRUE_DATA_TYPE() \ #define GET_TRUE_DATA_TYPE() \
...@@ -145,6 +146,7 @@ typedef struct SLeastsquaresInfo { ...@@ -145,6 +146,7 @@ typedef struct SLeastsquaresInfo {
typedef struct SAPercentileInfo { typedef struct SAPercentileInfo {
SHistogramInfo *pHisto; SHistogramInfo *pHisto;
TDigest* pTDigest;
} SAPercentileInfo; } SAPercentileInfo;
typedef struct STSCompInfo { typedef struct STSCompInfo {
...@@ -164,19 +166,19 @@ typedef struct SRateInfo { ...@@ -164,19 +166,19 @@ typedef struct SRateInfo {
typedef struct SDerivInfo { typedef struct SDerivInfo {
double prevValue; // previous value double prevValue; // previous value
TSKEY prevTs; // previous timestamp TSKEY prevTs; // previous timestamp
bool ignoreNegative;// ignore the negative value bool ignoreNegative; // ignore the negative value
int64_t tsWindow; // time window for derivative int64_t tsWindow; // time window for derivative
bool valueSet; // the value has been set already bool valueSet; // the value has been set already
} SDerivInfo; } SDerivInfo;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) { int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable,
SUdfInfo *pUdfInfo) {
if (!isValidDataType(dataType)) { if (!isValidDataType(dataType)) {
qError("Illegal data type %d or data type length %d", dataType, dataBytes); qError("Illegal data type %d or data type length %d", dataType, dataBytes);
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG_DUMMY || if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG_DUMMY ||
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ ||
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP) { functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP) {
...@@ -195,7 +197,8 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -195,7 +197,8 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
// (uid, tid) + VGID + TAGSIZE + VARSTR_HEADER_SIZE // (uid, tid) + VGID + TAGSIZE + VARSTR_HEADER_SIZE
if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE); *bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) +
VARSTR_HEADER_SIZE);
*interBytes = 0; *interBytes = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -320,8 +323,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -320,8 +323,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
} else if (functionId == TSDB_FUNC_APERCT) { } else if (functionId == TSDB_FUNC_APERCT) {
*type = TSDB_DATA_TYPE_DOUBLE; *type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double); *bytes = sizeof(double);
*interBytes = *interBytes = sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1);
sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TWA) { } else if (functionId == TSDB_FUNC_TWA) {
*type = TSDB_DATA_TYPE_DOUBLE; *type = TSDB_DATA_TYPE_DOUBLE;
...@@ -404,9 +406,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -404,9 +406,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
} }
// TODO use hash table // TODO use hash table
int32_t isValidFunction(const char* name, int32_t len) { int32_t isValidFunction(const char *name, int32_t len) {
for(int32_t i = 0; i <= TSDB_FUNC_BLKINFO; ++i) { for (int32_t i = 0; i <= TSDB_FUNC_BLKINFO; ++i) {
int32_t nameLen = (int32_t) strlen(aAggs[i].name); int32_t nameLen = (int32_t)strlen(aAggs[i].name);
if (len != nameLen) { if (len != nameLen) {
continue; continue;
} }
...@@ -419,7 +421,7 @@ int32_t isValidFunction(const char* name, int32_t len) { ...@@ -419,7 +421,7 @@ int32_t isValidFunction(const char* name, int32_t len) {
return -1; return -1;
} }
static bool function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) { static bool function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo) {
if (pResultInfo->initialized) { if (pResultInfo->initialized) {
return false; return false;
} }
...@@ -470,7 +472,8 @@ static void count_function(SQLFunctionCtx *pCtx) { ...@@ -470,7 +472,8 @@ static void count_function(SQLFunctionCtx *pCtx) {
numOfElem += 1; numOfElem += 1;
} }
} else { } else {
//when counting on the primary time stamp column and no statistics data is presented, use the size value directly. // when counting on the primary time stamp column and no statistics data is presented, use the size value
// directly.
numOfElem = pCtx->size; numOfElem = pCtx->size;
} }
} }
...@@ -501,7 +504,7 @@ static void count_func_merge(SQLFunctionCtx *pCtx) { ...@@ -501,7 +504,7 @@ static void count_func_merge(SQLFunctionCtx *pCtx) {
* @param filterCols * @param filterCols
* @return * @return
*/ */
int32_t countRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { int32_t countRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t colId) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} else { } else {
...@@ -509,9 +512,7 @@ int32_t countRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { ...@@ -509,9 +512,7 @@ int32_t countRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
} }
} }
int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t colId) { return BLK_DATA_NO_NEEDED; }
return BLK_DATA_NO_NEEDED;
}
#define LIST_ADD_N_DOUBLE_FLOAT(x, ctx, p, t, numOfElem, tsdbType) \ #define LIST_ADD_N_DOUBLE_FLOAT(x, ctx, p, t, numOfElem, tsdbType) \
do { \ do { \
t *d = (t *)(p); \ t *d = (t *)(p); \
...@@ -519,10 +520,10 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { ...@@ -519,10 +520,10 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if (((ctx)->hasNull) && isNull((char *)&(d)[i], tsdbType)) { \ if (((ctx)->hasNull) && isNull((char *)&(d)[i], tsdbType)) { \
continue; \ continue; \
}; \ }; \
SET_DOUBLE_VAL(&(x) , GET_DOUBLE_VAL(&(x)) + GET_FLOAT_VAL(&(d)[i])); \ SET_DOUBLE_VAL(&(x), GET_DOUBLE_VAL(&(x)) + GET_FLOAT_VAL(&(d)[i])); \
(numOfElem)++; \ (numOfElem)++; \
} \ } \
} while(0) } while (0)
#define LIST_ADD_N_DOUBLE(x, ctx, p, t, numOfElem, tsdbType) \ #define LIST_ADD_N_DOUBLE(x, ctx, p, t, numOfElem, tsdbType) \
do { \ do { \
t *d = (t *)(p); \ t *d = (t *)(p); \
...@@ -530,10 +531,10 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { ...@@ -530,10 +531,10 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if (((ctx)->hasNull) && isNull((char *)&(d)[i], tsdbType)) { \ if (((ctx)->hasNull) && isNull((char *)&(d)[i], tsdbType)) { \
continue; \ continue; \
}; \ }; \
SET_DOUBLE_VAL(&(x) , (x) + (d)[i]); \ SET_DOUBLE_VAL(&(x), (x) + (d)[i]); \
(numOfElem)++; \ (numOfElem)++; \
} \ } \
} while(0) } while (0)
#define LIST_ADD_N(x, ctx, p, t, numOfElem, tsdbType) \ #define LIST_ADD_N(x, ctx, p, t, numOfElem, tsdbType) \
do { \ do { \
...@@ -545,7 +546,7 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { ...@@ -545,7 +546,7 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
(x) += (d)[i]; \ (x) += (d)[i]; \
(numOfElem)++; \ (numOfElem)++; \
} \ } \
} while(0) } while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, k) \ #define UPDATE_DATA(ctx, left, right, num, sign, k) \
do { \ do { \
...@@ -570,7 +571,7 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { ...@@ -570,7 +571,7 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if ((ctx)->hasNull && isNull((char *)&(list)[i], tsdbType)) { \ if ((ctx)->hasNull && isNull((char *)&(list)[i], tsdbType)) { \
continue; \ continue; \
} \ } \
TSKEY key = (ctx)->ptsList != NULL? GET_TS_DATA(ctx, i):0; \ TSKEY key = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
UPDATE_DATA(ctx, val, (list)[i], num, sign, key); \ UPDATE_DATA(ctx, val, (list)[i], num, sign, key); \
} }
...@@ -596,8 +597,8 @@ static void do_sum(SQLFunctionCtx *pCtx) { ...@@ -596,8 +597,8 @@ static void do_sum(SQLFunctionCtx *pCtx) {
uint64_t *retVal = (uint64_t *)pCtx->pOutput; uint64_t *retVal = (uint64_t *)pCtx->pOutput;
*retVal += (uint64_t)pCtx->preAggVals.statis.sum; *retVal += (uint64_t)pCtx->preAggVals.statis.sum;
} else if (IS_FLOAT_TYPE(pCtx->inputType)) { } else if (IS_FLOAT_TYPE(pCtx->inputType)) {
double *retVal = (double*) pCtx->pOutput; double *retVal = (double *)pCtx->pOutput;
SET_DOUBLE_VAL(retVal, *retVal + GET_DOUBLE_VAL((const char*)&(pCtx->preAggVals.statis.sum))); SET_DOUBLE_VAL(retVal, *retVal + GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.sum)));
} }
} else { // computing based on the true data block } else { // computing based on the true data block
void *pData = GET_INPUT_DATA_LIST(pCtx); void *pData = GET_INPUT_DATA_LIST(pCtx);
...@@ -674,7 +675,7 @@ static void sum_func_merge(SQLFunctionCtx *pCtx) { ...@@ -674,7 +675,7 @@ static void sum_func_merge(SQLFunctionCtx *pCtx) {
if (IS_SIGNED_NUMERIC_TYPE(type)) { if (IS_SIGNED_NUMERIC_TYPE(type)) {
*(int64_t *)pCtx->pOutput += pInput->isum; *(int64_t *)pCtx->pOutput += pInput->isum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
*(uint64_t *) pCtx->pOutput += pInput->usum; *(uint64_t *)pCtx->pOutput += pInput->usum;
} else { } else {
SET_DOUBLE_VAL((double *)pCtx->pOutput, *(double *)pCtx->pOutput + pInput->dsum); SET_DOUBLE_VAL((double *)pCtx->pOutput, *(double *)pCtx->pOutput + pInput->dsum);
} }
...@@ -688,16 +689,12 @@ static void sum_func_merge(SQLFunctionCtx *pCtx) { ...@@ -688,16 +689,12 @@ static void sum_func_merge(SQLFunctionCtx *pCtx) {
} }
} }
static int32_t statisRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { static int32_t statisRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t colId) { return BLK_DATA_STATIS_NEEDED; }
return BLK_DATA_STATIS_NEEDED;
}
static int32_t dataBlockRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { static int32_t dataBlockRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t colId) { return BLK_DATA_ALL_NEEDED; }
return BLK_DATA_ALL_NEEDED;
}
// todo: if column in current data block are null, opt for this case // todo: if column in current data block are null, opt for this case
static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t colId) {
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
...@@ -710,7 +707,7 @@ static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t c ...@@ -710,7 +707,7 @@ static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t c
} }
} }
static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t colId) {
if (pCtx->order != pCtx->param[0].i64) { if (pCtx->order != pCtx->param[0].i64) {
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
...@@ -722,7 +719,7 @@ static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t co ...@@ -722,7 +719,7 @@ static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t co
} }
} }
static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t colId) {
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
...@@ -734,7 +731,7 @@ static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32 ...@@ -734,7 +731,7 @@ static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32
// the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is // the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is
// the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid // the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid
SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->pOutput + pCtx->inputBytes); SFirstLastInfo *pInfo = (SFirstLastInfo *)(pCtx->pOutput + pCtx->inputBytes);
if (pInfo->hasResult != DATA_SET_FLAG) { if (pInfo->hasResult != DATA_SET_FLAG) {
return BLK_DATA_ALL_NEEDED; return BLK_DATA_ALL_NEEDED;
} else { // data in current block is not earlier than current result } else { // data in current block is not earlier than current result
...@@ -742,7 +739,7 @@ static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32 ...@@ -742,7 +739,7 @@ static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32
} }
} }
static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t colId) {
if (pCtx->order != pCtx->param[0].i64) { if (pCtx->order != pCtx->param[0].i64) {
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
...@@ -754,7 +751,7 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_ ...@@ -754,7 +751,7 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_
// the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is // the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is
// the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid // the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid
SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->pOutput + pCtx->inputBytes); SFirstLastInfo *pInfo = (SFirstLastInfo *)(pCtx->pOutput + pCtx->inputBytes);
if (pInfo->hasResult != DATA_SET_FLAG) { if (pInfo->hasResult != DATA_SET_FLAG) {
return BLK_DATA_ALL_NEEDED; return BLK_DATA_ALL_NEEDED;
} else { } else {
...@@ -775,7 +772,7 @@ static void avg_function(SQLFunctionCtx *pCtx) { ...@@ -775,7 +772,7 @@ static void avg_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo); SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo);
double *pVal = &pAvgInfo->sum; double * pVal = &pAvgInfo->sum;
if (pCtx->preAggVals.isSet) { // Pre-aggregation if (pCtx->preAggVals.isSet) { // Pre-aggregation
notNullElems = pCtx->size - pCtx->preAggVals.statis.numOfNull; notNullElems = pCtx->size - pCtx->preAggVals.statis.numOfNull;
...@@ -784,7 +781,7 @@ static void avg_function(SQLFunctionCtx *pCtx) { ...@@ -784,7 +781,7 @@ static void avg_function(SQLFunctionCtx *pCtx) {
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
*pVal += pCtx->preAggVals.statis.sum; *pVal += pCtx->preAggVals.statis.sum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
*pVal += (uint64_t) pCtx->preAggVals.statis.sum; *pVal += (uint64_t)pCtx->preAggVals.statis.sum;
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
*pVal += GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.sum)); *pVal += GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.sum));
} }
...@@ -834,8 +831,8 @@ static void avg_function(SQLFunctionCtx *pCtx) { ...@@ -834,8 +831,8 @@ static void avg_function(SQLFunctionCtx *pCtx) {
static void avg_func_merge(SQLFunctionCtx *pCtx) { static void avg_func_merge(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
double *sum = (double*) pCtx->pOutput; double *sum = (double *)pCtx->pOutput;
char *input = GET_INPUT_DATA_LIST(pCtx); char * input = GET_INPUT_DATA_LIST(pCtx);
for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) {
SAvgInfo *pInput = (SAvgInfo *)input; SAvgInfo *pInput = (SAvgInfo *)input;
...@@ -864,7 +861,7 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) { ...@@ -864,7 +861,7 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) {
return; return;
} }
SET_DOUBLE_VAL((double *)pCtx->pOutput,(*(double *)pCtx->pOutput) / *(int64_t *)GET_ROWCELL_INTERBUF(pResInfo)); SET_DOUBLE_VAL((double *)pCtx->pOutput, (*(double *)pCtx->pOutput) / *(int64_t *)GET_ROWCELL_INTERBUF(pResInfo));
} else { // this is the secondary merge, only in the secondary merge, the input type is TSDB_DATA_TYPE_BINARY } else { // this is the secondary merge, only in the secondary merge, the input type is TSDB_DATA_TYPE_BINARY
assert(IS_NUMERIC_TYPE(pCtx->inputType)); assert(IS_NUMERIC_TYPE(pCtx->inputType));
SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo); SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo);
...@@ -894,7 +891,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, ...@@ -894,7 +891,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
return; return;
} }
void* tval = NULL; void * tval = NULL;
int16_t index = 0; int16_t index = 0;
if (isMin) { if (isMin) {
...@@ -913,9 +910,9 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, ...@@ -913,9 +910,9 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
* *
* The following codes of 3 lines will be removed later. * The following codes of 3 lines will be removed later.
*/ */
// if (index < 0 || index >= pCtx->size + pCtx->startOffset) { // if (index < 0 || index >= pCtx->size + pCtx->startOffset) {
// index = 0; // index = 0;
// } // }
// the index is the original position, not the relative position // the index is the original position, not the relative position
key = pCtx->ptsList[index]; key = pCtx->ptsList[index];
...@@ -984,7 +981,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, ...@@ -984,7 +981,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
return; return;
} }
void *p = GET_INPUT_DATA_LIST(pCtx); void * p = GET_INPUT_DATA_LIST(pCtx);
TSKEY *tsList = GET_TS_LIST(pCtx); TSKEY *tsList = GET_TS_LIST(pCtx);
*notNullElems = 0; *notNullElems = 0;
...@@ -996,10 +993,10 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, ...@@ -996,10 +993,10 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
TYPED_LOOPCHECK_N(int16_t, pOutput, p, pCtx, pCtx->inputType, isMin, *notNullElems); TYPED_LOOPCHECK_N(int16_t, pOutput, p, pCtx, pCtx->inputType, isMin, *notNullElems);
} else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) {
int32_t *pData = p; int32_t *pData = p;
int32_t *retVal = (int32_t*) pOutput; int32_t *retVal = (int32_t *)pOutput;
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
if (pCtx->hasNull && isNull((const char*)&pData[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
continue; continue;
} }
...@@ -1035,7 +1032,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, ...@@ -1035,7 +1032,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
} }
} }
static bool min_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) { static bool min_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo) {
if (!function_setup(pCtx, pResultInfo)) { if (!function_setup(pCtx, pResultInfo)) {
return false; // not initialized since it has been initialized return false; // not initialized since it has been initialized
} }
...@@ -1047,7 +1044,7 @@ static bool min_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo ...@@ -1047,7 +1044,7 @@ static bool min_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo
*((int8_t *)pCtx->pOutput) = INT8_MAX; *((int8_t *)pCtx->pOutput) = INT8_MAX;
break; break;
case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_UTINYINT:
*(uint8_t *) pCtx->pOutput = UINT8_MAX; *(uint8_t *)pCtx->pOutput = UINT8_MAX;
break; break;
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
*((int16_t *)pCtx->pOutput) = INT16_MAX; *((int16_t *)pCtx->pOutput) = INT16_MAX;
...@@ -1080,7 +1077,7 @@ static bool min_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo ...@@ -1080,7 +1077,7 @@ static bool min_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo
return true; return true;
} }
static bool max_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) { static bool max_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo) {
if (!function_setup(pCtx, pResultInfo)) { if (!function_setup(pCtx, pResultInfo)) {
return false; // not initialized since it has been initialized return false; // not initialized since it has been initialized
} }
...@@ -1280,7 +1277,7 @@ static void max_func_merge(SQLFunctionCtx *pCtx) { ...@@ -1280,7 +1277,7 @@ static void max_func_merge(SQLFunctionCtx *pCtx) {
static void stddev_function(SQLFunctionCtx *pCtx) { static void stddev_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo); SStddevInfo * pStd = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pStd->stage == 0) { if (pCtx->currentStage == REPEAT_SCAN && pStd->stage == 0) {
pStd->stage++; pStd->stage++;
...@@ -1304,13 +1301,13 @@ static void stddev_function(SQLFunctionCtx *pCtx) { ...@@ -1304,13 +1301,13 @@ static void stddev_function(SQLFunctionCtx *pCtx) {
double *retVal = &pStd->res; double *retVal = &pStd->res;
double avg = pStd->avg; double avg = pStd->avg;
void *pData = GET_INPUT_DATA_LIST(pCtx); void * pData = GET_INPUT_DATA_LIST(pCtx);
int32_t num = 0; int32_t num = 0;
switch (pCtx->inputType) { switch (pCtx->inputType) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
if (pCtx->hasNull && isNull((const char*) (&((int32_t *)pData)[i]), pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)(&((int32_t *)pData)[i]), pCtx->inputType)) {
continue; continue;
} }
num += 1; num += 1;
...@@ -1377,14 +1374,14 @@ static void stddev_finalizer(SQLFunctionCtx *pCtx) { ...@@ -1377,14 +1374,14 @@ static void stddev_finalizer(SQLFunctionCtx *pCtx) {
} }
////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////
int32_t tsCompare(const void* p1, const void* p2) { int32_t tsCompare(const void *p1, const void *p2) {
TSKEY k = *(TSKEY*)p1; TSKEY k = *(TSKEY *)p1;
SResPair* pair = (SResPair*)p2; SResPair *pair = (SResPair *)p2;
if (k == pair->key) { if (k == pair->key) {
return 0; return 0;
} else { } else {
return k < pair->key? -1:1; return k < pair->key ? -1 : 1;
} }
} }
...@@ -1395,21 +1392,21 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) { ...@@ -1395,21 +1392,21 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) {
double *retVal = &pStd->res; double *retVal = &pStd->res;
// all data are null, no need to proceed // all data are null, no need to proceed
SArray* resList = (SArray*) pCtx->param[0].pz; SArray *resList = (SArray *)pCtx->param[0].pz;
if (resList == NULL) { if (resList == NULL) {
return; return;
} }
// find the correct group average results according to the tag value // find the correct group average results according to the tag value
int32_t len = (int32_t) taosArrayGetSize(resList); int32_t len = (int32_t)taosArrayGetSize(resList);
assert(len > 0); assert(len > 0);
double avg = 0; double avg = 0;
if (len == 1) { if (len == 1) {
SResPair* p = taosArrayGet(resList, 0); SResPair *p = taosArrayGet(resList, 0);
avg = p->avg; avg = p->avg;
} else { // todo opt performance by using iterator since the timestamp lsit is matched with the output result } else { // todo opt performance by using iterator since the timestamp lsit is matched with the output result
SResPair* p = bsearch(&pCtx->startTs, resList->pData, len, sizeof(SResPair), tsCompare); SResPair *p = bsearch(&pCtx->startTs, resList->pData, len, sizeof(SResPair), tsCompare);
if (p == NULL) { if (p == NULL) {
return; return;
} }
...@@ -1417,13 +1414,13 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) { ...@@ -1417,13 +1414,13 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) {
avg = p->avg; avg = p->avg;
} }
void *pData = GET_INPUT_DATA_LIST(pCtx); void * pData = GET_INPUT_DATA_LIST(pCtx);
int32_t num = 0; int32_t num = 0;
switch (pCtx->inputType) { switch (pCtx->inputType) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
if (pCtx->hasNull && isNull((const char*) (&((int32_t *)pData)[i]), pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)(&((int32_t *)pData)[i]), pCtx->inputType)) {
continue; continue;
} }
num += 1; num += 1;
...@@ -1480,7 +1477,7 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) { ...@@ -1480,7 +1477,7 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) {
static void stddev_dst_merge(SQLFunctionCtx *pCtx) { static void stddev_dst_merge(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SStddevdstInfo* pRes = GET_ROWCELL_INTERBUF(pResInfo); SStddevdstInfo * pRes = GET_ROWCELL_INTERBUF(pResInfo);
char *input = GET_INPUT_DATA_LIST(pCtx); char *input = GET_INPUT_DATA_LIST(pCtx);
...@@ -1510,7 +1507,7 @@ static void stddev_dst_finalizer(SQLFunctionCtx *pCtx) { ...@@ -1510,7 +1507,7 @@ static void stddev_dst_finalizer(SQLFunctionCtx *pCtx) {
} }
////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////
static bool first_last_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { static bool first_last_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResInfo) {
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
return false; return false;
} }
...@@ -1578,7 +1575,7 @@ static void first_dist_function(SQLFunctionCtx *pCtx) { ...@@ -1578,7 +1575,7 @@ static void first_dist_function(SQLFunctionCtx *pCtx) {
* 1. data block that are not loaded * 1. data block that are not loaded
* 2. scan data files in desc order * 2. scan data files in desc order
*/ */
if (pCtx->order == TSDB_ORDER_DESC/* || pCtx->preAggVals.dataBlockLoaded == false*/) { if (pCtx->order == TSDB_ORDER_DESC /* || pCtx->preAggVals.dataBlockLoaded == false*/) {
return; return;
} }
...@@ -1607,7 +1604,7 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) { ...@@ -1607,7 +1604,7 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) {
assert(pCtx->stableQuery); assert(pCtx->stableQuery);
char * pData = GET_INPUT_DATA_LIST(pCtx); char * pData = GET_INPUT_DATA_LIST(pCtx);
SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->outputBytes); SFirstLastInfo *pInput = (SFirstLastInfo *)(pData + pCtx->outputBytes);
if (pInput->hasResult != DATA_SET_FLAG) { if (pInput->hasResult != DATA_SET_FLAG) {
return; return;
} }
...@@ -1638,11 +1635,10 @@ static void last_function(SQLFunctionCtx *pCtx) { ...@@ -1638,11 +1635,10 @@ static void last_function(SQLFunctionCtx *pCtx) {
return; return;
} }
SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
int32_t notNullElems = 0; int32_t notNullElems = 0;
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
for (int32_t i = pCtx->size - 1; i >= 0; --i) { for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *data = GET_INPUT_DATA(pCtx, i); char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) { if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) {
...@@ -1668,12 +1664,12 @@ static void last_function(SQLFunctionCtx *pCtx) { ...@@ -1668,12 +1664,12 @@ static void last_function(SQLFunctionCtx *pCtx) {
TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
char* buf = GET_ROWCELL_INTERBUF(pResInfo); char *buf = GET_ROWCELL_INTERBUF(pResInfo);
if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) { if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY *)buf) < ts) {
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
memcpy(pCtx->pOutput, data, pCtx->inputBytes); memcpy(pCtx->pOutput, data, pCtx->inputBytes);
*(TSKEY*)buf = ts; *(TSKEY *)buf = ts;
DO_UPDATE_TAG_COLUMNS(pCtx, ts); DO_UPDATE_TAG_COLUMNS(pCtx, ts);
} }
...@@ -1741,7 +1737,7 @@ static void last_dist_function(SQLFunctionCtx *pCtx) { ...@@ -1741,7 +1737,7 @@ static void last_dist_function(SQLFunctionCtx *pCtx) {
static void last_dist_func_merge(SQLFunctionCtx *pCtx) { static void last_dist_func_merge(SQLFunctionCtx *pCtx) {
char *pData = GET_INPUT_DATA_LIST(pCtx); char *pData = GET_INPUT_DATA_LIST(pCtx);
SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->outputBytes); SFirstLastInfo *pInput = (SFirstLastInfo *)(pData + pCtx->outputBytes);
if (pInput->hasResult != DATA_SET_FLAG) { if (pInput->hasResult != DATA_SET_FLAG) {
return; return;
} }
...@@ -1815,7 +1811,7 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6 ...@@ -1815,7 +1811,7 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6
memcpy(dst->pTags, pTags, (size_t)pTagInfo->tagsLen); memcpy(dst->pTags, pTags, (size_t)pTagInfo->tagsLen);
} else { // the tags are dumped from the ctx tag fields } else { // the tags are dumped from the ctx tag fields
for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) { for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) {
SQLFunctionCtx* ctx = pTagInfo->pTagCtxList[i]; SQLFunctionCtx *ctx = pTagInfo->pTagCtxList[i];
if (ctx->functionId == TSDB_FUNC_TS_DUMMY) { if (ctx->functionId == TSDB_FUNC_TS_DUMMY) {
ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
ctx->tag.i64 = tsKey; ctx->tag.i64 = tsKey;
...@@ -1834,11 +1830,10 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6 ...@@ -1834,11 +1830,10 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6
memcpy((dst)->pTags, (src)->pTags, (size_t)(__l)); \ memcpy((dst)->pTags, (src)->pTags, (size_t)(__l)); \
} while (0) } while (0)
static int32_t topBotComparFn(const void *p1, const void *p2, const void *param) static int32_t topBotComparFn(const void *p1, const void *p2, const void *param) {
{ uint16_t type = *(uint16_t *)param;
uint16_t type = *(uint16_t *) param; tValuePair *val1 = *(tValuePair **)p1;
tValuePair *val1 = *(tValuePair **) p1; tValuePair *val2 = *(tValuePair **)p2;
tValuePair *val2 = *(tValuePair **) p2;
if (IS_SIGNED_NUMERIC_TYPE(type)) { if (IS_SIGNED_NUMERIC_TYPE(type)) {
if (val1->v.i64 == val2->v.i64) { if (val1->v.i64 == val2->v.i64) {
...@@ -1861,13 +1856,12 @@ static int32_t topBotComparFn(const void *p1, const void *p2, const void *param) ...@@ -1861,13 +1856,12 @@ static int32_t topBotComparFn(const void *p1, const void *p2, const void *param)
return (val1->v.dKey > val2->v.dKey) ? 1 : -1; return (val1->v.dKey > val2->v.dKey) ? 1 : -1;
} }
static void topBotSwapFn(void *dst, void *src, const void *param) static void topBotSwapFn(void *dst, void *src, const void *param) {
{
char tag[32768]; char tag[32768];
tValuePair temp; tValuePair temp;
uint16_t tagLen = *(uint16_t *) param; uint16_t tagLen = *(uint16_t *)param;
tValuePair *vdst = *(tValuePair **) dst; tValuePair *vdst = *(tValuePair **)dst;
tValuePair *vsrc = *(tValuePair **) src; tValuePair *vsrc = *(tValuePair **)src;
memset(tag, 0, sizeof(tag)); memset(tag, 0, sizeof(tag));
temp.pTags = tag; temp.pTags = tag;
...@@ -1888,7 +1882,8 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData, ...@@ -1888,7 +1882,8 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData,
if (pInfo->num < maxLen) { if (pInfo->num < maxLen) {
valuePairAssign(pList[pInfo->num], type, (const char *)&val.i64, ts, pTags, pTagInfo, stage); valuePairAssign(pList[pInfo->num], type, (const char *)&val.i64, ts, pTags, pTagInfo, stage);
taosheapsort((void *) pList, sizeof(tValuePair **), pInfo->num + 1, (const void *) &type, topBotComparFn, (const void *) &pTagInfo->tagsLen, topBotSwapFn, 0); taosheapsort((void *)pList, sizeof(tValuePair **), pInfo->num + 1, (const void *)&type, topBotComparFn,
(const void *)&pTagInfo->tagsLen, topBotSwapFn, 0);
pInfo->num++; pInfo->num++;
} else { } else {
...@@ -1896,7 +1891,8 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData, ...@@ -1896,7 +1891,8 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData,
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u64 > pList[0]->v.u64) || (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u64 > pList[0]->v.u64) ||
(IS_FLOAT_TYPE(type) && val.dKey > pList[0]->v.dKey)) { (IS_FLOAT_TYPE(type) && val.dKey > pList[0]->v.dKey)) {
valuePairAssign(pList[0], type, (const char *)&val.i64, ts, pTags, pTagInfo, stage); valuePairAssign(pList[0], type, (const char *)&val.i64, ts, pTags, pTagInfo, stage);
taosheapadjust((void *) pList, sizeof(tValuePair **), 0, maxLen - 1, (const void *) &type, topBotComparFn, (const void *) &pTagInfo->tagsLen, topBotSwapFn, 0); taosheapadjust((void *)pList, sizeof(tValuePair **), 0, maxLen - 1, (const void *)&type, topBotComparFn,
(const void *)&pTagInfo->tagsLen, topBotSwapFn, 0);
} }
} }
} }
...@@ -1912,7 +1908,8 @@ static void do_bottom_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pDa ...@@ -1912,7 +1908,8 @@ static void do_bottom_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pDa
if (pInfo->num < maxLen) { if (pInfo->num < maxLen) {
valuePairAssign(pList[pInfo->num], type, (const char *)&val.i64, ts, pTags, pTagInfo, stage); valuePairAssign(pList[pInfo->num], type, (const char *)&val.i64, ts, pTags, pTagInfo, stage);
taosheapsort((void *) pList, sizeof(tValuePair **), pInfo->num + 1, (const void *) &type, topBotComparFn, (const void *) &pTagInfo->tagsLen, topBotSwapFn, 1); taosheapsort((void *)pList, sizeof(tValuePair **), pInfo->num + 1, (const void *)&type, topBotComparFn,
(const void *)&pTagInfo->tagsLen, topBotSwapFn, 1);
pInfo->num++; pInfo->num++;
} else { } else {
...@@ -1920,7 +1917,8 @@ static void do_bottom_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pDa ...@@ -1920,7 +1917,8 @@ static void do_bottom_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pDa
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u64 < pList[0]->v.u64) || (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u64 < pList[0]->v.u64) ||
(IS_FLOAT_TYPE(type) && val.dKey < pList[0]->v.dKey)) { (IS_FLOAT_TYPE(type) && val.dKey < pList[0]->v.dKey)) {
valuePairAssign(pList[0], type, (const char *)&val.i64, ts, pTags, pTagInfo, stage); valuePairAssign(pList[0], type, (const char *)&val.i64, ts, pTags, pTagInfo, stage);
taosheapadjust((void *) pList, sizeof(tValuePair **), 0, maxLen - 1, (const void *) &type, topBotComparFn, (const void *) &pTagInfo->tagsLen, topBotSwapFn, 1); taosheapadjust((void *)pList, sizeof(tValuePair **), 0, maxLen - 1, (const void *)&type, topBotComparFn,
(const void *)&pTagInfo->tagsLen, topBotSwapFn, 1);
} }
} }
} }
...@@ -1948,7 +1946,7 @@ static int32_t resDataAscComparFn(const void *pLeft, const void *pRight) { ...@@ -1948,7 +1946,7 @@ static int32_t resDataAscComparFn(const void *pLeft, const void *pRight) {
} else { } else {
return pLeftElem->v.dKey > pRightElem->v.dKey ? 1 : -1; return pLeftElem->v.dKey > pRightElem->v.dKey ? 1 : -1;
} }
} else if (IS_SIGNED_NUMERIC_TYPE(pLeftElem->v.nType)){ } else if (IS_SIGNED_NUMERIC_TYPE(pLeftElem->v.nType)) {
if (pLeftElem->v.i64 == pRightElem->v.i64) { if (pLeftElem->v.i64 == pRightElem->v.i64) {
return 0; return 0;
} else { } else {
...@@ -1967,7 +1965,7 @@ static int32_t resDataDescComparFn(const void *pLeft, const void *pRight) { retu ...@@ -1967,7 +1965,7 @@ static int32_t resDataDescComparFn(const void *pLeft, const void *pRight) { retu
static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) { static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STopBotInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); STopBotInfo * pRes = GET_ROWCELL_INTERBUF(pResInfo);
tValuePair **tvp = pRes->res; tValuePair **tvp = pRes->res;
...@@ -2065,13 +2063,13 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) { ...@@ -2065,13 +2063,13 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
// only the first_stage_merge is directly written data into final output buffer // only the first_stage_merge is directly written data into final output buffer
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
return (STopBotInfo*) pCtx->pOutput; return (STopBotInfo *)pCtx->pOutput;
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer } else { // during normal table query and super table at the secondary_stage, result is written to intermediate
// buffer
return GET_ROWCELL_INTERBUF(pResInfo); return GET_ROWCELL_INTERBUF(pResInfo);
} }
} }
/* /*
* keep the intermediate results during scan data blocks in the format of: * keep the intermediate results during scan data blocks in the format of:
* +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+ * +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+
...@@ -2080,13 +2078,13 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) { ...@@ -2080,13 +2078,13 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
*/ */
static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) { static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) {
char *tmp = (char *)pTopBotInfo + sizeof(STopBotInfo); char *tmp = (char *)pTopBotInfo + sizeof(STopBotInfo);
pTopBotInfo->res = (tValuePair**) tmp; pTopBotInfo->res = (tValuePair **)tmp;
tmp += POINTER_BYTES * pCtx->param[0].i64; tmp += POINTER_BYTES * pCtx->param[0].i64;
size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen;
for (int32_t i = 0; i < pCtx->param[0].i64; ++i) { for (int32_t i = 0; i < pCtx->param[0].i64; ++i) {
pTopBotInfo->res[i] = (tValuePair*) tmp; pTopBotInfo->res[i] = (tValuePair *)tmp;
pTopBotInfo->res[i]->pTags = tmp + sizeof(tValuePair); pTopBotInfo->res[i]->pTags = tmp + sizeof(tValuePair);
tmp += size; tmp += size;
} }
...@@ -2105,11 +2103,12 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha ...@@ -2105,11 +2103,12 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha
return true; return true;
} }
if ((void *)pTopBotInfo->res[0] != (void *)((char *)pTopBotInfo + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i64)) { if ((void *)pTopBotInfo->res[0] !=
(void *)((char *)pTopBotInfo + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i64)) {
buildTopBotStruct(pTopBotInfo, pCtx); buildTopBotStruct(pTopBotInfo, pCtx);
} }
tValuePair **pRes = (tValuePair**) pTopBotInfo->res; tValuePair **pRes = (tValuePair **)pTopBotInfo->res;
if (pCtx->functionId == TSDB_FUNC_TOP) { if (pCtx->functionId == TSDB_FUNC_TOP) {
switch (pCtx->inputType) { switch (pCtx->inputType) {
...@@ -2148,7 +2147,7 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha ...@@ -2148,7 +2147,7 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha
} }
} }
static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResInfo) {
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
return false; return false;
} }
...@@ -2177,7 +2176,7 @@ static void top_function(SQLFunctionCtx *pCtx) { ...@@ -2177,7 +2176,7 @@ static void top_function(SQLFunctionCtx *pCtx) {
notNullElems++; notNullElems++;
// NOTE: Set the default timestamp if it is missing [todo refactor] // NOTE: Set the default timestamp if it is missing [todo refactor]
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; TSKEY ts = (pCtx->ptsList != NULL) ? GET_TS_DATA(pCtx, i) : 0;
do_top_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); do_top_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0);
} }
...@@ -2204,9 +2203,9 @@ static void top_func_merge(SQLFunctionCtx *pCtx) { ...@@ -2204,9 +2203,9 @@ static void top_func_merge(SQLFunctionCtx *pCtx) {
// the intermediate result is binary, we only use the output data type // the intermediate result is binary, we only use the output data type
for (int32_t i = 0; i < pInput->num; ++i) { for (int32_t i = 0; i < pInput->num; ++i) {
int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->outputType; int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->outputType;
do_top_function_add(pOutput, (int32_t)pCtx->param[0].i64, &pInput->res[i]->v.i64, pInput->res[i]->timestamp, do_top_function_add(pOutput, (int32_t)pCtx->param[0].i64, &pInput->res[i]->v.i64, pInput->res[i]->timestamp, type,
type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
} }
SET_VAL(pCtx, pInput->num, pOutput->num); SET_VAL(pCtx, pInput->num, pOutput->num);
...@@ -2234,7 +2233,7 @@ static void bottom_function(SQLFunctionCtx *pCtx) { ...@@ -2234,7 +2233,7 @@ static void bottom_function(SQLFunctionCtx *pCtx) {
notNullElems++; notNullElems++;
// NOTE: Set the default timestamp if it is missing [todo refactor] // NOTE: Set the default timestamp if it is missing [todo refactor]
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; TSKEY ts = (pCtx->ptsList != NULL) ? GET_TS_DATA(pCtx, i) : 0;
do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0);
} }
...@@ -2262,8 +2261,8 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) { ...@@ -2262,8 +2261,8 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) {
// the intermediate result is binary, we only use the output data type // the intermediate result is binary, we only use the output data type
for (int32_t i = 0; i < pInput->num; ++i) { for (int32_t i = 0; i < pInput->num; ++i) {
int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->outputType; int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->outputType;
do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].i64, &pInput->res[i]->v.i64, pInput->res[i]->timestamp, type, do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].i64, &pInput->res[i]->v.i64, pInput->res[i]->timestamp,
&pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
} }
SET_VAL(pCtx, pInput->num, pOutput->num); SET_VAL(pCtx, pInput->num, pOutput->num);
...@@ -2303,7 +2302,7 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2303,7 +2302,7 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
} }
/////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////
static bool percentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) { static bool percentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo) {
if (!function_setup(pCtx, pResultInfo)) { if (!function_setup(pCtx, pResultInfo)) {
return false; return false;
} }
...@@ -2321,7 +2320,7 @@ static void percentile_function(SQLFunctionCtx *pCtx) { ...@@ -2321,7 +2320,7 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SPercentileInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) { if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
pInfo->stage += 1; pInfo->stage += 1;
...@@ -2406,9 +2405,9 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2406,9 +2405,9 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) {
double v = pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64 : pCtx->param[0].dKey; double v = pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64 : pCtx->param[0].dKey;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo); SPercentileInfo * ppInfo = (SPercentileInfo *)GET_ROWCELL_INTERBUF(pResInfo);
tMemBucket * pMemBucket = ppInfo->pMemBucket; tMemBucket *pMemBucket = ppInfo->pMemBucket;
if (pMemBucket == NULL || pMemBucket->total == 0) { // check for null if (pMemBucket == NULL || pMemBucket->total == 0) { // check for null
assert(ppInfo->numOfElems == 0); assert(ppInfo->numOfElems == 0);
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
...@@ -2420,18 +2419,17 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2420,18 +2419,17 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx); doFinalizer(pCtx);
} }
////////////////////////////////////////////////////////////////////////////////// static void buildHistogramInfo(SAPercentileInfo *pInfo) {
static void buildHistogramInfo(SAPercentileInfo* pInfo) { pInfo->pHisto = (SHistogramInfo *)((char *)pInfo + sizeof(SAPercentileInfo));
pInfo->pHisto = (SHistogramInfo*) ((char*) pInfo + sizeof(SAPercentileInfo)); pInfo->pHisto->elems = (SHistBin *)((char *)pInfo->pHisto + sizeof(SHistogramInfo));
pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo));
} }
static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) { static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo* pInfo = NULL; SAPercentileInfo * pInfo = NULL;
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
pInfo = (SAPercentileInfo*) pCtx->pOutput; pInfo = (SAPercentileInfo *)pCtx->pOutput;
} else { } else {
pInfo = GET_ROWCELL_INTERBUF(pResInfo); pInfo = GET_ROWCELL_INTERBUF(pResInfo);
} }
...@@ -2440,7 +2438,117 @@ static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) { ...@@ -2440,7 +2438,117 @@ static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
return pInfo; return pInfo;
} }
static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
//
// ----------------- tdigest -------------------
//
//////////////////////////////////////////////////////////////////////////////////
static bool tdigest_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo) {
if (!function_setup(pCtx, pResultInfo)) {
return false;
}
// new TDigest
SAPercentileInfo *pAPerc = getAPerctInfo(pCtx);
int compression = 5;
if(pAPerc) {
if(pAPerc->pTDigest == NULL) {
pAPerc->pTDigest = tdigestNew(compression);
}
}
return true;
}
static void tdigest_do(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo * pAPerc = getAPerctInfo(pCtx);
assert(pAPerc->pTDigest != NULL);
if(pAPerc->pTDigest == NULL) {
qError("tdigest_do tdigest is null.");
return ;
}
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue;
}
notNullElems += 1;
double v = 0; // value
long long w = 1; // weigth
GET_TYPED_DATA(v, double, pCtx->inputType, data);
tdigestAdd(pAPerc->pTDigest, v, w);
}
tdigestCompress(pAPerc->pTDigest);
if (!pCtx->hasNull) {
assert(pCtx->size == notNullElems);
}
SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) {
pResInfo->hasResult = DATA_SET_FLAG;
}
}
static void tdigest_merge(SQLFunctionCtx *pCtx) {
SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx);
assert(pInput->pTDigest);
SAPercentileInfo *pOutput = getAPerctInfo(pCtx);
tdigestMerge(pOutput->pTDigest, pInput->pTDigest);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->hasResult = DATA_SET_FLAG;
SET_VAL(pCtx, 1, 1);
}
static void tdigest_finalizer(SQLFunctionCtx *pCtx) {
double q = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i64 : pCtx->param[0].dKey;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo * pAPerc = getAPerctInfo(pCtx);
if (pCtx->currentStage == MERGE_STAGE) {
if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null
double res = tdigestQuantile(pAPerc->pTDigest, q);
memcpy(pCtx->pOutput, &res, sizeof(double));
} else {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
return;
}
} else {
if (pAPerc->pTDigest->size > 0) {
double res = tdigestQuantile(pAPerc->pTDigest, q);
memcpy(pCtx->pOutput, &res, sizeof(double));
} else { // no need to free
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
return;
}
}
tdigestFree(pAPerc->pTDigest);
pAPerc->pTDigest = NULL;
doFinalizer(pCtx);
}
//////////////////////////////////////////////////////////////////////////////////
int algo = 1;
static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo) {
if (algo == 1) {
return tdigest_setup(pCtx, pResultInfo);
}
if (!function_setup(pCtx, pResultInfo)) { if (!function_setup(pCtx, pResultInfo)) {
return false; return false;
} }
...@@ -2453,10 +2561,15 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* ...@@ -2453,10 +2561,15 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
} }
static void apercentile_function(SQLFunctionCtx *pCtx) { static void apercentile_function(SQLFunctionCtx *pCtx) {
if (algo == 1) {
tdigest_do(pCtx);
return;
}
int32_t notNullElems = 0; int32_t notNullElems = 0;
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo *pInfo = getAPerctInfo(pCtx); SAPercentileInfo * pInfo = getAPerctInfo(pCtx);
assert(pInfo->pHisto->elems != NULL); assert(pInfo->pHisto->elems != NULL);
...@@ -2485,27 +2598,32 @@ static void apercentile_function(SQLFunctionCtx *pCtx) { ...@@ -2485,27 +2598,32 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
} }
static void apercentile_func_merge(SQLFunctionCtx *pCtx) { static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
if (algo == 1) {
tdigest_merge(pCtx);
return;
}
SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx); SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx);
pInput->pHisto = (SHistogramInfo*) ((char *)pInput + sizeof(SAPercentileInfo)); pInput->pHisto = (SHistogramInfo *)((char *)pInput + sizeof(SAPercentileInfo));
pInput->pHisto->elems = (SHistBin*) ((char *)pInput->pHisto + sizeof(SHistogramInfo)); pInput->pHisto->elems = (SHistBin *)((char *)pInput->pHisto + sizeof(SHistogramInfo));
if (pInput->pHisto->numOfElems <= 0) { if (pInput->pHisto->numOfElems <= 0) {
return; return;
} }
SAPercentileInfo *pOutput = getAPerctInfo(pCtx); SAPercentileInfo *pOutput = getAPerctInfo(pCtx);
SHistogramInfo *pHisto = pOutput->pHisto; SHistogramInfo * pHisto = pOutput->pHisto;
if (pHisto->numOfElems <= 0) { if (pHisto->numOfElems <= 0) {
memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo)); pHisto->elems = (SHistBin *)((char *)pHisto + sizeof(SHistogramInfo));
} else { } else {
//TODO(dengyihao): avoid memcpy // TODO(dengyihao): avoid memcpy
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo)); pHisto->elems = (SHistBin *)((char *)pHisto + sizeof(SHistogramInfo));
SHistogramInfo *pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN); SHistogramInfo *pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN);
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN); memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo)); pHisto->elems = (SHistBin *)((char *)pHisto + sizeof(SHistogramInfo));
tHistogramDestroy(&pRes); tHistogramDestroy(&pRes);
} }
...@@ -2515,10 +2633,15 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) { ...@@ -2515,10 +2633,15 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
} }
static void apercentile_finalizer(SQLFunctionCtx *pCtx) { static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
if (algo == 1) {
tdigest_finalizer(pCtx);
return;
}
double v = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i64 : pCtx->param[0].dKey; double v = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i64 : pCtx->param[0].dKey;
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo *pOutput = GET_ROWCELL_INTERBUF(pResInfo); SAPercentileInfo * pOutput = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == MERGE_STAGE) { if (pCtx->currentStage == MERGE_STAGE) {
if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null
...@@ -2550,7 +2673,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2550,7 +2673,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
} }
///////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////
static bool leastsquares_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { static bool leastsquares_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResInfo) {
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
return false; return false;
} }
...@@ -2581,8 +2704,8 @@ static bool leastsquares_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo ...@@ -2581,8 +2704,8 @@ static bool leastsquares_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo
} }
static void leastsquares_function(SQLFunctionCtx *pCtx) { static void leastsquares_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SLeastsquaresInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
double(*param)[3] = pInfo->mat; double(*param)[3] = pInfo->mat;
double x = pInfo->startVal; double x = pInfo->startVal;
...@@ -2595,7 +2718,7 @@ static void leastsquares_function(SQLFunctionCtx *pCtx) { ...@@ -2595,7 +2718,7 @@ static void leastsquares_function(SQLFunctionCtx *pCtx) {
int32_t *p = pData; int32_t *p = pData;
// LEASTSQR_CAL_LOOP(pCtx, param, pParamData, p); // LEASTSQR_CAL_LOOP(pCtx, param, pParamData, p);
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
if (pCtx->hasNull && isNull((const char*) p, pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)p, pCtx->inputType)) {
continue; continue;
} }
...@@ -2668,8 +2791,8 @@ static void leastsquares_function(SQLFunctionCtx *pCtx) { ...@@ -2668,8 +2791,8 @@ static void leastsquares_function(SQLFunctionCtx *pCtx) {
static void leastsquares_finalizer(SQLFunctionCtx *pCtx) { static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
// no data in query // no data in query
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SLeastsquaresInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pInfo->num == 0) { if (pInfo->num == 0) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
...@@ -2691,8 +2814,8 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2691,8 +2814,8 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
param[1][2] /= param[1][1]; param[1][2] /= param[1][1];
int32_t maxOutputSize = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE - VARSTR_HEADER_SIZE; int32_t maxOutputSize = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE - VARSTR_HEADER_SIZE;
size_t n = snprintf(varDataVal(pCtx->pOutput), maxOutputSize, "{slop:%.6lf, intercept:%.6lf}", size_t n =
param[0][2], param[1][2]); snprintf(varDataVal(pCtx->pOutput), maxOutputSize, "{slop:%.6lf, intercept:%.6lf}", param[0][2], param[1][2]);
varDataSetLen(pCtx->pOutput, n); varDataSetLen(pCtx->pOutput, n);
doFinalizer(pCtx); doFinalizer(pCtx);
...@@ -2718,12 +2841,11 @@ static void col_project_function(SQLFunctionCtx *pCtx) { ...@@ -2718,12 +2841,11 @@ static void col_project_function(SQLFunctionCtx *pCtx) {
char *pData = GET_INPUT_DATA_LIST(pCtx); char *pData = GET_INPUT_DATA_LIST(pCtx);
if (pCtx->order == TSDB_ORDER_ASC) { if (pCtx->order == TSDB_ORDER_ASC) {
int32_t numOfRows = (pCtx->param[0].i64 == 1)? 1:pCtx->size; int32_t numOfRows = (pCtx->param[0].i64 == 1) ? 1 : pCtx->size;
memcpy(pCtx->pOutput, pData, (size_t) numOfRows * pCtx->inputBytes); memcpy(pCtx->pOutput, pData, (size_t)numOfRows * pCtx->inputBytes);
} else { } else {
for(int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
memcpy(pCtx->pOutput + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes, memcpy(pCtx->pOutput + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes, pCtx->inputBytes);
pCtx->inputBytes);
} }
} }
} }
...@@ -2739,7 +2861,7 @@ static void tag_project_function(SQLFunctionCtx *pCtx) { ...@@ -2739,7 +2861,7 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
assert(pCtx->inputBytes == pCtx->outputBytes); assert(pCtx->inputBytes == pCtx->outputBytes);
tVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->outputType, true); tVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->outputType, true);
char* data = pCtx->pOutput; char *data = pCtx->pOutput;
pCtx->pOutput += pCtx->outputBytes; pCtx->pOutput += pCtx->outputBytes;
// directly copy from the first one // directly copy from the first one
...@@ -2778,7 +2900,7 @@ enum { ...@@ -2778,7 +2900,7 @@ enum {
INITIAL_VALUE_NOT_ASSIGNED = 0, INITIAL_VALUE_NOT_ASSIGNED = 0,
}; };
static bool diff_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { static bool diff_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResInfo) {
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
return false; return false;
} }
...@@ -2788,13 +2910,13 @@ static bool diff_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn ...@@ -2788,13 +2910,13 @@ static bool diff_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn
return false; return false;
} }
static bool deriv_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) { static bool deriv_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo) {
if (!function_setup(pCtx, pResultInfo)) { if (!function_setup(pCtx, pResultInfo)) {
return false; return false;
} }
// diff function require the value is set to -1 // diff function require the value is set to -1
SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResultInfo); SDerivInfo *pDerivInfo = GET_ROWCELL_INTERBUF(pResultInfo);
pDerivInfo->ignoreNegative = pCtx->param[1].i64; pDerivInfo->ignoreNegative = pCtx->param[1].i64;
pDerivInfo->prevTs = -1; pDerivInfo->prevTs = -1;
...@@ -2805,7 +2927,7 @@ static bool deriv_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResu ...@@ -2805,7 +2927,7 @@ static bool deriv_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResu
static void deriv_function(SQLFunctionCtx *pCtx) { static void deriv_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo); SDerivInfo * pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo);
void *data = GET_INPUT_DATA_LIST(pCtx); void *data = GET_INPUT_DATA_LIST(pCtx);
...@@ -2829,7 +2951,8 @@ static void deriv_function(SQLFunctionCtx *pCtx) { ...@@ -2829,7 +2951,8 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
if (!pDerivInfo->valueSet) { // initial value is not set yet if (!pDerivInfo->valueSet) { // initial value is not set yet
pDerivInfo->valueSet = true; pDerivInfo->valueSet = true;
} else { } else {
SET_DOUBLE_VAL(pOutput, ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs)); SET_DOUBLE_VAL(
pOutput, ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs));
if (pDerivInfo->ignoreNegative && *pOutput < 0) { if (pDerivInfo->ignoreNegative && *pOutput < 0) {
} else { } else {
*pTimestamp = tsList[i]; *pTimestamp = tsList[i];
...@@ -2866,7 +2989,7 @@ static void deriv_function(SQLFunctionCtx *pCtx) { ...@@ -2866,7 +2989,7 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
} }
} }
pDerivInfo->prevValue = (double) pData[i]; pDerivInfo->prevValue = (double)pData[i];
pDerivInfo->prevTs = tsList[i]; pDerivInfo->prevTs = tsList[i];
} }
break; break;
...@@ -3006,8 +3129,8 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3006,8 +3129,8 @@ static void diff_function(SQLFunctionCtx *pCtx) {
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
TSKEY* pTimestamp = pCtx->ptsOutputBuf; TSKEY *pTimestamp = pCtx->ptsOutputBuf;
TSKEY* tsList = GET_TS_LIST(pCtx); TSKEY *tsList = GET_TS_LIST(pCtx);
switch (pCtx->inputType) { switch (pCtx->inputType) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
...@@ -3015,13 +3138,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3015,13 +3138,13 @@ static void diff_function(SQLFunctionCtx *pCtx) {
int32_t *pOutput = (int32_t *)pCtx->pOutput; int32_t *pOutput = (int32_t *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) { for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
continue; continue;
} }
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0; *pTimestamp = (tsList != NULL) ? tsList[i] : 0;
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
} }
...@@ -3037,13 +3160,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3037,13 +3160,13 @@ static void diff_function(SQLFunctionCtx *pCtx) {
int64_t *pOutput = (int64_t *)pCtx->pOutput; int64_t *pOutput = (int64_t *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) { for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
continue; continue;
} }
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = pData[i] - pCtx->param[1].i64; // direct previous may be null *pOutput = pData[i] - pCtx->param[1].i64; // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0; *pTimestamp = (tsList != NULL) ? tsList[i] : 0;
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
} }
...@@ -3059,13 +3182,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3059,13 +3182,13 @@ static void diff_function(SQLFunctionCtx *pCtx) {
double *pOutput = (double *)pCtx->pOutput; double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) { for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
continue; continue;
} }
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
SET_DOUBLE_VAL(pOutput, pData[i] - pCtx->param[1].dKey); // direct previous may be null SET_DOUBLE_VAL(pOutput, pData[i] - pCtx->param[1].dKey); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0; *pTimestamp = (tsList != NULL) ? tsList[i] : 0;
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
} }
...@@ -3081,13 +3204,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3081,13 +3204,13 @@ static void diff_function(SQLFunctionCtx *pCtx) {
float *pOutput = (float *)pCtx->pOutput; float *pOutput = (float *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) { for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
continue; continue;
} }
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (float)(pData[i] - pCtx->param[1].dKey); // direct previous may be null *pOutput = (float)(pData[i] - pCtx->param[1].dKey); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0; *pTimestamp = (tsList != NULL) ? tsList[i] : 0;
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
} }
...@@ -3103,13 +3226,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3103,13 +3226,13 @@ static void diff_function(SQLFunctionCtx *pCtx) {
int16_t *pOutput = (int16_t *)pCtx->pOutput; int16_t *pOutput = (int16_t *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) { for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
continue; continue;
} }
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (int16_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pOutput = (int16_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0; *pTimestamp = (tsList != NULL) ? tsList[i] : 0;
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
} }
...@@ -3132,7 +3255,7 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3132,7 +3255,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (int8_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pOutput = (int8_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0; *pTimestamp = (tsList != NULL) ? tsList[i] : 0;
pOutput += 1; pOutput += 1;
pTimestamp += 1; pTimestamp += 1;
} }
...@@ -3161,7 +3284,7 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3161,7 +3284,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
} }
} }
char *getArithColumnData(void *param, const char* name, int32_t colId) { char *getArithColumnData(void *param, const char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *)param; SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
int32_t index = -1; int32_t index = -1;
...@@ -3201,7 +3324,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) { ...@@ -3201,7 +3324,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
} }
///////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////
static bool spread_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { static bool spread_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResInfo) {
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
return false; return false;
} }
...@@ -3222,7 +3345,7 @@ static bool spread_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes ...@@ -3222,7 +3345,7 @@ static bool spread_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes
static void spread_function(SQLFunctionCtx *pCtx) { static void spread_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SSpreadInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t numOfElems = 0; int32_t numOfElems = 0;
...@@ -3287,7 +3410,7 @@ static void spread_function(SQLFunctionCtx *pCtx) { ...@@ -3287,7 +3410,7 @@ static void spread_function(SQLFunctionCtx *pCtx) {
assert(pCtx->size == numOfElems); assert(pCtx->size == numOfElems);
} }
_spread_over: _spread_over:
SET_VAL(pCtx, numOfElems, 1); SET_VAL(pCtx, numOfElems, 1);
if (numOfElems > 0) { if (numOfElems > 0) {
...@@ -3354,13 +3477,12 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) { ...@@ -3354,13 +3477,12 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx); doFinalizer(pCtx);
} }
/** /**
* param[1]: start time * param[1]: start time
* param[2]: end time * param[2]: end time
* @param pCtx * @param pCtx
*/ */
static bool twa_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { static bool twa_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResInfo) {
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
return false; return false;
} }
...@@ -3372,25 +3494,25 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInf ...@@ -3372,25 +3494,25 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInf
} }
static double twa_get_area(SPoint1 s, SPoint1 e) { static double twa_get_area(SPoint1 s, SPoint1 e) {
if ((s.val >= 0 && e.val >= 0)|| (s.val <=0 && e.val <= 0)) { if ((s.val >= 0 && e.val >= 0) || (s.val <= 0 && e.val <= 0)) {
return (s.val + e.val) * (e.key - s.key) / 2; return (s.val + e.val) * (e.key - s.key) / 2;
} }
double x = (s.key * e.val - e.key * s.val)/(e.val - s.val); double x = (s.key * e.val - e.key * s.val) / (e.val - s.val);
double val = (s.val * (x - s.key) + e.val * (e.key - x)) / 2; double val = (s.val * (x - s.key) + e.val * (e.key - x)) / 2;
return val; return val;
} }
static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t size) { static int32_t twa_function_impl(SQLFunctionCtx *pCtx, int32_t index, int32_t size) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
TSKEY *tsList = GET_TS_LIST(pCtx); TSKEY * tsList = GET_TS_LIST(pCtx);
int32_t i = index; int32_t i = index;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
SPoint1* last = &pInfo->p; SPoint1 *last = &pInfo->p;
if (pCtx->start.key != INT64_MIN) { if (pCtx->start.key != INT64_MIN) {
assert((pCtx->start.key < tsList[i] && pCtx->order == TSDB_ORDER_ASC) || assert((pCtx->start.key < tsList[i] && pCtx->order == TSDB_ORDER_ASC) ||
...@@ -3418,11 +3540,11 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si ...@@ -3418,11 +3540,11 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si
} }
// calculate the value of // calculate the value of
switch(pCtx->inputType) { switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
int8_t *val = (int8_t*) GET_INPUT_DATA(pCtx, 0); int8_t *val = (int8_t *)GET_INPUT_DATA(pCtx, 0);
for (; i < size && i >= 0; i += step) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&val[i], pCtx->inputType)) {
continue; continue;
} }
...@@ -3439,9 +3561,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si ...@@ -3439,9 +3561,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si
break; break;
} }
case TSDB_DATA_TYPE_SMALLINT: { case TSDB_DATA_TYPE_SMALLINT: {
int16_t *val = (int16_t*) GET_INPUT_DATA(pCtx, 0); int16_t *val = (int16_t *)GET_INPUT_DATA(pCtx, 0);
for (; i < size && i >= 0; i += step) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&val[i], pCtx->inputType)) {
continue; continue;
} }
...@@ -3458,9 +3580,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si ...@@ -3458,9 +3580,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si
break; break;
} }
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
int32_t *val = (int32_t*) GET_INPUT_DATA(pCtx, 0); int32_t *val = (int32_t *)GET_INPUT_DATA(pCtx, 0);
for (; i < size && i >= 0; i += step) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&val[i], pCtx->inputType)) {
continue; continue;
} }
...@@ -3477,14 +3599,14 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si ...@@ -3477,14 +3599,14 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si
break; break;
} }
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT: {
int64_t *val = (int64_t*) GET_INPUT_DATA(pCtx, 0); int64_t *val = (int64_t *)GET_INPUT_DATA(pCtx, 0);
for (; i < size && i >= 0; i += step) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&val[i], pCtx->inputType)) {
continue; continue;
} }
#ifndef _TD_NINGSI_60 #ifndef _TD_NINGSI_60
SPoint1 st = {.key = tsList[i], .val = (double) val[i]}; SPoint1 st = {.key = tsList[i], .val = (double)val[i]};
#else #else
SPoint1 st; SPoint1 st;
st.key = tsList[i]; st.key = tsList[i];
...@@ -3496,9 +3618,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si ...@@ -3496,9 +3618,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si
break; break;
} }
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
float *val = (float*) GET_INPUT_DATA(pCtx, 0); float *val = (float *)GET_INPUT_DATA(pCtx, 0);
for (; i < size && i >= 0; i += step) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&val[i], pCtx->inputType)) {
continue; continue;
} }
...@@ -3515,9 +3637,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si ...@@ -3515,9 +3637,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si
break; break;
} }
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
double *val = (double*) GET_INPUT_DATA(pCtx, 0); double *val = (double *)GET_INPUT_DATA(pCtx, 0);
for (; i < size && i >= 0; i += step) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&val[i], pCtx->inputType)) {
continue; continue;
} }
...@@ -3534,9 +3656,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si ...@@ -3534,9 +3656,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si
break; break;
} }
case TSDB_DATA_TYPE_UTINYINT: { case TSDB_DATA_TYPE_UTINYINT: {
uint8_t *val = (uint8_t*) GET_INPUT_DATA(pCtx, 0); uint8_t *val = (uint8_t *)GET_INPUT_DATA(pCtx, 0);
for (; i < size && i >= 0; i += step) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&val[i], pCtx->inputType)) {
continue; continue;
} }
...@@ -3553,9 +3675,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si ...@@ -3553,9 +3675,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si
break; break;
} }
case TSDB_DATA_TYPE_USMALLINT: { case TSDB_DATA_TYPE_USMALLINT: {
uint16_t *val = (uint16_t*) GET_INPUT_DATA(pCtx, 0); uint16_t *val = (uint16_t *)GET_INPUT_DATA(pCtx, 0);
for (; i < size && i >= 0; i += step) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&val[i], pCtx->inputType)) {
continue; continue;
} }
...@@ -3572,9 +3694,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si ...@@ -3572,9 +3694,9 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si
break; break;
} }
case TSDB_DATA_TYPE_UINT: { case TSDB_DATA_TYPE_UINT: {
uint32_t *val = (uint32_t*) GET_INPUT_DATA(pCtx, 0); uint32_t *val = (uint32_t *)GET_INPUT_DATA(pCtx, 0);
for (; i < size && i >= 0; i += step) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&val[i], pCtx->inputType)) {
continue; continue;
} }
...@@ -3591,25 +3713,26 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si ...@@ -3591,25 +3713,26 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t si
break; break;
} }
case TSDB_DATA_TYPE_UBIGINT: { case TSDB_DATA_TYPE_UBIGINT: {
uint64_t *val = (uint64_t*) GET_INPUT_DATA(pCtx, 0); uint64_t *val = (uint64_t *)GET_INPUT_DATA(pCtx, 0);
for (; i < size && i >= 0; i += step) { for (; i < size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char *)&val[i], pCtx->inputType)) {
continue; continue;
} }
#ifndef _TD_NINGSI_60 #ifndef _TD_NINGSI_60
SPoint1 st = {.key = tsList[i], .val = (double) val[i]}; SPoint1 st = {.key = tsList[i], .val = (double)val[i]};
#else #else
SPoint1 st; SPoint1 st;
st.key = tsList[i]; st.key = tsList[i];
st.val = (double) val[i]; st.val = (double)val[i];
#endif #endif
pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st; pInfo->p = st;
} }
break; break;
} }
default: assert(0); default:
assert(0);
} }
// the last interpolated time window value // the last interpolated time window value
...@@ -3626,11 +3749,11 @@ static void twa_function(SQLFunctionCtx *pCtx) { ...@@ -3626,11 +3749,11 @@ static void twa_function(SQLFunctionCtx *pCtx) {
void *data = GET_INPUT_DATA_LIST(pCtx); void *data = GET_INPUT_DATA_LIST(pCtx);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
// skip null value // skip null value
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order == TSDB_ORDER_ASC)? 0:(pCtx->size - 1); int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : (pCtx->size - 1);
while (pCtx->hasNull && i < pCtx->size && i >= 0 && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) { while (pCtx->hasNull && i < pCtx->size && i >= 0 && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) {
i += step; i += step;
} }
...@@ -3677,7 +3800,7 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) { ...@@ -3677,7 +3800,7 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
if (pInfo->win.ekey == pInfo->win.skey) { if (pInfo->win.ekey == pInfo->win.skey) {
SET_DOUBLE_VAL((double *)pCtx->pOutput, pInfo->p.val); SET_DOUBLE_VAL((double *)pCtx->pOutput, pInfo->p.val);
} else { } else {
SET_DOUBLE_VAL((double *)pCtx->pOutput , pInfo->dOutput / (pInfo->win.ekey - pInfo->win.skey)); SET_DOUBLE_VAL((double *)pCtx->pOutput, pInfo->dOutput / (pInfo->win.ekey - pInfo->win.skey));
} }
GET_RES_INFO(pCtx)->numOfRes = 1; GET_RES_INFO(pCtx)->numOfRes = 1;
...@@ -3690,7 +3813,7 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) { ...@@ -3690,7 +3813,7 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
*/ */
static void interp_function_impl(SQLFunctionCtx *pCtx) { static void interp_function_impl(SQLFunctionCtx *pCtx) {
int32_t type = (int32_t) pCtx->param[2].i64; int32_t type = (int32_t)pCtx->param[2].i64;
if (type == TSDB_FILL_NONE) { if (type == TSDB_FILL_NONE) {
return; return;
} }
...@@ -3704,7 +3827,9 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { ...@@ -3704,7 +3827,9 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
} else if (type == TSDB_FILL_SET_VALUE) { } else if (type == TSDB_FILL_SET_VALUE) {
tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true);
} else { } else {
if (pCtx->start.key != INT64_MIN && ((ascQuery && pCtx->start.key <= pCtx->startTs && pCtx->end.key >= pCtx->startTs) || ((!ascQuery) && pCtx->start.key >= pCtx->startTs && pCtx->end.key <= pCtx->startTs))) { if (pCtx->start.key != INT64_MIN &&
((ascQuery && pCtx->start.key <= pCtx->startTs && pCtx->end.key >= pCtx->startTs) ||
((!ascQuery) && pCtx->start.key >= pCtx->startTs && pCtx->end.key <= pCtx->startTs))) {
if (type == TSDB_FILL_PREV) { if (type == TSDB_FILL_PREV) {
if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) { if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) {
SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, pCtx->start.val); SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, pCtx->start.val);
...@@ -3750,14 +3875,14 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { ...@@ -3750,14 +3875,14 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
if (pCtx->size > 1) { if (pCtx->size > 1) {
TSKEY ekey = GET_TS_DATA(pCtx, 1); TSKEY ekey = GET_TS_DATA(pCtx, 1);
if ((ascQuery && ekey > skey && ekey <= pCtx->startTs) || if ((ascQuery && ekey > skey && ekey <= pCtx->startTs) ||
((!ascQuery) && ekey < skey && ekey >= pCtx->startTs)){ ((!ascQuery) && ekey < skey && ekey >= pCtx->startTs)) {
skey = ekey; skey = ekey;
} }
} }
assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType); assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType);
} else if (type == TSDB_FILL_NEXT) { } else if (type == TSDB_FILL_NEXT) {
TSKEY ekey = skey; TSKEY ekey = skey;
char* val = NULL; char *val = NULL;
if ((ascQuery && ekey < pCtx->startTs) || ((!ascQuery) && ekey > pCtx->startTs)) { if ((ascQuery && ekey < pCtx->startTs) || ((!ascQuery) && ekey > pCtx->startTs)) {
if (pCtx->size > 1) { if (pCtx->size > 1) {
...@@ -3766,12 +3891,12 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { ...@@ -3766,12 +3891,12 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
return; return;
} }
val = ((char*)pCtx->pInput) + pCtx->inputBytes; val = ((char *)pCtx->pInput) + pCtx->inputBytes;
} else { } else {
return; return;
} }
} else { } else {
val = (char*)pCtx->pInput; val = (char *)pCtx->pInput;
} }
assignVal(pCtx->pOutput, val, pCtx->outputBytes, pCtx->inputType); assignVal(pCtx->pOutput, val, pCtx->outputBytes, pCtx->inputType);
...@@ -3783,8 +3908,8 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { ...@@ -3783,8 +3908,8 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
TSKEY ekey = GET_TS_DATA(pCtx, 1); TSKEY ekey = GET_TS_DATA(pCtx, 1);
// no data generated yet // no data generated yet
if ((ascQuery && !(skey <= pCtx->startTs && ekey >= pCtx->startTs)) if ((ascQuery && !(skey <= pCtx->startTs && ekey >= pCtx->startTs)) ||
|| ((!ascQuery) && !(skey >= pCtx->startTs && ekey <= pCtx->startTs))) { ((!ascQuery) && !(skey >= pCtx->startTs && ekey <= pCtx->startTs))) {
return; return;
} }
...@@ -3817,7 +3942,7 @@ static void interp_function(SQLFunctionCtx *pCtx) { ...@@ -3817,7 +3942,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
if (pCtx->size > 0) { if (pCtx->size > 0) {
bool ascQuery = (pCtx->order == TSDB_ORDER_ASC); bool ascQuery = (pCtx->order == TSDB_ORDER_ASC);
TSKEY key; TSKEY key;
char *pData; char * pData;
int32_t typedData = 0; int32_t typedData = 0;
if (ascQuery) { if (ascQuery) {
...@@ -3838,7 +3963,8 @@ static void interp_function(SQLFunctionCtx *pCtx) { ...@@ -3838,7 +3963,8 @@ static void interp_function(SQLFunctionCtx *pCtx) {
} }
} }
//if (key == pCtx->startTs && (ascQuery || !(IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL))) { // if (key == pCtx->startTs && (ascQuery || !(IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType ==
// TSDB_DATA_TYPE_BOOL))) {
if (key == pCtx->startTs) { if (key == pCtx->startTs) {
if (typedData) { if (typedData) {
SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, *(double *)pData); SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, *(double *)pData);
...@@ -3850,12 +3976,12 @@ static void interp_function(SQLFunctionCtx *pCtx) { ...@@ -3850,12 +3976,12 @@ static void interp_function(SQLFunctionCtx *pCtx) {
} else { } else {
interp_function_impl(pCtx); interp_function_impl(pCtx);
} }
} else { //no qualified data rows and interpolation is required } else { // no qualified data rows and interpolation is required
interp_function_impl(pCtx); interp_function_impl(pCtx);
} }
} }
static bool ts_comp_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { static bool ts_comp_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResInfo) {
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
return false; // not initialized since it has been initialized return false; // not initialized since it has been initialized
} }
...@@ -3893,7 +4019,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { ...@@ -3893,7 +4019,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
STSBuf * pTSbuf = pInfo->pTSBuf; STSBuf * pTSbuf = pInfo->pTSBuf;
tsBufFlush(pTSbuf); tsBufFlush(pTSbuf);
qDebug("total timestamp :%"PRId64, pTSbuf->numOfTotal); qDebug("total timestamp :%" PRId64, pTSbuf->numOfTotal);
// TODO refactor transfer ownership of current file // TODO refactor transfer ownership of current file
*(FILE **)pCtx->pOutput = pTSbuf->f; *(FILE **)pCtx->pOutput = pTSbuf->f;
...@@ -3914,7 +4040,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { ...@@ -3914,7 +4040,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////
// rate functions // rate functions
static double do_calc_rate(const SRateInfo* pRateInfo, double tickPerSec) { static double do_calc_rate(const SRateInfo *pRateInfo, double tickPerSec) {
if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey) || if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey) ||
(pRateInfo->firstKey >= pRateInfo->lastKey)) { (pRateInfo->firstKey >= pRateInfo->lastKey)) {
return 0.0; return 0.0;
...@@ -3940,10 +4066,10 @@ static double do_calc_rate(const SRateInfo* pRateInfo, double tickPerSec) { ...@@ -3940,10 +4066,10 @@ static double do_calc_rate(const SRateInfo* pRateInfo, double tickPerSec) {
return 0; return 0;
} }
return (duration > 0)? ((double)diff) / (duration/tickPerSec):0.0; return (duration > 0) ? ((double)diff) / (duration / tickPerSec) : 0.0;
} }
static bool rate_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { static bool rate_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResInfo) {
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
return false; return false;
} }
...@@ -3952,8 +4078,8 @@ static bool rate_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn ...@@ -3952,8 +4078,8 @@ static bool rate_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn
pInfo->correctionValue = 0; pInfo->correctionValue = 0;
pInfo->firstKey = INT64_MIN; pInfo->firstKey = INT64_MIN;
pInfo->lastKey = INT64_MIN; pInfo->lastKey = INT64_MIN;
pInfo->firstValue = (double) INT64_MIN; pInfo->firstValue = (double)INT64_MIN;
pInfo->lastValue = (double) INT64_MIN; pInfo->lastValue = (double)INT64_MIN;
pInfo->hasResult = 0; pInfo->hasResult = 0;
pInfo->isIRate = (pCtx->functionId == TSDB_FUNC_IRATE); pInfo->isIRate = (pCtx->functionId == TSDB_FUNC_IRATE);
...@@ -3965,7 +4091,7 @@ static void rate_function(SQLFunctionCtx *pCtx) { ...@@ -3965,7 +4091,7 @@ static void rate_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
TSKEY *primaryKey = GET_TS_LIST(pCtx); TSKEY * primaryKey = GET_TS_LIST(pCtx);
qDebug("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); qDebug("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull);
...@@ -4018,19 +4144,19 @@ static void rate_func_copy(SQLFunctionCtx *pCtx) { ...@@ -4018,19 +4144,19 @@ static void rate_func_copy(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
memcpy(GET_ROWCELL_INTERBUF(pResInfo), pCtx->pInput, (size_t)pCtx->inputBytes); memcpy(GET_ROWCELL_INTERBUF(pResInfo), pCtx->pInput, (size_t)pCtx->inputBytes);
pResInfo->hasResult = ((SRateInfo*)pCtx->pInput)->hasResult; pResInfo->hasResult = ((SRateInfo *)pCtx->pInput)->hasResult;
} }
static void rate_finalizer(SQLFunctionCtx *pCtx) { static void rate_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); SRateInfo * pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
if (pRateInfo->hasResult != DATA_SET_FLAG) { if (pRateInfo->hasResult != DATA_SET_FLAG) {
setNull(pCtx->pOutput, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); setNull(pCtx->pOutput, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
return; return;
} }
SET_DOUBLE_VAL((double*) pCtx->pOutput, do_calc_rate(pRateInfo, (double) TSDB_TICK_PER_SECOND(pCtx->param[0].i64))); SET_DOUBLE_VAL((double *)pCtx->pOutput, do_calc_rate(pRateInfo, (double)TSDB_TICK_PER_SECOND(pCtx->param[0].i64)));
// cannot set the numOfIteratedElems again since it is set during previous iteration // cannot set the numOfIteratedElems again since it is set during previous iteration
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
...@@ -4044,7 +4170,7 @@ static void irate_function(SQLFunctionCtx *pCtx) { ...@@ -4044,7 +4170,7 @@ static void irate_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
TSKEY *primaryKey = GET_TS_LIST(pCtx); TSKEY * primaryKey = GET_TS_LIST(pCtx);
for (int32_t i = pCtx->size - 1; i >= 0; --i) { for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *pData = GET_INPUT_DATA(pCtx, i); char *pData = GET_INPUT_DATA(pCtx, i);
...@@ -4083,12 +4209,12 @@ static void irate_function(SQLFunctionCtx *pCtx) { ...@@ -4083,12 +4209,12 @@ static void irate_function(SQLFunctionCtx *pCtx) {
} }
} }
void blockInfo_func(SQLFunctionCtx* pCtx) { void blockInfo_func(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo); STableBlockDist * pDist = (STableBlockDist *)GET_ROWCELL_INTERBUF(pResInfo);
int32_t len = *(int32_t*) pCtx->pInput; int32_t len = *(int32_t *)pCtx->pInput;
blockDistInfoFromBinary((char*)pCtx->pInput + sizeof(int32_t), len, pDist); blockDistInfoFromBinary((char *)pCtx->pInput + sizeof(int32_t), len, pDist);
pDist->rowSize = (uint16_t)pCtx->param[0].i64; pDist->rowSize = (uint16_t)pCtx->param[0].i64;
memcpy(pCtx->pOutput, pCtx->pInput, sizeof(int32_t) + len); memcpy(pCtx->pOutput, pCtx->pInput, sizeof(int32_t) + len);
...@@ -4097,8 +4223,8 @@ void blockInfo_func(SQLFunctionCtx* pCtx) { ...@@ -4097,8 +4223,8 @@ void blockInfo_func(SQLFunctionCtx* pCtx) {
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
} }
static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockDist* pSrc) { static void mergeTableBlockDist(SResultRowCellInfo *pResInfo, const STableBlockDist *pSrc) {
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo); STableBlockDist *pDist = (STableBlockDist *)GET_ROWCELL_INTERBUF(pResInfo);
assert(pDist != NULL && pSrc != NULL); assert(pDist != NULL && pSrc != NULL);
pDist->numOfTables += pSrc->numOfTables; pDist->numOfTables += pSrc->numOfTables;
...@@ -4115,7 +4241,7 @@ static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockD ...@@ -4115,7 +4241,7 @@ static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockD
pDist->maxRows = pSrc->maxRows; pDist->maxRows = pSrc->maxRows;
pDist->minRows = pSrc->minRows; pDist->minRows = pSrc->minRows;
int32_t maxSteps = TSDB_MAX_MAX_ROW_FBLOCK/TSDB_BLOCK_DIST_STEP_ROWS; int32_t maxSteps = TSDB_MAX_MAX_ROW_FBLOCK / TSDB_BLOCK_DIST_STEP_ROWS;
if (TSDB_MAX_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) { if (TSDB_MAX_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
++maxSteps; ++maxSteps;
} }
...@@ -4125,16 +4251,16 @@ static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockD ...@@ -4125,16 +4251,16 @@ static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockD
size_t steps = taosArrayGetSize(pSrc->dataBlockInfos); size_t steps = taosArrayGetSize(pSrc->dataBlockInfos);
for (int32_t i = 0; i < steps; ++i) { for (int32_t i = 0; i < steps; ++i) {
int32_t srcNumBlocks = ((SFileBlockInfo*)taosArrayGet(pSrc->dataBlockInfos, i))->numBlocksOfStep; int32_t srcNumBlocks = ((SFileBlockInfo *)taosArrayGet(pSrc->dataBlockInfos, i))->numBlocksOfStep;
SFileBlockInfo* blockInfo = (SFileBlockInfo*)taosArrayGet(pDist->dataBlockInfos, i); SFileBlockInfo *blockInfo = (SFileBlockInfo *)taosArrayGet(pDist->dataBlockInfos, i);
blockInfo->numBlocksOfStep += srcNumBlocks; blockInfo->numBlocksOfStep += srcNumBlocks;
} }
} }
void block_func_merge(SQLFunctionCtx* pCtx) { void block_func_merge(SQLFunctionCtx *pCtx) {
STableBlockDist info = {0}; STableBlockDist info = {0};
int32_t len = *(int32_t*) pCtx->pInput; int32_t len = *(int32_t *)pCtx->pInput;
blockDistInfoFromBinary(((char*)pCtx->pInput) + sizeof(int32_t), len, &info); blockDistInfoFromBinary(((char *)pCtx->pInput) + sizeof(int32_t), len, &info);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
mergeTableBlockDist(pResInfo, &info); mergeTableBlockDist(pResInfo, &info);
taosArrayDestroy(info.dataBlockInfos); taosArrayDestroy(info.dataBlockInfos);
...@@ -4143,8 +4269,8 @@ void block_func_merge(SQLFunctionCtx* pCtx) { ...@@ -4143,8 +4269,8 @@ void block_func_merge(SQLFunctionCtx* pCtx) {
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
} }
void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32_t numOfPercents, void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32_t numOfPercents, double *percents,
double* percents, int32_t* percentiles) { int32_t *percentiles) {
if (totalBlocks == 0) { if (totalBlocks == 0) {
for (int32_t i = 0; i < numOfPercents; ++i) { for (int32_t i = 0; i < numOfPercents; ++i) {
percentiles[i] = 0; percentiles[i] = 0;
...@@ -4174,16 +4300,16 @@ void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32 ...@@ -4174,16 +4300,16 @@ void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32
} }
for (int32_t i = 0; i < numOfPercents; ++i) { for (int32_t i = 0; i < numOfPercents; ++i) {
percentiles[i] = (percentiles[i]+1) * TSDB_BLOCK_DIST_STEP_ROWS - TSDB_BLOCK_DIST_STEP_ROWS/2; percentiles[i] = (percentiles[i] + 1) * TSDB_BLOCK_DIST_STEP_ROWS - TSDB_BLOCK_DIST_STEP_ROWS / 2;
} }
} }
void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) { void generateBlockDistResult(STableBlockDist *pTableBlockDist, char *result) {
if (pTableBlockDist == NULL) { if (pTableBlockDist == NULL) {
return; return;
} }
SArray* blockInfos = pTableBlockDist->dataBlockInfos; SArray * blockInfos = pTableBlockDist->dataBlockInfos;
uint64_t totalRows = pTableBlockDist->totalRows; uint64_t totalRows = pTableBlockDist->totalRows;
size_t numSteps = taosArrayGetSize(blockInfos); size_t numSteps = taosArrayGetSize(blockInfos);
int64_t totalBlocks = 0; int64_t totalBlocks = 0;
...@@ -4195,7 +4321,7 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) { ...@@ -4195,7 +4321,7 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
totalBlocks += blocks; totalBlocks += blocks;
} }
avg = totalBlocks > 0 ? (int64_t)(totalRows/totalBlocks) : 0; avg = totalBlocks > 0 ? (int64_t)(totalRows / totalBlocks) : 0;
min = totalBlocks > 0 ? pTableBlockDist->minRows : 0; min = totalBlocks > 0 ? pTableBlockDist->minRows : 0;
max = totalBlocks > 0 ? pTableBlockDist->maxRows : 0; max = totalBlocks > 0 ? pTableBlockDist->maxRows : 0;
...@@ -4214,32 +4340,33 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) { ...@@ -4214,32 +4340,33 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
double percents[] = {0.05, 0.10, 0.20, 0.30, 0.40, 0.50, 0.60, 0.70, 0.80, 0.90, 0.95, 0.99}; double percents[] = {0.05, 0.10, 0.20, 0.30, 0.40, 0.50, 0.60, 0.70, 0.80, 0.90, 0.95, 0.99};
int32_t percentiles[] = {-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}; int32_t percentiles[] = {-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1};
assert(sizeof(percents)/sizeof(double) == sizeof(percentiles)/sizeof(int32_t)); assert(sizeof(percents) / sizeof(double) == sizeof(percentiles) / sizeof(int32_t));
getPercentiles(pTableBlockDist, totalBlocks, sizeof(percents)/sizeof(double), percents, percentiles); getPercentiles(pTableBlockDist, totalBlocks, sizeof(percents) / sizeof(double), percents, percentiles);
uint64_t totalLen = pTableBlockDist->totalSize; uint64_t totalLen = pTableBlockDist->totalSize;
int32_t rowSize = pTableBlockDist->rowSize; int32_t rowSize = pTableBlockDist->rowSize;
int32_t smallBlocks = pTableBlockDist->numOfSmallBlocks; int32_t smallBlocks = pTableBlockDist->numOfSmallBlocks;
double compRatio = (totalRows>0) ? ((double)(totalLen)/(rowSize*totalRows)) : 1; double compRatio = (totalRows > 0) ? ((double)(totalLen) / (rowSize * totalRows)) : 1;
int sz = sprintf(result + VARSTR_HEADER_SIZE, int sz = sprintf(result + VARSTR_HEADER_SIZE,
"summary: \n\t " "summary: \n\t "
"5th=[%d], 10th=[%d], 20th=[%d], 30th=[%d], 40th=[%d], 50th=[%d]\n\t " "5th=[%d], 10th=[%d], 20th=[%d], 30th=[%d], 40th=[%d], 50th=[%d]\n\t "
"60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]\n\t " "60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]\n\t "
"Min=[%"PRId64"(Rows)] Max=[%"PRId64"(Rows)] Avg=[%"PRId64"(Rows)] Stddev=[%.2f] \n\t " "Min=[%" PRId64 "(Rows)] Max=[%" PRId64 "(Rows)] Avg=[%" PRId64
"Rows=[%"PRIu64"], Blocks=[%"PRId64"], SmallBlocks=[%d], Size=[%.3f(Kb)] Comp=[%.2f]\n\t " "(Rows)] Stddev=[%.2f] \n\t "
"Rows=[%" PRIu64 "], Blocks=[%" PRId64
"], SmallBlocks=[%d], Size=[%.3f(Kb)] Comp=[%.2f]\n\t "
"RowsInMem=[%d] \n\t", "RowsInMem=[%d] \n\t",
percentiles[0], percentiles[1], percentiles[2], percentiles[3], percentiles[4], percentiles[5], percentiles[0], percentiles[1], percentiles[2], percentiles[3], percentiles[4], percentiles[5],
percentiles[6], percentiles[7], percentiles[8], percentiles[9], percentiles[10], percentiles[11], percentiles[6], percentiles[7], percentiles[8], percentiles[9], percentiles[10], percentiles[11],
min, max, avg, stdDev, min, max, avg, stdDev, totalRows, totalBlocks, smallBlocks, totalLen / 1024.0, compRatio,
totalRows, totalBlocks, smallBlocks, totalLen/1024.0, compRatio,
pTableBlockDist->numOfRowsInMemTable); pTableBlockDist->numOfRowsInMemTable);
varDataSetLen(result, sz); varDataSetLen(result, sz);
UNUSED(sz); UNUSED(sz);
} }
void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) { void blockinfo_func_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo); STableBlockDist * pDist = (STableBlockDist *)GET_ROWCELL_INTERBUF(pResInfo);
pDist->rowSize = (uint16_t)pCtx->param[0].i64; pDist->rowSize = (uint16_t)pCtx->param[0].i64;
generateBlockDistResult(pDist, pCtx->pOutput); generateBlockDistResult(pDist, pCtx->pOutput);
...@@ -4269,16 +4396,47 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) { ...@@ -4269,16 +4396,47 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) {
*/ */
int32_t functionCompatList[] = { int32_t functionCompatList[] = {
// count, sum, avg, min, max, stddev, percentile, apercentile, first, last // count, sum, avg, min, max, stddev, percentile, apercentile, first, last
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
// last_row,top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_comp // last_row,top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_comp
4, -1, -1, 1, 1, 1, 1, 1, 1, -1, 4,
-1,
-1,
1,
1,
1,
1,
1,
1,
-1,
// tag, colprj, tagprj, arithmetic, diff, first_dist, last_dist, stddev_dst, interp rate irate // tag, colprj, tagprj, arithmetic, diff, first_dist, last_dist, stddev_dst, interp rate irate
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1, 1,
1,
1,
1,
-1,
1,
1,
1,
5,
1,
1,
// tid_tag, derivative, blk_info // tid_tag, derivative, blk_info
6, 8, 7, 6,
8,
7,
}; };
SAggFunctionInfo aAggs[] = {{ SAggFunctionInfo aAggs[] = {
{
// 0, count function does not invoke the finalize function // 0, count function does not invoke the finalize function
"count", "count",
TSDB_FUNC_COUNT, TSDB_FUNC_COUNT,
...@@ -4623,7 +4781,7 @@ SAggFunctionInfo aAggs[] = {{ ...@@ -4623,7 +4781,7 @@ SAggFunctionInfo aAggs[] = {{
"interp", "interp",
TSDB_FUNC_INTERP, TSDB_FUNC_INTERP,
TSDB_FUNC_INTERP, TSDB_FUNC_INTERP,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS , TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
function_setup, function_setup,
interp_function, interp_function,
doFinalizer, doFinalizer,
...@@ -4666,7 +4824,8 @@ SAggFunctionInfo aAggs[] = {{ ...@@ -4666,7 +4824,8 @@ SAggFunctionInfo aAggs[] = {{
noop1, noop1,
dataBlockRequired, dataBlockRequired,
}, },
{ //32 {
// 32
"derivative", // return table id and the corresponding tags for join match and subscribe "derivative", // return table id and the corresponding tags for join match and subscribe
TSDB_FUNC_DERIVATIVE, TSDB_FUNC_DERIVATIVE,
TSDB_FUNC_INVALID_ID, TSDB_FUNC_INVALID_ID,
......
/*
* src/tdigest.c
*
* Implementation of the t-digest data structure used to compute accurate percentiles.
*
* It is based on the MergingDigest implementation found at:
* https://github.com/tdunning/t-digest/blob/master/src/main/java/com/tdunning/math/stats/MergingDigest.java
*
* Copyright (c) 2016, Usman Masood <usmanm at fastmail dot fm>
*/
#include <ctype.h>
#include <float.h>
#include <math.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tdigest.h"
#define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0)))
#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (q) - 1) + M_PI / 2) / M_PI)
#define FLOAT_EQ(f1, f2) (fabs((f1) - (f2)) <= FLT_EPSILON)
/* From http://stackoverflow.com/questions/3437404/min-and-max-in-c */
#define MAX(a,b) \
({ __typeof__ (a) _a = (a); \
__typeof__ (b) _b = (b); \
_a > _b ? _a : _b; })
#define MIN(a,b) \
({ __typeof__ (a) _a = (a); \
__typeof__ (b) _b = (b); \
_a < _b ? _a : _b; })
struct TDigest *tdigestNew(int compression) {
struct TDigest *t = malloc(sizeof(struct TDigest));
memset(t, 0, sizeof(struct TDigest));
t->compression = compression;
t->size = ceil(compression * M_PI / 2) + 1;
t->threshold = 7.5 + 0.37 * compression - 2e-4 * pow(compression, 2);
t->min = INFINITY;
return t;
}
static int centroid_cmp(const void *a, const void *b) {
struct Centroid *c1 = (struct Centroid *) a;
struct Centroid *c2 = (struct Centroid *) b;
if (c1->mean < c2->mean)
return -1;
if (c1->mean > c2->mean)
return 1;
return 0;
}
struct MergeArgs {
struct TDigest *t;
struct Centroid *centroids;
int idx;
double weight_so_far;
double k1;
double min;
double max;
};
static void merge_centroid(struct MergeArgs *args, struct Centroid *merge) {
double k2;
struct Centroid *c = &args->centroids[args->idx];
args->weight_so_far += merge->weight;
k2 = INTEGRATED_LOCATION(args->t->compression,
args->weight_so_far / args->t->total_weight);
if (k2 - args->k1 > 1 && c->weight > 0) {
args->idx++;
args->k1 = INTEGRATED_LOCATION(args->t->compression,
(args->weight_so_far - merge->weight) / args->t->total_weight);
}
c = &args->centroids[args->idx];
c->weight += merge->weight;
c->mean += (merge->mean - c->mean) * merge->weight / c->weight;
if (merge->weight > 0) {
args->min = MIN(merge->mean, args->min);
args->max = MAX(merge->mean, args->max);
}
}
void tdigestCompress(struct TDigest *t) {
struct Centroid *unmerged_centroids;
long long unmerged_weight = 0;
int num_unmerged = t->num_buffered_pts;
int old_num_centroids = t->num_centroids;
int i, j;
struct MergeArgs args;
if (!t->num_buffered_pts)
return;
unmerged_centroids = malloc(sizeof(struct Centroid) * t->num_buffered_pts);
i = 0;
for (i = 0; i < num_unmerged; i++) {
struct Point *p = t->buffered_pts;
struct Centroid *c = &unmerged_centroids[i];
c->mean = p->value;
c->weight = p->weight;
unmerged_weight += c->weight;
t->buffered_pts = p->next;
free(p);
}
t->num_buffered_pts = 0;
t->total_weight += unmerged_weight;
qsort(unmerged_centroids, num_unmerged, sizeof(struct Centroid),
centroid_cmp);
memset(&args, 0, sizeof(struct MergeArgs));
args.centroids = malloc(sizeof(struct Centroid) * t->size);
memset(args.centroids, 0, sizeof(struct Centroid) * t->size);
args.t = t;
args.min = INFINITY;
i = 0;
j = 0;
while (i < num_unmerged && j < t->num_centroids) {
struct Centroid *a = &unmerged_centroids[i];
struct Centroid *b = &t->centroids[j];
if (a->mean <= b->mean) {
merge_centroid(&args, a);
i++;
} else {
merge_centroid(&args, b);
j++;
}
}
while (i < num_unmerged)
merge_centroid(&args, &unmerged_centroids[i++]);
free(unmerged_centroids);
while (j < t->num_centroids)
merge_centroid(&args, &t->centroids[j++]);
if (t->total_weight > 0) {
t->min = MIN(t->min, args.min);
if (args.centroids[args.idx].weight <= 0)
args.idx--;
t->num_centroids = args.idx + 1;
t->max = MAX(t->max, args.max);
}
if (t->num_centroids > old_num_centroids) {
t->centroids = realloc(t->centroids,
sizeof(struct Centroid) * t->num_centroids);
}
memcpy(t->centroids, args.centroids,
sizeof(struct Centroid) * t->num_centroids);
free(args.centroids);
}
void tdigestAdd(struct TDigest *t, double x, long long w) {
if (w == 0)
return;
struct Point *p = malloc(sizeof(struct Point));
p->value = x;
p->weight = w;
p->next = t->buffered_pts;
t->buffered_pts = p;
t->num_buffered_pts++;
if (t->num_buffered_pts > t->threshold)
tdigestCompress(t);
}
double tdigestCDF(struct TDigest *t, double x) {
if (t == NULL)
return 0;
int i;
double left, right;
long long weight_so_far;
struct Centroid *a, *b, tmp;
tdigestCompress(t);
if (t->num_centroids == 0)
return NAN;
if (x < t->min)
return 0;
if (x > t->max)
return 1;
if (t->num_centroids == 1) {
if (FLOAT_EQ(t->max, t->min))
return 0.5;
return INTERPOLATE(x, t->min, t->max);
}
weight_so_far = 0;
a = b = &tmp;
b->mean = t->min;
b->weight = 0;
right = 0;
for (i = 0; i < t->num_centroids; i++) {
struct Centroid *c = &t->centroids[i];
left = b->mean - (a->mean + right);
a = b;
b = c;
right = (b->mean - a->mean) * a->weight / (a->weight + b->weight);
if (x < a->mean + right) {
double cdf = (weight_so_far
+ a->weight
* INTERPOLATE(x, a->mean - left, a->mean + right))
/ t->total_weight;
return MAX(cdf, 0.0);
}
weight_so_far += a->weight;
}
left = b->mean - (a->mean + right);
a = b;
right = t->max - a->mean;
if (x < a->mean + right)
return (weight_so_far
+ a->weight * INTERPOLATE(x, a->mean - left, a->mean + right))
/ t->total_weight;
return 1;
}
double tdigestQuantile(struct TDigest *t, double q) {
if (t == NULL)
return 0;
int i;
double left, right, idx;
long long weight_so_far;
struct Centroid *a, *b, tmp;
tdigestCompress(t);
if (t->num_centroids == 0)
return NAN;
if (t->num_centroids == 1)
return t->centroids[0].mean;
if (FLOAT_EQ(q, 0.0))
return t->min;
if (FLOAT_EQ(q, 1.0))
return t->max;
idx = q * t->total_weight;
weight_so_far = 0;
b = &tmp;
b->mean = t->min;
b->weight = 0;
right = t->min;
for (i = 0; i < t->num_centroids; i++) {
struct Centroid *c = &t->centroids[i];
a = b;
left = right;
b = c;
right = (b->weight * a->mean + a->weight * b->mean)
/ (a->weight + b->weight);
if (idx < weight_so_far + a->weight) {
double p = (idx - weight_so_far) / a->weight;
return left * (1 - p) + right * p;
}
weight_so_far += a->weight;
}
left = right;
a = b;
right = t->max;
if (idx < weight_so_far + a->weight) {
double p = (idx - weight_so_far) / a->weight;
return left * (1 - p) + right * p;
}
return t->max;
}
void tdigestMerge(struct TDigest *t1, struct TDigest *t2) {
int i = t2->num_buffered_pts;
struct Point *p = t2->buffered_pts;
while (i) {
tdigestAdd(t1, p->value, p->weight);
p = p->next;
i--;
}
for (i = 0; i < t2->num_centroids; i++) {
tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight);
}
}
void tdigestFree(struct TDigest *t) {
while (t->buffered_pts) {
struct Point *p = t->buffered_pts;
t->buffered_pts = t->buffered_pts->next;
free(p);
}
if (t->centroids)
free(t->centroids);
free(t);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册