From 37245334d5a7db37bfbc10e508a28c5d84dac722 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 2 Apr 2022 10:37:17 +0800 Subject: [PATCH] rollup sma grammar integrate --- include/common/tmsg.h | 9 +++--- include/common/ttypes.h | 1 + source/common/src/tmsg.c | 22 +++++++------- source/dnode/mnode/impl/src/mndStb.c | 39 ++++++++++++++++++++++++- source/dnode/vnode/src/vnd/vnodeWrite.c | 24 +++++++++++---- 5 files changed, 72 insertions(+), 23 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 12909875d5..6ec7d8c75b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1395,11 +1395,10 @@ typedef struct { } SDDropTopicReq; typedef struct { - float xFilesFactor; - int8_t delayUnit; - int8_t nFuncIds; - int32_t* pFuncIds; - int64_t delay; + float xFilesFactor; + int32_t delay; + int8_t nFuncIds; + func_id_t* pFuncIds; } SRSmaParam; typedef struct SVCreateTbReq { diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 19442af206..1cb398fb85 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -31,6 +31,7 @@ typedef int16_t col_id_t; typedef int8_t col_type_t; typedef int32_t col_bytes_t; typedef uint16_t schema_ver_t; +typedef int32_t func_id_t; #pragma pack(push, 1) typedef struct { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d75fa6e9c5..2a16b58565 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -314,13 +314,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { } if (pReq->rollup && pReq->stbCfg.pRSmaParam) { SRSmaParam *param = pReq->stbCfg.pRSmaParam; - tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); - tlen += taosEncodeFixedI8(buf, param->delayUnit); + tlen += taosEncodeBinary(buf, (const void *)¶m->xFilesFactor, sizeof(param->xFilesFactor)); + tlen += taosEncodeFixedI32(buf, param->delay); tlen += taosEncodeFixedI8(buf, param->nFuncIds); for (int8_t i = 0; i < param->nFuncIds; ++i) { tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); } - tlen += taosEncodeFixedI64(buf, param->delay); } break; case TD_CHILD_TABLE: @@ -339,13 +338,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { } if (pReq->rollup && pReq->ntbCfg.pRSmaParam) { SRSmaParam *param = pReq->ntbCfg.pRSmaParam; - tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); - tlen += taosEncodeFixedI8(buf, param->delayUnit); + tlen += taosEncodeBinary(buf, (const void *)¶m->xFilesFactor, sizeof(param->xFilesFactor)); + tlen += taosEncodeFixedI32(buf, param->delay); tlen += taosEncodeFixedI8(buf, param->nFuncIds); for (int8_t i = 0; i < param->nFuncIds; ++i) { tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); } - tlen += taosEncodeFixedI64(buf, param->delay); } break; default: @@ -387,17 +385,17 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { if (pReq->rollup) { pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam)); SRSmaParam *param = pReq->stbCfg.pRSmaParam; - buf = taosDecodeFixedU32(buf, (uint32_t *)¶m->xFilesFactor); - buf = taosDecodeFixedI8(buf, ¶m->delayUnit); + buf = taosDecodeBinaryTo(buf, (void*)¶m->xFilesFactor, sizeof(param->xFilesFactor)); + buf = taosDecodeFixedI32(buf, ¶m->delay); buf = taosDecodeFixedI8(buf, ¶m->nFuncIds); if (param->nFuncIds > 0) { + param->pFuncIds = (func_id_t *)taosMemoryMalloc(param->nFuncIds * sizeof(func_id_t)); for (int8_t i = 0; i < param->nFuncIds; ++i) { buf = taosDecodeFixedI32(buf, param->pFuncIds + i); } } else { param->pFuncIds = NULL; } - buf = taosDecodeFixedI64(buf, ¶m->delay); } else { pReq->stbCfg.pRSmaParam = NULL; } @@ -420,17 +418,17 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { if (pReq->rollup) { pReq->ntbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam)); SRSmaParam *param = pReq->ntbCfg.pRSmaParam; - buf = taosDecodeFixedU32(buf, (uint32_t *)¶m->xFilesFactor); - buf = taosDecodeFixedI8(buf, ¶m->delayUnit); + buf = taosDecodeBinaryTo(buf, (void*)¶m->xFilesFactor, sizeof(param->xFilesFactor)); + buf = taosDecodeFixedI32(buf, ¶m->delay); buf = taosDecodeFixedI8(buf, ¶m->nFuncIds); if (param->nFuncIds > 0) { + param->pFuncIds = (func_id_t *)taosMemoryMalloc(param->nFuncIds * sizeof(func_id_t)); for (int8_t i = 0; i < param->nFuncIds; ++i) { buf = taosDecodeFixedI32(buf, param->pFuncIds + i); } } else { param->pFuncIds = NULL; } - buf = taosDecodeFixedI64(buf, ¶m->delay); } else { pReq->ntbCfg.pRSmaParam = NULL; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index f3fda102f4..85e7ade462 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -354,6 +354,7 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.name = (char *)tNameGetTableName(&name); req.ttl = 0; req.keep = 0; + req.rollup = pStb->aggregationMethod > -1 ? 1 : 0; req.type = TD_SUPER_TABLE; req.stbCfg.suid = pStb->uid; req.stbCfg.nCols = pStb->numOfColumns; @@ -365,7 +366,7 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - + int bSmaStat = 0; // no column has bsma if (pStb->numOfSmas == pStb->numOfColumns) { // assume pColumns > 0 bSmaStat = 1; // all columns have bsma @@ -405,9 +406,38 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt } } + SRSmaParam *pRSmaParam = NULL; + if (req.rollup) { + pRSmaParam = (SRSmaParam *)taosMemoryCalloc(1, sizeof(SRSmaParam)); + if (pRSmaParam == NULL) { + taosMemoryFreeClear(req.stbCfg.pSchema); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pRSmaParam->xFilesFactor = pStb->xFilesFactor; + pRSmaParam->delay = pStb->delay; + pRSmaParam->nFuncIds = 1; // only 1 aggregation method supported currently + pRSmaParam->pFuncIds = (func_id_t *)taosMemoryCalloc(pRSmaParam->nFuncIds, sizeof(func_id_t)); + if (pRSmaParam->pFuncIds == NULL) { + taosMemoryFreeClear(req.stbCfg.pRSmaParam); + taosMemoryFreeClear(req.stbCfg.pSchema); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + for (int32_t f = 0; f < pRSmaParam->nFuncIds; ++f) { + *(pRSmaParam->pFuncIds + f) = pStb->aggregationMethod; + } + req.stbCfg.pRSmaParam = pRSmaParam; + } + int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead); SMsgHead *pHead = taosMemoryMalloc(contLen); if (pHead == NULL) { + if (pRSmaParam) { + taosMemoryFreeClear(pRSmaParam->pFuncIds); + taosMemoryFreeClear(pRSmaParam); + } taosMemoryFreeClear(req.stbCfg.pSchema); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -420,6 +450,10 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt tSerializeSVCreateTbReq(&pBuf, &req); *pContLen = contLen; + if (pRSmaParam) { + taosMemoryFreeClear(pRSmaParam->pFuncIds); + taosMemoryFreeClear(pRSmaParam); + } taosMemoryFreeClear(req.stbCfg.pSchema); return pHead; } @@ -632,6 +666,9 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre stbObj.dbUid = pDb->uid; stbObj.version = 1; stbObj.nextColId = 1; + stbObj.xFilesFactor = pCreate->xFilesFactor; + stbObj.aggregationMethod = pCreate->aggregationMethod; + stbObj.delay = pCreate->delay; stbObj.ttl = pCreate->ttl; stbObj.numOfColumns = pCreate->numOfColumns; stbObj.numOfTags = pCreate->numOfTags; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 59b027e370..7ef0b50402 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -78,10 +78,13 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: handle error } - // TODO: maybe need to clear the request struct + // TODO: to encapsule a free API taosMemoryFree(vCreateTbReq.stbCfg.pSchema); taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema); - taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam); + if(vCreateTbReq.stbCfg.pRSmaParam) { + taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds); + taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam); + } taosMemoryFree(vCreateTbReq.dbFName); taosMemoryFree(vCreateTbReq.name); break; @@ -110,17 +113,24 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: handle error vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name); } + // TODO: to encapsule a free API taosMemoryFree(pCreateTbReq->name); taosMemoryFree(pCreateTbReq->dbFName); if (pCreateTbReq->type == TD_SUPER_TABLE) { taosMemoryFree(pCreateTbReq->stbCfg.pSchema); taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema); - taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam); + if (pCreateTbReq->stbCfg.pRSmaParam) { + taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds); + taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam); + } } else if (pCreateTbReq->type == TD_CHILD_TABLE) { taosMemoryFree(pCreateTbReq->ctbCfg.pTag); } else { taosMemoryFree(pCreateTbReq->ntbCfg.pSchema); - taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam); + if (pCreateTbReq->ntbCfg.pRSmaParam) { + taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam->pFuncIds); + taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam); + } } } @@ -145,9 +155,13 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { SVCreateTbReq vAlterTbReq = {0}; vTrace("vgId:%d, process alter stb req", pVnode->vgId); tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq); + // TODO: to encapsule a free API taosMemoryFree(vAlterTbReq.stbCfg.pSchema); taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema); - taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam); + if (vAlterTbReq.stbCfg.pRSmaParam) { + taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds); + taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam); + } taosMemoryFree(vAlterTbReq.dbFName); taosMemoryFree(vAlterTbReq.name); break; -- GitLab