diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 4b17ff48abea1d254c739eb4234ed6b024dc53a6..520a8e9c2c03a6e455e1cc29846d42c6ba3e4561 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -208,11 +208,6 @@ typedef struct SSDataBlock { SDataBlockInfo info; } SSDataBlock; -enum { - FETCH_TYPE__DATA = 0, - FETCH_TYPE__NONE, -}; - typedef struct SVarColAttr { int32_t* offset; // start position for each entry in the list uint32_t length; // used buffer size that contain the valid data diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index b7e6c42e3b7ede77237aa7e8e910b45f236b5cc9..1fb00e743fb33f35b2a2dd4344b2f81214832c2d 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -190,8 +190,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); -void verifyOffset(void *pWalReader, STqOffsetVal* pOffset); - int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); void qStreamSetOpen(qTaskInfo_t tinfo); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 862a1da9d0446dc361a98736056ee4d82e127a98..35a6838b2e0f36ba36ccd41c4e8dd175e2efe7c9 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -142,7 +142,7 @@ typedef struct { typedef struct SWalReader SWalReader; // todo hide this struct -typedef struct SWalReader { +struct SWalReader { SWal *pWal; int64_t readerId; TdFilePtr pLogFile; @@ -154,7 +154,7 @@ typedef struct SWalReader { SWalFilterCond cond; // TODO remove it SWalCkHead *pHead; -} SWalReader; +}; // module initialization int32_t walInit(); @@ -201,6 +201,7 @@ int32_t walNextValidMsg(SWalReader *pRead); int64_t walReaderGetCurrentVer(const SWalReader *pReader); int64_t walReaderGetValidFirstVer(const SWalReader *pReader); void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever); +void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset); // only for tq usage void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 549173b3f7bb9b29d177ddec3251831b975fcd7c..da5d6f8b3caaea12d68e19c07fe80552bfad13fe 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -185,7 +185,6 @@ typedef struct STsdbReader STsdbReader; int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly, SHashObj** pIgnoreTables); int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num); - void tsdbReaderSetId(STsdbReader *pReader, const char *idstr); void tsdbReaderClose(STsdbReader *pReader); int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext); @@ -198,8 +197,6 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader); -int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num); -void tsdbReaderSetId(STsdbReader *pReader, const char *idstr); void tsdbReaderSetCloseFlag(STsdbReader *pReader); int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables); @@ -267,7 +264,7 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); -int32_t tqNextBlockInWal(STqReader* pReader); +bool tqNextBlockInWal(STqReader* pReader, const char* idstr); bool tqNextBlockImpl(STqReader *pReader, const char* idstr); int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 861235a5c397a881bd5d4cd4c90f47adb06d6135..230e8b8f886d532521f2c3419042fb5c9aa8ec6b 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -219,8 +219,6 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgL int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); -int32_t tqProcessSubmitReqForSubscribe(STQ* pTq); -int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e61cc2fdb143b41614aeba70d392e85e560a0539..a8c2a09319b04896117e2beb099ab27731c3591b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1195,159 +1195,6 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream return TSDB_CODE_SUCCESS; } -int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { - bool failed = false; - SDecoder* pCoder = &(SDecoder){0}; - SDeleteRes* pRes = &(SDeleteRes){0}; - - pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); - if (pRes->uidList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - failed = true; - } - - tDecoderInit(pCoder, pReq, len); - tDecodeDeleteRes(pCoder, pRes); - tDecoderClear(pCoder); - - int32_t sz = taosArrayGetSize(pRes->uidList); - if (sz == 0 || pRes->affectedRows == 0) { - taosArrayDestroy(pRes->uidList); - return 0; - } - - SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); - blockDataEnsureCapacity(pDelBlock, sz); - pDelBlock->info.rows = sz; - pDelBlock->info.version = ver; - - for (int32_t i = 0; i < sz; i++) { - // start key column - SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX); - colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false); // end key column - SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX); - colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false); - // uid column - SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX); - int64_t* pUid = taosArrayGet(pRes->uidList, i); - colDataSetVal(pUidCol, i, (const char*)pUid, false); - - colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i); - colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i); - colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i); - } - - taosArrayDestroy(pRes->uidList); - - int32_t* pRef = taosMemoryMalloc(sizeof(int32_t)); - *pRef = 1; - - taosWLockLatch(&pTq->pStreamMeta->lock); - - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - continue; - } - - qDebug("s-task:%s delete req enqueue, ver: %" PRId64, pTask->id.idStr, ver); - - if (!failed) { - SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); - pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK; - pRefBlock->pBlock = pDelBlock; - - if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) { - atomic_sub_fetch_32(pRef, 1); - taosFreeQitem(pRefBlock); - continue; - } - - if (streamSchedExec(pTask) < 0) { - qError("s-task:%s stream task launch failed", pTask->id.idStr); - continue; - } - - } else { - streamTaskInputFail(pTask); - } - } - - taosWUnLockLatch(&pTq->pStreamMeta->lock); - - int32_t ref = atomic_sub_fetch_32(pRef, 1); - if (ref == 0) { - blockDataDestroy(pDelBlock); - taosMemoryFree(pRef); - } - -#if 0 - SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - pStreamBlock->type = STREAM_INPUT__DATA_BLOCK; - pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock)); - SSDataBlock block = {0}; - assignOneDataBlock(&block, pDelBlock); - block.info.type = STREAM_DELETE_DATA; - taosArrayPush(pStreamBlock->blocks, &block); - - if (!failed) { - if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pStreamBlock) < 0) { - qError("stream task input del failed, task id %d", pTask->id.taskId); - continue; - } - - if (streamSchedExec(pTask) < 0) { - qError("stream task launch failed, task id %d", pTask->id.taskId); - continue; - } - } else { - streamTaskInputFail(pTask); - } - } - blockDataDestroy(pDelBlock); -#endif - return 0; -} - -int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); - - taosWLockLatch(&pTq->lock); - - if (taosHashGetSize(pTq->pPushMgr) > 0) { - void* pIter = taosHashIterate(pTq->pPushMgr, NULL); - - while (pIter) { - STqHandle* pHandle = *(STqHandle**)pIter; - tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); - - if (ASSERT(pHandle->msg != NULL)) { - tqError("pHandle->msg should not be null"); - break; - }else{ - SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; - tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); - taosMemoryFree(pHandle->msg); - pHandle->msg = NULL; - } - - pIter = taosHashIterate(pTq->pPushMgr, pIter); - } - - taosHashClear(pTq->pPushMgr); - } - - // unlock - taosWUnLockLatch(&pTq->lock); - return 0; -} - int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 82a76657fddf2dd0b574b418572cd89c1d898ea6..7f4fe48b8e20c8fb4dd7b67f14ff553bf78e8199 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -16,8 +16,40 @@ #include "tq.h" #include "vnd.h" -int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { +int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { + int32_t vgId = TD_VID(pTq->pVnode); + + taosWLockLatch(&pTq->lock); + + if (taosHashGetSize(pTq->pPushMgr) > 0) { + void* pIter = taosHashIterate(pTq->pPushMgr, NULL); + + while (pIter) { + STqHandle* pHandle = *(STqHandle**)pIter; + tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); + if (ASSERT(pHandle->msg != NULL)) { + tqError("pHandle->msg should not be null"); + break; + }else{ + SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; + tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; + } + + pIter = taosHashIterate(pTq->pPushMgr, pIter); + } + + taosHashClear(pTq->pPushMgr); + } + + // unlock + taosWUnLockLatch(&pTq->lock); + return 0; +} + +int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { if (msgType == TDMT_VND_SUBMIT) { tqProcessSubmitReqForSubscribe(pTq); } @@ -37,10 +69,6 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE) { tqStartStreamTasks(pTq); } - - if (msgType == TDMT_VND_DELETE) { -// tqProcessDeleteDataReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver); - } } return 0; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 510d54355eb56ec1445fc99f0701999764f0e80d..b756f99f3298a9c1e614839aad1c533036680085 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -344,7 +344,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) { } // todo ignore the error in wal? -int32_t tqNextBlockInWal(STqReader* pReader) { +bool tqNextBlockInWal(STqReader* pReader, const char* id) { SWalReader* pWalReader = pReader->pWalReader; while (1) { @@ -353,7 +353,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) { // try next message in wal file // todo always retry to avoid read failure caused by wal file deletion if (walNextValidMsg(pWalReader) < 0) { - return FETCH_TYPE__NONE; + return false; } void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); @@ -379,24 +379,24 @@ int32_t tqNextBlockInWal(STqReader* pReader) { if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) { tDecoderClear(&decoder); tqError("decode wal file error, msgLen:%d, ver:%" PRId64, bodyLen, ver); - return FETCH_TYPE__NONE; + return false; } tDecoderClear(&decoder); pReader->nextBlk = 0; } - size_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); + int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < numOfBlocks) { - tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg.msgStr, pReader->msg.msgLen, - pReader->msg.ver, pReader->nextBlk); + tqDebug("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk, + numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pReader->tbIdHash == NULL) { int32_t code = tqRetrieveDataBlock(pReader, NULL); if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) { - return FETCH_TYPE__DATA; + return true; } } @@ -406,7 +406,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) { int32_t code = tqRetrieveDataBlock(pReader, NULL); if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) { - return FETCH_TYPE__DATA; + return true; } } else { pReader->nextBlk += 1; @@ -414,7 +414,9 @@ int32_t tqNextBlockInWal(STqReader* pReader) { } } + qDebug("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id); tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); + pReader->msg.msgStr = NULL; } } @@ -730,13 +732,17 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, const char* id) { return 0; } +// todo refactor: int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { tqDebug("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); pReader->nextBlk++; - if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; + if (pSubmitTbDataRet) { + *pSubmitTbDataRet = pSubmitTbData; + } + int32_t sversion = pSubmitTbData->sver; int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 0191a42a78411c907deab09fc418154768b67bba..9acf2454af6a725b8b7a12888830358afe65eac4 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -15,7 +15,7 @@ #include "tq.h" -static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); +static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); // this function should be executed by stream threads. // extract submit block from WAL, and add them into the input queue for the sources tasks. @@ -30,7 +30,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { // check all restore tasks bool shouldIdle = true; - createStreamRunReq(pTq->pStreamMeta, &shouldIdle); + createStreamTaskRunReq(pTq->pStreamMeta, &shouldIdle); int32_t times = 0; @@ -57,7 +57,39 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { return 0; } -int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { +static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { + // seek the stored version and extract data from WAL + int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); + if (pTask->chkInfo.currentVer < firstVer) { + pTask->chkInfo.currentVer = firstVer; + tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId, + pTask->id.idStr, firstVer, pTask->chkInfo.currentVer); + + // todo need retry if failed + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // append the data for the stream + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + } else { + int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); + if (currentVer == -1) { // we only seek the read for the first time + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit + return code; + } + + // append the data for the stream + tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; bool noNewDataInWal = true; int32_t vgId = pStreamMeta->vgId; @@ -67,6 +99,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { return TSDB_CODE_SUCCESS; } + // clone the task list, to avoid the task update during scan wal files SArray* pTaskList = NULL; taosWLockLatch(&pStreamMeta->lock); pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL); @@ -107,39 +140,15 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = false; // seek the stored version and extract data from WAL - int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); - if (pTask->chkInfo.currentVer < firstVer) { - pTask->chkInfo.currentVer = firstVer; - tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId, - pTask->id.idStr, firstVer, pTask->chkInfo.currentVer); - - // todo need retry if failed - int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); - if (code != TSDB_CODE_SUCCESS) { - streamMetaReleaseTask(pStreamMeta, pTask); - continue; - } - - // append the data for the stream - tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); - } else { - int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - if (currentVer == -1) { - int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); - if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit - streamMetaReleaseTask(pStreamMeta, pTask); - continue; - } - - // append the data for the stream - tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.currentVer); - } + int32_t code = doSetOffsetForWalReader(pTask, vgId); + if (code != TSDB_CODE_SUCCESS) { + streamMetaReleaseTask(pStreamMeta, pTask); + continue; } // append the data for the stream SStreamQueueItem* pItem = NULL; - int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, pTask->id.idStr); + code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { // failed, continue streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index aaa497221b0c63b5d179f24c755816380bbc2236..b5308d8b98c225c3aa6209b4648aa183ea690e95 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -249,7 +249,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } if (offset->type == TMQ_OFFSET__LOG) { - verifyOffset(pHandle->pWalReader, offset); + walReaderVerifyOffset(pHandle->pWalReader, offset); int64_t fetchVer = offset->version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a825f56639b12dcb9dd46f6d2a8a51016e3b1622..b6d4615997ec5a62dc92e98372673f77c6f4b3df 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1058,17 +1058,6 @@ void qStreamSetOpen(qTaskInfo_t tinfo) { pOperator->status = OP_NOT_OPENED; } -void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){ - // if offset version is small than first version , let's seek to first version - taosThreadMutexLock(&((SWalReader*)pWalReader)->pWal->mutex); - int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal); - taosThreadMutexUnlock(&((SWalReader*)pWalReader)->pWal->mutex); - - if (pOffset->version + 1 < firstVer){ - pOffset->version = firstVer - 1; - } -} - int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; @@ -1095,7 +1084,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tsdbReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; - verifyOffset(pInfo->tqReader->pWalReader, pOffset); + walReaderVerifyOffset(pInfo->tqReader->pWalReader, pOffset); if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id); return -1; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1aa60ea392dd49023d6eb52ab6a70c18a9db1d3d..0537702a754d8e0f0d40f7338f243c3068f20340 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1697,13 +1697,13 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { while (1) { - int32_t type = tqNextBlockInWal(pInfo->tqReader); + bool hasResult = tqNextBlockInWal(pInfo->tqReader, id); SSDataBlock* pRes = pInfo->tqReader->pResBlock; // curVersion move to next, so currentOffset = curVersion - 1 tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1); - if (type == FETCH_TYPE__DATA) { + if (hasResult) { qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version); blockDataCleanup(pInfo->pRes); @@ -1711,7 +1711,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pInfo->pRes->info.rows > 0) { return pInfo->pRes; } - } else if (type == FETCH_TYPE__NONE) { + } else { qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version); return NULL; } @@ -2074,8 +2074,9 @@ FETCH_NEXT_BLOCK: return pInfo->pUpdateRes; } - const char* id = GET_TASKID(pTaskInfo); - SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; + const char* id = GET_TASKID(pTaskInfo); + SSDataBlock* pBlock = pInfo->pRes; + SDataBlockInfo* pBlockInfo = &pBlock->info; int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); NEXT_SUBMIT_BLK: @@ -2086,12 +2087,6 @@ FETCH_NEXT_BLOCK: doClearBufferedBlocks(pInfo); qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id); - void* buff = NULL; - // int32_t len = streamScanOperatorEncode(pInfo, &buff); - // if (len > 0) { - // streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), buff, len); - // } - taosMemoryFreeClear(buff); return NULL; } @@ -2105,7 +2100,7 @@ FETCH_NEXT_BLOCK: } } - blockDataCleanup(pInfo->pRes); + blockDataCleanup(pBlock); while (tqNextBlockImpl(pInfo->tqReader, id)) { int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id); @@ -2120,10 +2115,10 @@ FETCH_NEXT_BLOCK: return pInfo->pCreateTbRes; } - doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); - doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); - pInfo->pRes->info.dataLoad = 1; - blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); + doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); + pBlock->info.dataLoad = 1; + blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { break; @@ -2143,7 +2138,7 @@ FETCH_NEXT_BLOCK: qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id); if (pBlockInfo->rows > 0) { - return pInfo->pRes; + return pBlock; } if (pInfo->pUpdateDataRes->info.rows > 0) { @@ -2151,10 +2146,9 @@ FETCH_NEXT_BLOCK: } goto NEXT_SUBMIT_BLK; - } else { - ASSERT(0); - return NULL; } + + return NULL; } static SArray* extractTableIdList(const STableListInfo* pTableListInfo) { @@ -2242,44 +2236,6 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { return NULL; } - // else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { - // int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1; - // - // while(1){ - // if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { - // qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer); - // pTaskInfo->streamInfo.lastStatus.version = fetchVer; - // pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; - // return NULL; - // } - // SWalCont* pHead = &pInfo->pCkHead->head; - // qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); - // - // if (pHead->msgType == TDMT_VND_SUBMIT) { - // SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - // tqReaderSetDataMsg(pInfo->tqReader, pCont, 0); - // SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, - // &pInfo->pRes); if(block){ - // pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; - // pTaskInfo->streamInfo.lastStatus.version = fetchVer; - // qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); - // return block; - // }else{ - // fetchVer++; - // } - // } else{ - // ASSERT(pInfo->sContext->withMeta); - // ASSERT(IS_META_MSG(pHead->msgType)); - // qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); - // pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; - // pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG; - // pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType; - // pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen; - // pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen); - // memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen); - // return NULL; - // } - // } return NULL; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 52d65257698211fc3232e39d7c2c78fb1749781c..4cfeedab57fb289d12643e996f0b4cdd3f65c531 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -27,6 +27,7 @@ SStreamQueue* streamQueueOpen(int64_t cap) { taosSetQueueCapacity(pQueue->queue, cap); taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024); return pQueue; + FAIL: if (pQueue->queue) taosCloseQueue(pQueue->queue); if (pQueue->qall) taosFreeQall(pQueue->qall); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 627529d95613001561319fa34c479a4d240d2ef9..c29d82bcf394b65beb5dcd94e6d35835e0a7cc60 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -116,6 +116,17 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *eve *ever = pReader->cond.scanUncommited ? lastVer : committedVer; } +void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset){ + // if offset version is small than first version , let's seek to first version + taosThreadMutexLock(&pWalReader->pWal->mutex); + int64_t firstVer = walGetFirstVer((pWalReader)->pWal); + taosThreadMutexUnlock(&pWalReader->pWal->mutex); + + if (pOffset->version + 1 < firstVer){ + pOffset->version = firstVer - 1; + } +} + static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { int64_t ret = 0;