diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index b7e6c42e3b7ede77237aa7e8e910b45f236b5cc9..1fb00e743fb33f35b2a2dd4344b2f81214832c2d 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -190,8 +190,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); -void verifyOffset(void *pWalReader, STqOffsetVal* pOffset); - int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); void qStreamSetOpen(qTaskInfo_t tinfo); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 862a1da9d0446dc361a98736056ee4d82e127a98..35a6838b2e0f36ba36ccd41c4e8dd175e2efe7c9 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -142,7 +142,7 @@ typedef struct { typedef struct SWalReader SWalReader; // todo hide this struct -typedef struct SWalReader { +struct SWalReader { SWal *pWal; int64_t readerId; TdFilePtr pLogFile; @@ -154,7 +154,7 @@ typedef struct SWalReader { SWalFilterCond cond; // TODO remove it SWalCkHead *pHead; -} SWalReader; +}; // module initialization int32_t walInit(); @@ -201,6 +201,7 @@ int32_t walNextValidMsg(SWalReader *pRead); int64_t walReaderGetCurrentVer(const SWalReader *pReader); int64_t walReaderGetValidFirstVer(const SWalReader *pReader); void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever); +void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset); // only for tq usage void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 8a0faa9d18d801c4b07bccafe8e4c51d699265dd..704f4695fb3826d37bc8d9f5c1847c61162ad421 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -245,7 +245,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } if (offset->type == TMQ_OFFSET__LOG) { - verifyOffset(pHandle->pWalReader, offset); + walReaderVerifyOffset(pHandle->pWalReader, offset); int64_t fetchVer = offset->version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a825f56639b12dcb9dd46f6d2a8a51016e3b1622..b6d4615997ec5a62dc92e98372673f77c6f4b3df 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1058,17 +1058,6 @@ void qStreamSetOpen(qTaskInfo_t tinfo) { pOperator->status = OP_NOT_OPENED; } -void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){ - // if offset version is small than first version , let's seek to first version - taosThreadMutexLock(&((SWalReader*)pWalReader)->pWal->mutex); - int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal); - taosThreadMutexUnlock(&((SWalReader*)pWalReader)->pWal->mutex); - - if (pOffset->version + 1 < firstVer){ - pOffset->version = firstVer - 1; - } -} - int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; @@ -1095,7 +1084,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tsdbReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; - verifyOffset(pInfo->tqReader->pWalReader, pOffset); + walReaderVerifyOffset(pInfo->tqReader->pWalReader, pOffset); if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id); return -1; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1aa60ea392dd49023d6eb52ab6a70c18a9db1d3d..0c93365c1c702d6d8ade7a23ce56c01e46a7478f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1697,13 +1697,13 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { while (1) { - int32_t type = tqNextBlockInWal(pInfo->tqReader); + bool hasResult = tqNextBlockInWal(pInfo->tqReader, id); SSDataBlock* pRes = pInfo->tqReader->pResBlock; // curVersion move to next, so currentOffset = curVersion - 1 tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1); - if (type == FETCH_TYPE__DATA) { + if (hasResult) { qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version); blockDataCleanup(pInfo->pRes); @@ -1711,7 +1711,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pInfo->pRes->info.rows > 0) { return pInfo->pRes; } - } else if (type == FETCH_TYPE__NONE) { + } else { qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version); return NULL; } @@ -2074,66 +2074,43 @@ FETCH_NEXT_BLOCK: return pInfo->pUpdateRes; } - const char* id = GET_TASKID(pTaskInfo); - SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; + const char* id = GET_TASKID(pTaskInfo); + SSDataBlock* pBlock = pInfo->pRes; + SDataBlockInfo* pBlockInfo = &pBlock->info; int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); NEXT_SUBMIT_BLK: - while (1) { - if (pInfo->tqReader->msg.msgStr == NULL) { - if (pInfo->validBlockIndex >= totalBlocks) { - updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); - doClearBufferedBlocks(pInfo); - - qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id); - void* buff = NULL; - // int32_t len = streamScanOperatorEncode(pInfo, &buff); - // if (len > 0) { - // streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), buff, len); - // } - taosMemoryFreeClear(buff); - return NULL; - } - - int32_t current = pInfo->validBlockIndex++; - SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current); - - qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id); - if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { - qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id); - continue; - } - } - blockDataCleanup(pInfo->pRes); + while (1) { + bool hasResult = tqNextBlockInWal(pInfo->tqReader, id); + SSDataBlock* pRes = pInfo->tqReader->pResBlock; - while (tqNextBlockImpl(pInfo->tqReader, id)) { - int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id); - if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { - continue; - } + blockDataCleanup(pBlock); - setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, false); + if (hasResult) { + qDebug("stream scan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, + pTaskInfo->streamInfo.currentOffset.version); + setBlockIntoRes(pInfo, pRes, true); if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; return pInfo->pCreateTbRes; } - doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); - doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); - pInfo->pRes->info.dataLoad = 1; - blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); + doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); + pBlock->info.dataLoad = 1; + blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); + } else { + updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); + doClearBufferedBlocks(pInfo); - if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { - break; - } + qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id); + return NULL; } if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { break; - } else { - continue; } } @@ -2141,9 +2118,9 @@ FETCH_NEXT_BLOCK: pInfo->numOfExec++; pOperator->resultInfo.totalRows += pBlockInfo->rows; - qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id); + qDebug("stream scan get source rows:%" PRId64 ", %s", pBlockInfo->rows, id); if (pBlockInfo->rows > 0) { - return pInfo->pRes; + return pBlock; } if (pInfo->pUpdateDataRes->info.rows > 0) { @@ -2151,10 +2128,84 @@ FETCH_NEXT_BLOCK: } goto NEXT_SUBMIT_BLK; - } else { - ASSERT(0); - return NULL; + + // } else { + // qDebug("stream scan get none from log, return, version:%" PRId64, + // pTaskInfo->streamInfo.currentOffset.version); return NULL; + // } + + // while (1) { + // if (pInfo->tqReader->msg.msgStr == NULL) { + // if (pInfo->validBlockIndex >= totalBlocks) { + // updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); + // doClearBufferedBlocks(pInfo); + // + // qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id); + // return NULL; + // } + // + // int32_t current = pInfo->validBlockIndex++; + // SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current); + // + // qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id); + // if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { + // qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, + // current, totalBlocks, id); continue; + // } + // } + // + // blockDataCleanup(pInfo->pRes); + // + // while (tqNextBlockImpl(pInfo->tqReader, id)) { + // int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id); + // if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { + // continue; + // } + // + // setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, false); + // + // if (pInfo->pCreateTbRes->info.rows > 0) { + // pInfo->scanMode = STREAM_SCAN_FROM_RES; + // return pInfo->pCreateTbRes; + // } + // + // doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); + // doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + // pInfo->pRes->info.dataLoad = 1; + // blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); + // + // if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { + // break; + // } + // } + // + // if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { + // break; + // } else { + // continue; + // } + // } + // + // // record the scan action. + // pInfo->numOfExec++; + // pOperator->resultInfo.totalRows += pBlockInfo->rows; + // + // qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id); + // if (pBlockInfo->rows > 0) { + // return pInfo->pRes; + // } + // + // if (pInfo->pUpdateDataRes->info.rows > 0) { + // goto FETCH_NEXT_BLOCK; + // } + // + // goto NEXT_SUBMIT_BLK; + // } else { + // ASSERT(0); + // return NULL; + // } } + return NULL; } static SArray* extractTableIdList(const STableListInfo* pTableListInfo) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 627529d95613001561319fa34c479a4d240d2ef9..c29d82bcf394b65beb5dcd94e6d35835e0a7cc60 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -116,6 +116,17 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *eve *ever = pReader->cond.scanUncommited ? lastVer : committedVer; } +void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset){ + // if offset version is small than first version , let's seek to first version + taosThreadMutexLock(&pWalReader->pWal->mutex); + int64_t firstVer = walGetFirstVer((pWalReader)->pWal); + taosThreadMutexUnlock(&pWalReader->pWal->mutex); + + if (pOffset->version + 1 < firstVer){ + pOffset->version = firstVer - 1; + } +} + static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { int64_t ret = 0;