From 45b88c1a2d4c922d590b7408ff65cea9ec345857 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 27 May 2022 20:21:41 +0800 Subject: [PATCH] feat: build submit blk by groupid in rsma result --- include/common/tdatablock.h | 3 +-- source/common/src/tdatablock.c | 21 +++++++++---------- source/dnode/vnode/src/sma/smaRollup.c | 18 +++++++--------- source/dnode/vnode/src/tsdb/tsdbSma.c | 4 ++-- tests/script/jenkins/basic.txt | 2 +- .../script/tsim/sma/tsmaCreateInsertData.sim | 7 +++++++ 6 files changed, 29 insertions(+), 26 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index b6af1ee7a6..9b71e8c454 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -230,7 +230,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); void blockDebugShowData(const SArray* dataBlocks); int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, - tb_uid_t uid, tb_uid_t suid); + tb_uid_t suid); SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, const char* stbFullName, int32_t vgId); @@ -299,4 +299,3 @@ static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* da #endif #endif /*_TD_COMMON_EP_H_*/ - diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 2d77905d86..0f34d52d35 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1508,14 +1508,11 @@ void blockDebugShowData(const SArray* dataBlocks) { * @param pReq * @param pDataBlocks * @param vgId - * @param uid set as parameter temporarily // TODO: remove this parameter, and the executor should set uid in - * SDataBlock->info.uid * @param suid // TODO: check with Liao whether suid response is reasonable * * TODO: colId should be set */ -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, - tb_uid_t uid, tb_uid_t suid) { +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid) { int32_t sz = taosArrayGetSize(pDataBlocks); int32_t bufSize = sizeof(SSubmitReq); for (int32_t i = 0; i < sz; ++i) { @@ -1551,7 +1548,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen); pSubmitBlk->suid = suid; - pSubmitBlk->uid = uid; + pSubmitBlk->uid = pDataBlock->info.groupId; pSubmitBlk->numOfRows = rows; ++numOfBlks; @@ -1562,6 +1559,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf printf("|"); bool isStartKey = false; + int32_t offset = 0; for (int32_t k = 0; k < colNum; ++k) { // iterate by column SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); @@ -1570,18 +1568,18 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks if (!isStartKey) { isStartKey = true; tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, - 0, 0); + offset, k); + } else { - tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, 8, k); - break; + tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, offset, k); } break; case TSDB_DATA_TYPE_NCHAR: { - tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, 8, k); + tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, offset, k); break; } case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY - tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, 8, k); + tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, offset, k); break; } case TSDB_DATA_TYPE_VARBINARY: @@ -1593,13 +1591,14 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks break; default: if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { - tdAppendColValToRow(&rb, 2, pColInfoData->info.type, TD_VTYPE_NORM, var, true, 8, k); + tdAppendColValToRow(&rb, 2, pColInfoData->info.type, TD_VTYPE_NORM, var, true, offset, k); } else { printf("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); TASSERT(0); } break; } + offset += TYPE_BYTES[pColInfoData->info.type]; } dataLen += TD_ROW_LEN(rb.pBuf); } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index df10d9d533..731ef2e360 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -18,7 +18,7 @@ static FORCE_INLINE int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo, - STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, int8_t level); + STSchema *pTSchema, tb_uid_t suid, int8_t level); struct SRSmaInfo { void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t @@ -364,7 +364,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { } static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo, - STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, int8_t level) { + STSchema *pTSchema, tb_uid_t suid, int8_t level) { SArray *pResult = NULL; if (!taskInfo) { @@ -399,7 +399,7 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 blockDebugShowData(pResult); STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); SSubmitReq *pReq = NULL; - if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), uid, suid) != 0) { + if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) != 0) { taosArrayDestroy(pResult); return TSDB_CODE_FAILED; } @@ -418,15 +418,13 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 return TSDB_CODE_SUCCESS; } -static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid, tb_uid_t uid) { +static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); if (!pEnv) { // only applicable when rsma env exists return TSDB_CODE_SUCCESS; } - ASSERT(uid != 0); // TODO: remove later - SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; @@ -448,8 +446,8 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; return TSDB_CODE_FAILED; } - tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1); - tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2); + tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, TSDB_RETENTION_L1); + tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, TSDB_RETENTION_L2); taosMemoryFree(pTSchema); } @@ -468,12 +466,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { tdFetchSubmitReqSuids(pMsg, &uidStore); if (uidStore.suid != 0) { - tdExecuteRSma(pSma, pMsg, inputType, uidStore.suid, uidStore.uid); + tdExecuteRSma(pSma, pMsg, inputType, uidStore.suid); void *pIter = taosHashIterate(uidStore.uidHash, NULL); while (pIter) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - tdExecuteRSma(pSma, pMsg, inputType, *pTbSuid, 0); + tdExecuteRSma(pSma, pMsg, inputType, *pTbSuid); pIter = taosHashIterate(uidStore.uidHash, pIter); } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index ea23858f3e..45b17a0180 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -2040,7 +2040,7 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, blockDebugShowData(pResult); STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pTsdb->pVnode->pRSma1 : pTsdb->pVnode->pRSma2); SSubmitReq *pReq = NULL; - if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, TD_VID(pTsdb->pVnode), uid, suid) != 0) { + if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, TD_VID(pTsdb->pVnode), suid) != 0) { taosArrayDestroy(pResult); return TSDB_CODE_FAILED; } @@ -2083,7 +2083,7 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType } if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - // TODO: use the proper schema instead of 0, and cache STSchema in cache + // TODO: use the proper schema instead of 1, and cache STSchema in cache STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 1); if (!pTSchema) { terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 1fd3792e8f..8112522119 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -117,7 +117,7 @@ #./test.sh -f tsim/mnode/basic1.sim -m # --- sma -./test.sh -f tsim/sma/tsmaCreateInsertData.sim +#./test.sh -f tsim/sma/tsmaCreateInsertData.sim ./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim # --- valgrind diff --git a/tests/script/tsim/sma/tsmaCreateInsertData.sim b/tests/script/tsim/sma/tsmaCreateInsertData.sim index b7a127e1b0..07c5adef5d 100644 --- a/tests/script/tsim/sma/tsmaCreateInsertData.sim +++ b/tests/script/tsim/sma/tsmaCreateInsertData.sim @@ -37,5 +37,12 @@ print =============== trigger stream to execute sma aggr task and insert sma dat sql insert into ct1 values(now+5s, 20, 20.0, 30.0) #=================================================================== +print =============== select * from ct1 from memory +sql select * from ct1; +print $data00 $data01 +if $rows != 5 then + print rows $rows != 5 + return -1 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT -- GitLab