提交 11684ceb 编写于 作者: K kailixu

refact: resource mgmt when build block for rsma

上级 e918982a
...@@ -2181,15 +2181,14 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData ...@@ -2181,15 +2181,14 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, STSchema* pTSchema,
int32_t vgId, tb_uid_t suid) { int32_t vgId, tb_uid_t suid) {
SSubmitReq2* pReq = *ppReq; SSubmitReq2* pReq = NULL;
SArray* pVals = NULL; SArray* pVals = NULL;
int32_t bufSize = sizeof(SSubmitReq2); int32_t bufSize = sizeof(SSubmitReq2);
int32_t numOfBlks = 0; int32_t numOfBlks = 0;
int32_t sz = 1; int32_t sz = 1;
if (!(pReq = taosMemoryMalloc(bufSize))) { if (!(pReq = taosMemoryMalloc(bufSize))) {
terrno = TSDB_CODE_OUT_OF_MEMORY; goto _end;
return TSDB_CODE_FAILED;
} }
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
...@@ -2205,19 +2204,24 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat ...@@ -2205,19 +2204,24 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
} }
SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData)); SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (!pTbData) { if (!pTbData) {
terrno = TSDB_CODE_OUT_OF_MEMORY; goto _end;
return TSDB_CODE_FAILED; }
taosArrayPush(pReq->aSubmitTbData, pTbData);
if(!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))){
taosMemoryFree(pTbData);
goto _end;
} }
pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*));
pTbData->suid = suid; pTbData->suid = suid;
pTbData->uid = pDataBlock->info.id.groupId; pTbData->uid = pDataBlock->info.id.groupId;
pTbData->sver = pTSchema->version; pTbData->sver = pTSchema->version;
if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) { if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY; taosArrayDestroy(pTbData->aRowP);
return TSDB_CODE_FAILED; taosMemoryFree(pTbData);
goto _end;
} }
for (int32_t j = 0; j < rows; ++j) { // iterate by row for (int32_t j = 0; j < rows; ++j) { // iterate by row
...@@ -2313,16 +2317,24 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat ...@@ -2313,16 +2317,24 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
} }
} }
SRow* pRow = NULL; SRow* pRow = NULL;
tRowBuild(pVals, pTSchema, &pRow); if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
if (pRow) { tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE);
taosArrayPush(pTbData->aRowP, &pRow); goto _end;
} }
ASSERT(pRow);
taosArrayPush(pTbData->aRowP, &pRow);
} }
taosArrayPush(pReq->aSubmitTbData, pTbData);
} }
_end: _end:
taosArrayDestroy(pVals);
if (terrno != 0) { if (terrno != 0) {
*ppReq = NULL;
if (pReq) tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
*ppReq = pReq;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -713,7 +713,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma ...@@ -713,7 +713,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
} }
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
taosMemoryFreeClear(pReq); tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8 smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8
" failed since %s", " failed since %s",
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr()); SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr());
...@@ -723,7 +723,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma ...@@ -723,7 +723,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64, smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64,
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version); SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version);
taosMemoryFreeClear(pReq); if(pReq) tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
} }
} }
......
...@@ -150,7 +150,6 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { ...@@ -150,7 +150,6 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
} }
} }
#endif #endif
// pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter); tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) { while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) { if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册