diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 8a2112fbccf313d64ac21431af0a1ab005889fcd..525f3e106ffb9d57f1fe915c838f925c1b7a0119 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -61,7 +61,7 @@ static int32_t init_env() { printf("create database\n"); pRes = taos_query(pConn, "drop topic topicname"); if (taos_errno(pRes) != 0) { - printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes)); + printf("error in drop topicname, reason:%s\n", taos_errstr(pRes)); } taos_free_result(pRes); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 1eed342f8ca496006fc5fa00150cad4cc5dea9a6..09d737fe93a7cfb6c542f8dae2abfa49fe7c5293 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -126,7 +126,7 @@ typedef struct SWal { typedef struct { int64_t refId; int64_t refVer; - int64_t refFile; +// int64_t refFile; SWal *pWal; } SWalRef; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 841dc29731b38a2e4d63e5c27aa351c31fe14b18..631652f7fa774b651b7060d62edc852ae76d6150 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -297,11 +297,8 @@ void tqCloseReader(STqReader* pReader) { int32_t tqSeekVer(STqReader* pReader, int64_t ver) { if (walReadSeekVer(pReader->pWalReader, ver) < 0) { - ASSERT(pReader->pWalReader->curInvalid); - ASSERT(pReader->pWalReader->curVersion == ver); return -1; } - ASSERT(pReader->pWalReader->curVersion == ver); return 0; } @@ -362,11 +359,13 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) { pReader->pMsg = pMsg; - if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; - while (true) { - if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1; - if (pReader->pBlock == NULL) break; - } +// if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; +// while (true) { +// if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1; +// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pReader->pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen, +// pReader->msgIter.len, pReader->msgIter.uid); +// if (pReader->pBlock == NULL) break; +// } if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; pReader->ver = ver; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 19c9c6f97d2722058dbc90d8973eedee7bb51700..366ef09e60a7741b16f0f9a4a404fbeccf3a9251 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1047,18 +1047,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; tsdbReaderClose(pTSInfo->base.dataReader); pTSInfo->base.dataReader = NULL; -#if 0 - if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) && - pInfo->tqReader->pWalReader->curVersion != pOffset->version) { - qError("prepare scan ver %" PRId64 " actual ver %" PRId64 ", last %" PRId64, pOffset->version, - pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version); - ASSERT(0); - } -#endif if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) { return -1; } - ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1); } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ int64_t uid = pOffset->uid; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 99281e6c590bb18d1428d31b4f3b98a0c8b6c7c1..71e1068fb39831d07040b61c99e7a19c7b3494db 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1618,7 +1618,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer); return NULL; } - ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1); } else { return NULL; } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 5e09af5b2e9e8e5c55eb8e3f4414ecd48582f6b9..526dba0bb52e891bfebaaf369a66b4109c3f49c8 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -96,8 +96,7 @@ int32_t walNextValidMsg(SWalReader *pReader) { if (walSkipFetchBodyNew(pReader) < 0) { return -1; } - fetchVer++; - ASSERT(fetchVer == pReader->curVersion); + fetchVer = pReader->curVersion; } } pReader->curStopped = 1; @@ -144,7 +143,7 @@ static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int } static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) { - char fnameStr[WAL_FILE_LEN]; + char fnameStr[WAL_FILE_LEN] = {0}; taosCloseFile(&pReader->pIdxFile); taosCloseFile(&pReader->pLogFile); @@ -300,14 +299,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { return -1; } - if (pReadHead->version != ver) { - wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, - pRead->pHead->head.version, ver); - pRead->curInvalid = 1; - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - return -1; - } - if (walValidBodyCksum(pRead->pHead) != 0) { wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); pRead->curInvalid = 1; diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index 43470f4c82ab1cb35d37ae9ace9a78b938e3cbd9..768256cefa3dd0e8322eb163ca5107be5e48cab4 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -26,7 +26,7 @@ SWalRef *walOpenRef(SWal *pWal) { } pRef->refId = tGenIdPI64(); pRef->refVer = -1; - pRef->refFile = -1; +// pRef->refFile = -1; pRef->pWal = pWal; taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *)); return pRef; @@ -58,11 +58,11 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) { pRef->refVer = ver; // bsearch in fileSet - SWalFileInfo tmpInfo; - tmpInfo.firstVer = ver; - SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - ASSERT(pRet != NULL); - pRef->refFile = pRet->firstVer; +// SWalFileInfo tmpInfo; +// tmpInfo.firstVer = ver; +// SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); +// ASSERT(pRet != NULL); +// pRef->refFile = pRet->firstVer; taosThreadMutexUnlock(&pWal->mutex); } @@ -73,7 +73,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) { #if 1 void walUnrefVer(SWalRef *pRef) { pRef->refId = -1; - pRef->refFile = -1; +// pRef->refFile = -1; } #endif @@ -85,20 +85,18 @@ SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) { } } taosThreadMutexLock(&pWal->mutex); - int64_t ver = walGetFirstVer(pWal); - - wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver); - pRef->refVer = ver; // bsearch in fileSet - SWalFileInfo tmpInfo; - tmpInfo.firstVer = ver; - SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - ASSERT(pRet != NULL); - pRef->refFile = pRet->firstVer; +// SWalFileInfo tmpInfo; +// tmpInfo.firstVer = ver; +// SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); +// ASSERT(pRet != NULL); +// pRef->refFile = pRet->firstVer; taosThreadMutexUnlock(&pWal->mutex); + wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver); + return pRef; } @@ -119,7 +117,7 @@ SWalRef *walRefCommittedVer(SWal *pWal) { tmpInfo.firstVer = ver; SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); ASSERT(pRet != NULL); - pRef->refFile = pRet->firstVer; +// pRef->refFile = pRet->firstVer; taosThreadMutexUnlock(&pWal->mutex); return pRef; diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 2f947d325263d45025a7cff835a715ac108674e7..1f2df09ce109d7aced2e14d9e8affe8fd3229836 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -132,7 +132,8 @@ int tsem_wait(tsem_t *psem) { int tsem_timewait(tsem_t *psem, int64_t milis) { if (psem == NULL || *psem == NULL) return -1; - dispatch_semaphore_wait(*psem, milis * 1000 * 1000); + dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(milis * USEC_PER_SEC)); + dispatch_semaphore_wait(*psem, time); return 0; }