提交 390cba6d 编写于 作者: J jiacy-jcy

Merge branch '3.0' into test/jcy

......@@ -232,7 +232,8 @@ void blockDebugShowData(const SArray* dataBlocks);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t uid, tb_uid_t suid);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, int32_t vgId);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
......
......@@ -1646,8 +1646,8 @@ _err:
return NULL;
}
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
// deserialization
// this message is sent from mnode to mnode(read thread to write thread),
// so there is no need for serialization or deserialization
typedef struct {
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
} SMqDoRebalanceMsg;
......
......@@ -142,6 +142,7 @@ typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
typedef struct {
int64_t stbUid;
char stbFullName[TSDB_TABLE_FNAME_LEN];
SSchemaWrapper* pSchemaWrapper;
// not applicable to encoder and decoder
void* vnode;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* include/tdigest.c
*
* Copyright (c) 2016, Usman Masood <usmanm at fastmail dot fm>
*/
#ifndef TDIGEST_H
#define TDIGEST_H
#ifndef M_PI
#define M_PI 3.14159265358979323846264338327950288 /* pi */
#endif
#define DOUBLE_MAX 1.79e+308
#define ADDITION_CENTROID_NUM 2
#define COMPRESSION 300
#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + ADDITION_CENTROID_NUM)
#define GET_THRESHOLD(compression) (7.5 + 0.37 * compression - 2e-4 * pow(compression, 2))
#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(SCentroid)*GET_CENTROID(compression) + sizeof(SPt)*GET_THRESHOLD(compression))
typedef struct SCentroid {
double mean;
int64_t weight;
}SCentroid;
typedef struct SPt {
double value;
int64_t weight;
}SPt;
typedef struct TDigest {
double compression;
int32_t threshold;
int64_t size;
int64_t total_weight;
double min;
double max;
int32_t num_buffered_pts;
SPt *buffered_pts;
int32_t num_centroids;
SCentroid *centroids;
}TDigest;
TDigest *tdigestNewFrom(void* pBuf, int32_t compression);
void tdigestAdd(TDigest *t, double x, int64_t w);
void tdigestMerge(TDigest *t1, TDigest *t2);
double tdigestQuantile(TDigest *t, double q);
void tdigestCompress(TDigest *t);
void tdigestFreeFrom(TDigest *t);
void tdigestAutoFill(TDigest* t, int32_t compression);
#endif /* TDIGEST_H */
......@@ -1630,7 +1630,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
}
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
int32_t vgId) {
const char* stbFullName, int32_t vgId) {
SSubmitReq* ret = NULL;
// cal size
......@@ -1646,10 +1646,12 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
char* cname = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
snprintf(cname, TSDB_TABLE_FNAME_LEN, "%s:%ld", stbFullName, pDataBlock->info.groupId);
createTbReq.name = cname;
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = htobe64(suid);
createTbReq.ctb.suid = suid;
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
......@@ -1662,6 +1664,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
taosMemoryFree(cname);
}
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
......@@ -1697,7 +1700,9 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
char* cname = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
snprintf(cname, TSDB_TABLE_FNAME_LEN, "%s:%ld", stbFullName, pDataBlock->info.groupId);
createTbReq.name = cname;
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
......
......@@ -3702,6 +3702,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sourceDB) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->targetStbFullName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
......@@ -3727,6 +3728,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->sourceDB) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->targetStbFullName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
......
......@@ -127,7 +127,7 @@ int32_t tNameExtractFullName(const SName* name, char* dst) {
size_t tnameLen = strlen(name->tname);
if (tnameLen > 0) {
assert(name->type == TSDB_TABLE_NAME_T);
/*assert(name->type == TSDB_TABLE_NAME_T);*/
dst[len] = TS_PATH_DELIMITER[0];
memcpy(dst + len + 1, name->tname, tnameLen);
......@@ -314,9 +314,9 @@ void buildChildTableName(RandTableName* rName) {
for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) {
SSmlKv* tagKv = taosArrayGetP(rName->tags, j);
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
if(IS_VAR_DATA_TYPE(tagKv->type)){
if (IS_VAR_DATA_TYPE(tagKv->type)) {
taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->length);
}else{
} else {
taosStringBuilderAppendStringLen(&sb, (char*)(&(tagKv->value)), tagKv->length);
}
}
......
......@@ -206,6 +206,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
ASSERT(pTask->tbSink.pSchemaWrapper);
}
......@@ -248,6 +249,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
......@@ -325,6 +327,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
#endif
......
......@@ -456,7 +456,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto CREATE_STREAM_OVER;
}
pDb = mndAcquireDbByStream(pMnode, createStreamReq.name);
pDb = mndAcquireDb(pMnode, createStreamReq.sourceDB);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
goto CREATE_STREAM_OVER;
......
......@@ -748,7 +748,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
SVnode* pVnode = (SVnode*)vnode;
ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, pVnode->config.vgId);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId);
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
// build write msg
SRpcMsg msg = {
......
......@@ -73,6 +73,11 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI
int32_t percentileFunction(SqlFunctionCtx *pCtx);
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t apercentileFunction(SqlFunctionCtx *pCtx);
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
int32_t diffFunction(SqlFunctionCtx *pCtx);
......
......@@ -760,19 +760,32 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
SExprNode* pPara1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_VAR_DATA_TYPE(pPara1->resType.type) || !IS_INTEGER_TYPE(para2Type)) {
SExprNode* pPara0 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
SExprNode* p1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 1);
uint8_t para1Type = p1->resType.type;
if (!IS_VAR_DATA_TYPE(pPara0->resType.type) || !IS_INTEGER_TYPE(para1Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (((SValueNode*)p1)->datum.i < 1) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
if (3 == numOfParams) {
uint8_t para3Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_INTEGER_TYPE(para3Type)) {
SExprNode* p2 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 2);
uint8_t para2Type = p2->resType.type;
if (!IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
int64_t v = ((SValueNode*)p1)->datum.i;
if (v < 0 || v > INT16_MAX) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
}
pFunc->node.resType = (SDataType){.bytes = pPara1->resType.bytes, .type = pPara1->resType.type};
pFunc->node.resType = (SDataType){.bytes = pPara0->resType.bytes, .type = pPara0->resType.type};
return TSDB_CODE_SUCCESS;
}
......@@ -985,10 +998,10 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_APERCENTILE,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateApercentile,
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = minmaxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = functionFinalize
.getEnvFunc = getApercentileFuncEnv,
.initFunc = apercentileFunctionSetup,
.processFunc = apercentileFunction,
.finalizeFunc = apercentileFinalize
},
{
.name = "top",
......
......@@ -20,6 +20,8 @@
#include "taggfunction.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tdigest.h"
#include "thistogram.h"
#include "tpercentile.h"
#define HISTOGRAM_MAX_BINS_NUM 1000
......@@ -95,6 +97,19 @@ typedef struct SPercentileInfo {
int64_t numOfElems;
} SPercentileInfo;
typedef struct SAPercentileInfo {
double result;
int8_t algo;
SHistogramInfo *pHisto;
TDigest *pTDigest;
} SAPercentileInfo;
typedef enum {
APERCT_ALGO_UNKNOWN = 0,
APERCT_ALGO_DEFAULT,
APERCT_ALGO_TDIGEST,
} EAPerctAlgoType;
typedef struct SDiffInfo {
bool hasPrev;
bool includeNull;
......@@ -1905,6 +1920,131 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return functionFinalize(pCtx, pBlock);
}
bool getApercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
pEnv->calcMemSize = TMAX(bytesHist, bytesDigest);
return true;
}
static int8_t getApercentileAlgo(char *algoStr) {
int8_t algoType;
if (strcasecmp(algoStr, "default") == 0) {
algoType = APERCT_ALGO_DEFAULT;
} else if (strcasecmp(algoStr, "t-digest") == 0) {
algoType = APERCT_ALGO_TDIGEST;
} else {
algoType = APERCT_ALGO_UNKNOWN;
}
return algoType;
}
static void buildHistogramInfo(SAPercentileInfo* pInfo) {
pInfo->pHisto = (SHistogramInfo*) ((char*) pInfo + sizeof(SAPercentileInfo));
pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo));
}
bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
}
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
if (pCtx->numOfParams == 2) {
pInfo->algo = APERCT_ALGO_DEFAULT;
} else if (pCtx->numOfParams == 3) {
pInfo->algo = getApercentileAlgo(pCtx->param[2].param.pz);
if (pInfo->algo == APERCT_ALGO_UNKNOWN) {
return false;
}
}
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
pInfo->pTDigest = tdigestNewFrom(tmp, COMPRESSION);
} else {
buildHistogramInfo(pInfo);
pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
}
return true;
}
int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
int32_t notNullElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
//SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
SColumnInfoData* pCol = pInput->pData[0];
int32_t type = pCol->info.type;
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t start = pInput->startRowIndex;
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
notNullElems += 1;
char* data = colDataGetData(pCol, i);
double v = 0; // value
int64_t w = 1; // weigth
GET_TYPED_DATA(v, double, type, data);
tdigestAdd(pInfo->pTDigest, v, w);
}
} else {
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
notNullElems += 1;
char* data = colDataGetData(pCol, i);
double v = 0;
GET_TYPED_DATA(v, double, type, data);
tHistogramAdd(&pInfo->pHisto, v);
}
}
SET_VAL(pResInfo, notNullElems, 1);
return TSDB_CODE_SUCCESS;
}
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SVariant* pVal = &pCtx->param[1].param;
double percent = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
if (pInfo->pTDigest->size > 0) {
pInfo->result = tdigestQuantile(pInfo->pTDigest, percent/100);
} else { // no need to free
//setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
return TSDB_CODE_SUCCESS;
}
} else {
if (pInfo->pHisto->numOfElems > 0) {
double ratio[] = {percent};
double *res = tHistogramUniform(pInfo->pHisto, ratio, 1);
pInfo->result = *res;
//memcpy(pCtx->pOutput, res, sizeof(double));
taosMemoryFree(res);
} else { // no need to free
//setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
return TSDB_CODE_SUCCESS;
}
}
return functionFinalize(pCtx, pBlock);
}
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
......@@ -1917,8 +2057,6 @@ bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
return true;
}
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
if (pTsColInfo == NULL) {
return 0;
......
......@@ -1120,12 +1120,12 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
NEXT_TOKEN(pCxt->pSql, sToken);
SName name;
createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
tNameExtractFullName(&name, tbFName);
CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
tNameExtractFullName(&name, tbFName);
CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
// USING cluase
// USING clause
if (TK_USING == sToken.type) {
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
NEXT_TOKEN(pCxt->pSql, sToken);
......
......@@ -3386,9 +3386,9 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
pReq->igExists = pStmt->ignoreExists;
SName name;
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName));
// tNameGetFullDbName(&name, pReq->name);
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->streamName, &name), pReq->name);
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName));
tNameGetFullDbName(&name, pReq->name);
// tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->streamName, &name), pReq->name);
if ('\0' != pStmt->targetTabName[0]) {
strcpy(name.dbname, pStmt->targetDbName);
......
......@@ -505,6 +505,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (pTask->sinkType == TASK_SINK__TABLE) {
if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1;
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
......@@ -551,6 +552,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (pTask->sinkType == TASK_SINK__TABLE) {
if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1;
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* src/tdigest.c
*
* Implementation of the t-digest data structure used to compute accurate percentiles.
*
* It is based on the MergingDigest implementation found at:
* https://github.com/tdunning/t-digest/blob/master/src/main/java/com/tdunning/math/stats/MergingDigest.java
*
* Copyright (c) 2016, Usman Masood <usmanm at fastmail dot fm>
*/
#include "os.h"
#include "osMath.h"
#include "tdigest.h"
#define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0)))
//#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (q) - 1) + M_PI / 2) / M_PI)
#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (double)(q) - 1)/M_PI + (double)1/2))
#define FLOAT_EQ(f1, f2) (fabs((f1) - (f2)) <= FLT_EPSILON)
typedef struct SMergeArgs {
TDigest *t;
SCentroid *centroids;
int32_t idx;
double weight_so_far;
double k1;
double min;
double max;
}SMergeArgs;
void tdigestAutoFill(TDigest* t, int32_t compression) {
t->centroids = (SCentroid*)((char*)t + sizeof(TDigest));
t->buffered_pts = (SPt*) ((char*)t + sizeof(TDigest) + sizeof(SCentroid) * (int32_t)GET_CENTROID(compression));
}
TDigest *tdigestNewFrom(void* pBuf, int32_t compression) {
memset(pBuf, 0, (size_t)TDIGEST_SIZE(compression));
TDigest* t = (TDigest*)pBuf;
tdigestAutoFill(t, compression);
t->compression = compression;
t->size = (int64_t)GET_CENTROID(compression);
t->threshold = (int32_t)GET_THRESHOLD(compression);
t->min = DOUBLE_MAX;
t->max = -DOUBLE_MAX;
return t;
}
static int32_t cmpCentroid(const void *a, const void *b) {
SCentroid *c1 = (SCentroid *) a;
SCentroid *c2 = (SCentroid *) b;
if (c1->mean < c2->mean)
return -1;
if (c1->mean > c2->mean)
return 1;
return 0;
}
static void mergeCentroid(SMergeArgs *args, SCentroid *merge) {
double k2;
SCentroid *c = &args->centroids[args->idx];
args->weight_so_far += merge->weight;
k2 = INTEGRATED_LOCATION(args->t->size,
args->weight_so_far / args->t->total_weight);
//idx++
if(k2 - args->k1 > 1 && c->weight > 0) {
if(args->idx + 1 < args->t->size
&& merge->mean != args->centroids[args->idx].mean) {
args->idx++;
}
args->k1 = k2;
}
c = &args->centroids[args->idx];
if(c->mean == merge->mean) {
c->weight += merge->weight;
} else {
c->weight += merge->weight;
c->mean += (merge->mean - c->mean) * merge->weight / c->weight;
if (merge->weight > 0) {
args->min = TMIN(merge->mean, args->min);
args->max = TMAX(merge->mean, args->max);
}
}
}
void tdigestCompress(TDigest *t) {
SCentroid *unmerged_centroids;
int64_t unmerged_weight = 0;
int32_t num_unmerged = t->num_buffered_pts;
int32_t i, j;
SMergeArgs args;
if (t->num_buffered_pts <= 0)
return;
unmerged_centroids = (SCentroid*)taosMemoryMalloc(sizeof(SCentroid) * t->num_buffered_pts);
for (i = 0; i < num_unmerged; i++) {
SPt *p = t->buffered_pts + i;
SCentroid *c = &unmerged_centroids[i];
c->mean = p->value;
c->weight = p->weight;
unmerged_weight += c->weight;
}
t->num_buffered_pts = 0;
t->total_weight += unmerged_weight;
qsort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid);
memset(&args, 0, sizeof(SMergeArgs));
args.centroids = (SCentroid*)taosMemoryMalloc((size_t)(sizeof(SCentroid) * t->size));
memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size));
args.t = t;
args.min = DOUBLE_MAX;
args.max = -DOUBLE_MAX;
i = 0;
j = 0;
while (i < num_unmerged && j < t->num_centroids) {
SCentroid *a = &unmerged_centroids[i];
SCentroid *b = &t->centroids[j];
if (a->mean <= b->mean) {
mergeCentroid(&args, a);
assert(args.idx < t->size);
i++;
} else {
mergeCentroid(&args, b);
assert(args.idx < t->size);
j++;
}
}
while (i < num_unmerged) {
mergeCentroid(&args, &unmerged_centroids[i++]);
assert(args.idx < t->size);
}
taosMemoryFree((void*)unmerged_centroids);
while (j < t->num_centroids) {
mergeCentroid(&args, &t->centroids[j++]);
assert(args.idx < t->size);
}
if (t->total_weight > 0) {
t->min = TMIN(t->min, args.min);
if (args.centroids[args.idx].weight <= 0) {
args.idx--;
}
t->num_centroids = args.idx + 1;
t->max = TMAX(t->max, args.max);
}
memcpy(t->centroids, args.centroids, sizeof(SCentroid) * t->num_centroids);
taosMemoryFree((void*)args.centroids);
}
void tdigestAdd(TDigest* t, double x, int64_t w) {
if (w == 0)
return;
int32_t i = t->num_buffered_pts;
if(i > 0 && t->buffered_pts[i-1].value == x ) {
t->buffered_pts[i].weight = w;
} else {
t->buffered_pts[i].value = x;
t->buffered_pts[i].weight = w;
t->num_buffered_pts++;
}
if (t->num_buffered_pts >= t->threshold)
tdigestCompress(t);
}
double tdigestCDF(TDigest *t, double x) {
if (t == NULL)
return 0;
int32_t i;
double left, right;
int64_t weight_so_far;
SCentroid *a, *b, tmp;
tdigestCompress(t);
if (t->num_centroids == 0)
return NAN;
if (x < t->min)
return 0;
if (x > t->max)
return 1;
if (t->num_centroids == 1) {
if (FLOAT_EQ(t->max, t->min))
return 0.5;
return INTERPOLATE(x, t->min, t->max);
}
weight_so_far = 0;
a = b = &tmp;
b->mean = t->min;
b->weight = 0;
right = 0;
for (i = 0; i < t->num_centroids; i++) {
SCentroid *c = &t->centroids[i];
left = b->mean - (a->mean + right);
a = b;
b = c;
right = (b->mean - a->mean) * a->weight / (a->weight + b->weight);
if (x < a->mean + right) {
double cdf = (weight_so_far
+ a->weight
* INTERPOLATE(x, a->mean - left, a->mean + right))
/ t->total_weight;
return TMAX(cdf, 0.0);
}
weight_so_far += a->weight;
}
left = b->mean - (a->mean + right);
a = b;
right = t->max - a->mean;
if (x < a->mean + right) {
return (weight_so_far + a->weight * INTERPOLATE(x, a->mean - left, a->mean + right))
/ t->total_weight;
}
return 1;
}
double tdigestQuantile(TDigest *t, double q) {
if (t == NULL)
return 0;
int32_t i;
double left, right, idx;
int64_t weight_so_far;
SCentroid *a, *b, tmp;
tdigestCompress(t);
if (t->num_centroids == 0)
return NAN;
if (t->num_centroids == 1)
return t->centroids[0].mean;
if (FLOAT_EQ(q, 0.0))
return t->min;
if (FLOAT_EQ(q, 1.0))
return t->max;
idx = q * t->total_weight;
weight_so_far = 0;
b = &tmp;
b->mean = t->min;
b->weight = 0;
right = t->min;
for (i = 0; i < t->num_centroids; i++) {
SCentroid *c = &t->centroids[i];
a = b;
left = right;
b = c;
right = (b->weight * a->mean + a->weight * b->mean)/ (a->weight + b->weight);
if (idx < weight_so_far + a->weight) {
double p = (idx - weight_so_far) / a->weight;
return left * (1 - p) + right * p;
}
weight_so_far += a->weight;
}
left = right;
a = b;
right = t->max;
if (idx < weight_so_far + a->weight && a->weight != 0) {
double p = (idx - weight_so_far) / a->weight;
return left * (1 - p) + right * p;
}
return t->max;
}
void tdigestMerge(TDigest *t1, TDigest *t2) {
// SPoints
int32_t num_pts = t2->num_buffered_pts;
for(int32_t i = num_pts - 1; i >= 0; i--) {
SPt* p = t2->buffered_pts + i;
tdigestAdd(t1, p->value, p->weight);
t2->num_buffered_pts --;
}
// centroids
for (int32_t i = 0; i < t2->num_centroids; i++) {
tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight);
}
}
此差异已折叠。
此差异已折叠。
......@@ -23,10 +23,10 @@ python3 ./test.py -f 2-query/length.py
python3 ./test.py -f 2-query/char_length.py
python3 ./test.py -f 2-query/upper.py
python3 ./test.py -f 2-query/lower.py
python3 ./test.py -f 2-query/join.py
#python3 ./test.py -f 2-query/join.py
python3 ./test.py -f 2-query/cast.py
python3 ./test.py -f 2-query/concat.py
python3 ./test.py -f 2-query/concat_ws.py
#python3 ./test.py -f 2-query/concat.py
#python3 ./test.py -f 2-query/concat_ws.py
python3 ./test.py -f 2-query/check_tsdb.py
# python3 ./test.py -f 2-query/union.py
# python3 ./test.py -f 2-query/union2.py
......@@ -50,6 +50,7 @@ python3 ./test.py -f 2-query/Timediff.py
python3 ./test.py -f 2-query/top.py
python3 ./test.py -f 2-query/bottom.py
python3 ./test.py -f 2-query/abs.py
python3 ./test.py -f 2-query/ceil.py
python3 ./test.py -f 2-query/floor.py
......@@ -64,8 +65,7 @@ python3 ./test.py -f 2-query/arcsin.py
python3 ./test.py -f 2-query/arccos.py
python3 ./test.py -f 2-query/arctan.py
python3 ./test.py -f 2-query/query_cols_tags_and_or.py
python3 ./test.py -f 2-query/nestedQuery.py
#python3 ./test.py -f 2-query/nestedQuery.py
python3 ./test.py -f 7-tmq/basic5.py
python3 ./test.py -f 7-tmq/subscribeDb.py
......
Subproject commit a8bb88c9056735919fc50bf9b12d9562f17e844f
Subproject commit 4d83d8c62973506f760bcaa3a33f4665ed9046d0
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册