提交 db1d75bb 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 a751f750
...@@ -3423,7 +3423,7 @@ typedef struct { ...@@ -3423,7 +3423,7 @@ typedef struct {
int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq); int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq); int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq);
void tDestroySSubmitTbData(SSubmitTbData* pTbData, int32_t flag); void tDestroySSubmitTbData(SSubmitTbData* pTbData, int32_t flag);
void tDestroySSubmitReq2(SSubmitReq2* pReq, int32_t flag); void tDestroySSubmitReq(SSubmitReq2* pReq, int32_t flag);
typedef struct { typedef struct {
int32_t affectedRows; int32_t affectedRows;
......
...@@ -2388,7 +2388,7 @@ _end: ...@@ -2388,7 +2388,7 @@ _end:
if (terrno != 0) { if (terrno != 0) {
*ppReq = NULL; *ppReq = NULL;
if (pReq) { if (pReq) {
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
} }
......
...@@ -7436,7 +7436,7 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) { ...@@ -7436,7 +7436,7 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
} }
} }
void tDestroySSubmitReq2(SSubmitReq2 *pReq, int32_t flag) { void tDestroySSubmitReq(SSubmitReq2 *pReq, int32_t flag) {
if (pReq->aSubmitTbData == NULL) return; if (pReq->aSubmitTbData == NULL) return;
int32_t nSubmitTbData = TARRAY_SIZE(pReq->aSubmitTbData); int32_t nSubmitTbData = TARRAY_SIZE(pReq->aSubmitTbData);
......
...@@ -259,17 +259,14 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); ...@@ -259,17 +259,14 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
void tqNextBlock(STqReader *pReader, SFetchRet *ret); int32_t tqNextBlock(STqReader *pReader, SSDataBlock* pBlock);
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
// int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); bool tqNextBlockImpl(STqReader *pReader);
bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
bool tqNextDataBlockFilterOut2(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
// int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
// int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas);
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
......
...@@ -684,7 +684,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma ...@@ -684,7 +684,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
} }
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8 smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8
" failed since %s", " failed since %s",
...@@ -696,7 +696,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma ...@@ -696,7 +696,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version); SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version);
if (pReq) { if (pReq) {
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }
} }
......
...@@ -332,7 +332,7 @@ _end: ...@@ -332,7 +332,7 @@ _end:
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
if (pReq) { if (pReq) {
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }
......
...@@ -288,7 +288,7 @@ void tqCloseReader(STqReader* pReader) { ...@@ -288,7 +288,7 @@ void tqCloseReader(STqReader* pReader) {
} }
// free hash // free hash
taosHashCleanup(pReader->tbIdHash); taosHashCleanup(pReader->tbIdHash);
tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); tDestroySSubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
taosMemoryFree(pReader); taosMemoryFree(pReader);
} }
...@@ -322,12 +322,11 @@ int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) { ...@@ -322,12 +322,11 @@ int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) {
return 0; return 0;
} }
void tqNextBlock(STqReader* pReader, SFetchRet* ret) { int32_t tqNextBlock(STqReader* pReader, SSDataBlock* pBlock) {
while (1) { while (1) {
if (pReader->msg2.msgStr == NULL) { if (pReader->msg2.msgStr == NULL) {
if (walNextValidMsg(pReader->pWalReader) < 0) { if (walNextValidMsg(pReader->pWalReader) < 0) {
ret->fetchType = FETCH_TYPE__NONE; return FETCH_TYPE__NONE;
return;
} }
void* pBody = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); void* pBody = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
...@@ -337,15 +336,14 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -337,15 +336,14 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) {
tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver); tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver);
} }
while (tqNextDataBlock(pReader)) { while (tqNextBlockImpl(pReader)) {
memset(&ret->data, 0, sizeof(SSDataBlock)); memset(pBlock, 0, sizeof(SSDataBlock));
int32_t code = tqRetrieveDataBlock2(&ret->data, pReader, NULL); int32_t code = tqRetrieveDataBlock(pBlock, pReader, NULL);
if (code != 0 || ret->data.info.rows == 0) { if (code != TSDB_CODE_SUCCESS || pBlock->info.rows == 0) {
continue; continue;
} }
ret->fetchType = FETCH_TYPE__DATA; return FETCH_TYPE__DATA;
return;
} }
} }
} }
...@@ -367,7 +365,7 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i ...@@ -367,7 +365,7 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i
return 0; return 0;
} }
bool tqNextDataBlock(STqReader* pReader) { bool tqNextBlockImpl(STqReader* pReader) {
if (pReader->msg2.msgStr == NULL) { if (pReader->msg2.msgStr == NULL) {
return false; return false;
} }
...@@ -387,20 +385,20 @@ bool tqNextDataBlock(STqReader* pReader) { ...@@ -387,20 +385,20 @@ bool tqNextDataBlock(STqReader* pReader) {
tqDebug("tq reader block found, ver:%"PRId64", uid:%"PRId64, pReader->msg2.ver, pSubmitTbData->uid); tqDebug("tq reader block found, ver:%"PRId64", uid:%"PRId64, pReader->msg2.ver, pSubmitTbData->uid);
return true; return true;
} else { } else {
tqDebug("tq reader discard block, uid:%"PRId64", continue", pSubmitTbData->uid); tqDebug("tq reader discard submit block, uid:%"PRId64", continue", pSubmitTbData->uid);
} }
pReader->nextBlk++; pReader->nextBlk++;
} }
tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); tDestroySSubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0; pReader->nextBlk = 0;
pReader->msg2.msgStr = NULL; pReader->msg2.msgStr = NULL;
return false; return false;
} }
bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) { bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
if (pReader->msg2.msgStr == NULL) return false; if (pReader->msg2.msgStr == NULL) return false;
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
...@@ -415,7 +413,7 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) { ...@@ -415,7 +413,7 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) {
pReader->nextBlk++; pReader->nextBlk++;
} }
tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); tDestroySSubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0; pReader->nextBlk = 0;
pReader->msg2.msgStr = NULL; pReader->msg2.msgStr = NULL;
...@@ -451,7 +449,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap ...@@ -451,7 +449,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
return 0; return 0;
} }
int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg2.msgStr, pReader->nextBlk); tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg2.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++; pReader->nextBlk++;
...@@ -560,7 +558,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD ...@@ -560,7 +558,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD
int32_t sourceIdx = 0; int32_t sourceIdx = 0;
while (targetIdx < colActual) { while (targetIdx < colActual) {
if(sourceIdx >= numOfCols){ if(sourceIdx >= numOfCols){
tqError("tqRetrieveDataBlock2 sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols); tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
goto FAIL; goto FAIL;
} }
SColData* pCol = taosArrayGet(pCols, sourceIdx); SColData* pCol = taosArrayGet(pCols, sourceIdx);
...@@ -568,7 +566,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD ...@@ -568,7 +566,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD
SColVal colVal; SColVal colVal;
if(pCol->nVal != numOfRows){ if(pCol->nVal != numOfRows){
tqError("tqRetrieveDataBlock2 pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows); tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows);
goto FAIL; goto FAIL;
} }
......
...@@ -203,7 +203,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -203,7 +203,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
STqReader* pReader = pExec->pTqReader; STqReader* pReader = pExec->pTqReader;
tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver); tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
while (tqNextDataBlock(pReader)) { while (tqNextBlockImpl(pReader)) {
taosArrayClear(pBlocks); taosArrayClear(pBlocks);
taosArrayClear(pSchemas); taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL; SSubmitTbData* pSubmitTbDataRet = NULL;
...@@ -262,7 +262,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -262,7 +262,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReader* pReader = pExec->pTqReader; STqReader* pReader = pExec->pTqReader;
tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver); tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
taosArrayClear(pBlocks); taosArrayClear(pBlocks);
taosArrayClear(pSchemas); taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL; SSubmitTbData* pSubmitTbDataRet = NULL;
......
...@@ -695,7 +695,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -695,7 +695,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
len += sizeof(SSubmitReq2Msg); len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len); pBuf = rpcMallocCont(len);
if (NULL == pBuf) { if (NULL == pBuf) {
tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE); tDestroySSubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
goto _end; goto _end;
} }
((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode); ((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode);
...@@ -707,11 +707,11 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -707,11 +707,11 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tqError("failed to encode submit req since %s", terrstr()); tqError("failed to encode submit req since %s", terrstr());
tEncoderClear(&encoder); tEncoderClear(&encoder);
rpcFreeCont(pBuf); rpcFreeCont(pBuf);
tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE); tDestroySSubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
continue; continue;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE); tDestroySSubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
SRpcMsg msg = { SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT, .msgType = TDMT_VND_SUBMIT,
......
...@@ -1388,7 +1388,7 @@ _exit: ...@@ -1388,7 +1388,7 @@ _exit:
// clear // clear
taosArrayDestroy(newTbUids); taosArrayDestroy(newTbUids);
tDestroySSubmitReq2(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE); tDestroySSubmitReq(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE);
tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE); tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);
if (code) terrno = code; if (code) terrno = code;
......
...@@ -301,7 +301,7 @@ _end: ...@@ -301,7 +301,7 @@ _end:
if (terrno != 0) { if (terrno != 0) {
*ppReq = NULL; *ppReq = NULL;
if (pReq) { if (pReq) {
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }
return terrno; return terrno;
...@@ -326,7 +326,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32 ...@@ -326,7 +326,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32
code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, uid, vgId, suid); code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, uid, vgId, suid);
if (code) { if (code) {
if (pReq) { if (pReq) {
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }
...@@ -335,7 +335,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32 ...@@ -335,7 +335,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32
} }
code = submitReqToMsg(vgId, pReq, pMsg, msgLen); code = submitReqToMsg(vgId, pReq, pMsg, msgLen);
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); tDestroySSubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
return code; return code;
......
...@@ -1646,10 +1646,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1646,10 +1646,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
while (tqNextDataBlock(pInfo->tqReader)) { while (tqNextBlockImpl(pInfo->tqReader)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue; continue;
} }
...@@ -1687,23 +1687,23 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1687,23 +1687,23 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
while (1) { while (1) {
SFetchRet ret = {0}; SSDataBlock block = {0};
tqNextBlock(pInfo->tqReader, &ret); int32_t type = tqNextBlock(pInfo->tqReader, &block);
tqOffsetResetToLog(
&pTaskInfo->streamInfo.currentOffset, // curVersion move to next, so currentOffset = curVersion - 1
pInfo->tqReader->pWalReader->curVersion - 1); // curVersion move to next, so currentOffset = curVersion - 1 tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1);
if (ret.fetchType == FETCH_TYPE__DATA) { if (type == FETCH_TYPE__DATA) {
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, ret.data.info.rows, qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, block.info.rows,
pTaskInfo->streamInfo.currentOffset.version); pTaskInfo->streamInfo.currentOffset.version);
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
setBlockIntoRes(pInfo, &ret.data, true); setBlockIntoRes(pInfo, &block, true);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
qDebug("doQueueScan get data from log %" PRId64 " rows, return, version:%" PRId64, pInfo->pRes->info.rows, qDebug("doQueueScan get data from log %" PRId64 " rows, return, version:%" PRId64, pInfo->pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version); pTaskInfo->streamInfo.currentOffset.version);
return pInfo->pRes; return pInfo->pRes;
} }
} else if (ret.fetchType == FETCH_TYPE__NONE) { } else if (type == FETCH_TYPE__NONE) {
qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version); qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
return NULL; return NULL;
} }
...@@ -2072,11 +2072,10 @@ FETCH_NEXT_BLOCK: ...@@ -2072,11 +2072,10 @@ FETCH_NEXT_BLOCK:
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->tqReader)) { while (tqNextBlockImpl(pInfo->tqReader)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue; continue;
} }
......
...@@ -324,7 +324,7 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) { ...@@ -324,7 +324,7 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
return; return;
} }
tDestroySSubmitReq2(pVgCxt->pData, TSDB_MSG_FLG_ENCODE); tDestroySSubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pVgCxt->pData); taosMemoryFree(pVgCxt->pData);
taosMemoryFree(pVgCxt); taosMemoryFree(pVgCxt);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册