diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ad992fd9dbf50f9c789c636238b3d005de788fa8..eaf34c0a3db78c1111597a7b51f89722bba0c4c2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3264,6 +3264,8 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag); #define TSDB_MSG_FLG_ENCODE 0x1 #define TSDB_MSG_FLG_DECODE 0x2 +int32_t tBuildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen, __tmalloc_fn_t fp); + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 4681ff66741d4781c88dba5ccc950094c96f6769..4f06c4f2ced907857f2a11ae9b8f96d7ed97c482 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -47,6 +47,8 @@ void *taosMemoryMallocAlign(uint32_t alignment, int64_t size); } \ } while (0) +typedef void *(*__tmalloc_fn_t)(int64_t); + #ifdef __cplusplus } #endif diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 22a66290babf76847b636dd960c5f424acce1f88..eacb7b1188ebbed4bcae001accfc798af68a9cda 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2183,11 +2183,13 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat int32_t vgId, tb_uid_t suid) { SSubmitReq2* pReq = NULL; SArray* pVals = NULL; - int32_t bufSize = sizeof(SSubmitReq2); int32_t numOfBlks = 0; int32_t sz = 1; - if (!(pReq = taosMemoryMalloc(bufSize))) { + terrno = TSDB_CODE_SUCCESS; + + if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _end; } @@ -2205,11 +2207,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData)); if (!pTbData) { + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _end; } - taosArrayPush(pReq->aSubmitTbData, pTbData); - if(!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))){ taosMemoryFree(pTbData); goto _end; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8dc4633443e3224254fe72a64e9313f357014c42..2548422ed86806e8eafedfb70b8c459560b12a11 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7000,3 +7000,31 @@ void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) { } } } + +int32_t tBuildSubmitReq(int32_t vgId, SSubmitReq2 *pReq, void **pData, uint32_t *pLen, __tmalloc_fn_t fp) { + int32_t code = TSDB_CODE_SUCCESS; + uint32_t len = 0; + void *pBuf = NULL; + tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); + if (TSDB_CODE_SUCCESS == code) { + SEncoder encoder; + len += sizeof(SMsgHead); + pBuf = (*fp)(len); + if (NULL == pBuf) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + ((SMsgHead *)pBuf)->vgId = htonl(vgId); + ((SMsgHead *)pBuf)->contLen = htonl(len); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); + code = tEncodeSSubmitReq2(&encoder, pReq); + tEncoderClear(&encoder); + } + + if (TSDB_CODE_SUCCESS == code) { + *pData = pBuf; + *pLen = len; + } else { + taosMemoryFree(pBuf); + } + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 976300f92725868eddabbb0a1176783e545d4988..1da40c8b736d1ece40cfd070374c43cb7358bc53 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -197,9 +197,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); -SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, - SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, - SBatchDeleteReq* pDeleteReq); +int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, + SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, + SBatchDeleteReq* pDeleteReq, void* ppData, int32_t* pLen); // sma int32_t smaInit(); diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index a2c9484693a720243ccb205f0d311289c86f254e..cf1e5d5d397df0e5de2c41e15c9dd92adbcfabe0 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -204,18 +204,18 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char } SBatchDeleteReq deleteReq = {0}; - SSubmitReq *pSubmitReq = - tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true, - pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq); - // TODO deleteReq - taosArrayDestroy(deleteReq.deleteReqs); - + void *pSubmitReq = NULL; + int32_t contLen = 0; - if (!pSubmitReq) { - smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), + if (tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true, + pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen) < 0) { + smaError("vgId:%d, failed to gen submit msg while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno)); goto _err; } + // TODO deleteReq + taosArrayDestroy(deleteReq.deleteReqs); + #if 0 ASSERT(!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)); @@ -224,7 +224,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char SRpcMsg submitReqMsg = { .msgType = TDMT_VND_SUBMIT, .pCont = pSubmitReq, - .contLen = ntohl(pSubmitReq->length), + .contLen = ntohl(contLen), }; if (tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) < 0) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 5907be576a67441f12c4045ba33243b751451fd4..36c98d778868c3291ed471b234c2acb37220c6a6 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -72,6 +72,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl return 0; } +#if 0 SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) { @@ -299,6 +300,181 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem return ret; } +#endif + +int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, + SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, + SBatchDeleteReq* pDeleteReq, void* ppData, int32_t* pLen) { + SSubmitReq2* pReq = NULL; + SArray* tagArray = NULL; + SArray* createTbArray = NULL; + SArray* pVals = NULL; + + int32_t sz = taosArrayGetSize(pBlocks); + + if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) { + goto _end; + } + + if (!(createTbArray = taosArrayInit(sz, POINTER_BYTES))) { + goto _end; + } + + if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + + if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { + goto _end; + } + + // create table req + if (createTb) { + for (int32_t i = 0; i < sz; ++i) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + SVCreateTbReq* pCreateTbReq = NULL; + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + taosArrayPush(createTbArray, &pCreateTbReq); + continue; + } + + if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { + goto _end; + }; + + // don't move to the end of loop as to destroy in the end of func when error occur + taosArrayPush(createTbArray, &pCreateTbReq); + + // set const + pCreateTbReq->flags = 0; + pCreateTbReq->type = TSDB_CHILD_TABLE; + pCreateTbReq->ctb.suid = suid; + + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); + + // set tag content + taosArrayClear(tagArray); + STagVal tagVal = { + .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.id.groupId, + }; + taosArrayPush(tagArray, &tagVal); + pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); + + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + if (pTag == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + pCreateTbReq->ctb.pTag = (uint8_t*)pTag; + + // set tag name + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = {0}; + strcpy(tagNameStr, "group_id"); + taosArrayPush(tagName, tagNameStr); + pCreateTbReq->ctb.tagName = tagName; + + // set table name + if (pDataBlock->info.parTbName[0]) { + pCreateTbReq->name = strdup(pDataBlock->info.parTbName); + } else { + pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); + } + } + } + + // SSubmitTbData req + for (int32_t i = 0; i < sz; ++i) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + pDeleteReq->suid = suid; + pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); + tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq); + continue; + } + + int32_t rows = pDataBlock->info.rows; + + SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData)); + if (!pTbData) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + + if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))) { + taosMemoryFree(pTbData); + goto _end; + } + pTbData->suid = suid; + pTbData->uid = 0; // uid is assigned by vnode + pTbData->sver = pTSchema->version; + + tqDebug("tq sink, convert block1 %d, rows: %d", i, rows); + + if (createTb) { + pTbData->pCreateTbReq = taosArrayGetP(createTbArray, i); + } + + if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) { + taosArrayDestroy(pTbData->aRowP); + taosMemoryFree(pTbData); + goto _end; + } + + for (int32_t j = 0; j < rows; j++) { + taosArrayClear(pVals); + for (int32_t k = 0; k < pTSchema->numOfCols; k++) { + const STColumn* pCol = &pTSchema->columns[k]; + SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); + if (colDataIsNull_s(pColData, j)) { + SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); + taosArrayPush(pVals, &cv); + } else { + void* data = colDataGetData(pColData, j); + if (IS_STR_DATA_TYPE(pCol->type)) { + SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value + SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); + taosArrayPush(pVals, &cv); + } else { + SValue sv; + memcpy(&sv.val, data, tDataTypes[pCol->type].bytes); + SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); + taosArrayPush(pVals, &cv); + } + } + } + SRow* pRow = NULL; + if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) { + tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE); + goto _end; + } + ASSERT(pRow); + taosArrayPush(pTbData->aRowP, &pRow); + } + + taosArrayPush(pReq->aSubmitTbData, pTbData); + } + + terrno = tBuildSubmitReq(TD_VID(pVnode), pReq, ppData, pLen, rpcMallocCont); + +_end: + taosArrayDestroy(tagArray); + taosArrayDestroy(pVals); + tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); + + if (terrno != 0) { + taosArrayDestroy(pDeleteReq->deleteReqs); + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; +} void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { const SArray* pBlocks = (const SArray*)data;