diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 907b3be5609b119f7a7b9d48c68d88d1bc53f38c..220c4f73e0867b9ca71ef4adb2a3c9d5ac9ffa21 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -114,21 +114,30 @@ typedef struct SWal { int64_t refId; TdThreadMutex mutex; // ref - SHashObj *pRefHash; // ref -> SWalRef + SHashObj *pRefHash; // refId -> SWalRef // path char path[WAL_PATH_LEN]; // reusable write head SWalCkHead writeHead; -} SWal; // WAL HANDLE +} SWal; + +typedef struct { + int64_t refId; + int64_t refVer; + int64_t refFile; + SWal *pWal; +} SWalRef; typedef struct { int8_t scanUncommited; + int8_t scanNotApplied; int8_t scanMeta; int8_t enableRef; } SWalFilterCond; typedef struct { SWal *pWal; + int64_t readerId; TdFilePtr pLogFile; TdFilePtr pIdxFile; int64_t curFileFirstVer; @@ -138,7 +147,8 @@ typedef struct { int8_t curStopped; TdThreadMutex mutex; SWalFilterCond cond; - SWalCkHead *pHead; + // TODO remove it + SWalCkHead *pHead; } SWalReader; // module initialization @@ -157,11 +167,7 @@ int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_ int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen); -// This interface assign version automatically and return to caller. -// When using this interface with concurrent writes, -// wal will write all logs atomically, -// but not sure which one will be actually write first, -// and then the unique index of successful writen is returned. +// Assign version automatically and return to caller, // -1 will be returned for failed writes int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen); @@ -191,17 +197,15 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); -typedef struct { - int64_t refId; - int64_t ver; -} SWalRef; + +SWalRef *walRefCommittedVer(SWal *); SWalRef *walOpenRef(SWal *); -void walCloseRef(SWalRef *); +void walCloseRef(SWal *pWal, int64_t refId); int32_t walRefVer(SWalRef *, int64_t ver); -int32_t walUnrefVer(SWal *); +void walUnrefVer(SWalRef *); -// help function for raft +// helper function for raft bool walLogExist(SWal *, int64_t ver); bool walIsEmpty(SWal *); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b063e552f683536736546d7a5e2f7df04ec303ec..262300a3e73f9f4f879f5f1e9e9304ce5b180aef 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -104,6 +104,8 @@ typedef struct { // TODO remove SWalReader* pWalReader; + SWalRef* pRef; + // push STqPushHandle pushHandle; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8ea4b8d98b5c672b3f9ff3bca341153ce6dd0be6..118e3a5d43d159abf7f3ed8a901c2707e7987cdf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -212,6 +212,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { ASSERT(0); return -1; } + + if (offset.val.type == TMQ_OFFSET__LOG) { + STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey)); + if (walRefVer(pHandle->pRef, offset.val.version) < 0) { + ASSERT(0); + return -1; + } + } + /*}*/ /*}*/ @@ -376,8 +385,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { - int64_t fetchVer = fetchOffsetNew.version + 1; - SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); + int64_t fetchVer = fetchOffsetNew.version + 1; + pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { code = -1; goto OVER; @@ -534,11 +543,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.subType = req.subType; pHandle->fetchMeta = req.withMeta; + // TODO version should be assigned and refed during preprocess + SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal); + if (pRef == NULL) { + ASSERT(0); + } + int64_t ver = pRef->refVer; + pHandle->pRef = pRef; - pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - - // TODO version should be assigned in preprocess - int64_t ver = walGetCommittedVer(pTq->pVnode->pWal); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { pHandle->execHandle.execCol.qmsg = req.qmsg; pHandle->snapshotVer = ver; @@ -560,10 +572,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); ASSERT(pHandle->execHandle.pExecReader); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { + pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { + pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + pHandle->execHandle.execTb.suid = req.suid; SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 620417016f9fd44e395edf878cc511f44e9ce22b..290ffe5c8d6a16865df7ee1296c871ccebf0a847 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -52,7 +52,7 @@ int32_t tqMetaOpen(STQ* pTq) { ASSERT(0); } - TXN txn; + TXN txn = {0}; if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { ASSERT(0); @@ -75,7 +75,13 @@ int32_t tqMetaOpen(STQ* pTq) { STqHandle handle; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecodeSTqHandle(&decoder, &handle); - handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + + handle.pRef = walOpenRef(pTq->pVnode->pWal); + if (handle.pRef == NULL) { + ASSERT(0); + } + walRefVer(handle.pRef, handle.snapshotVer); + if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { SReadHandle reader = { .meta = pTq->pVnode->pMeta, @@ -94,6 +100,7 @@ int32_t tqMetaOpen(STQ* pTq) { handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); ASSERT(handle.execHandle.pExecReader); } else { + handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 491b982968d4fdfb75d3c5fe2cc6e4b27d8c2ebf..85238e87b9f5c5549f2a1cf82435dca8fd85b5a4 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -93,7 +93,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { } // init ref - pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); + pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); if (pWal->pRefHash == NULL) { taosMemoryFree(pWal); return NULL; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 6d0e844e8e2f17681f0aa7559153bbd4e9eeb24c..ac62b7d98dfeec5e7df073ef165a79d1ad95b7f6 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -21,107 +21,112 @@ static int32_t walFetchBodyNew(SWalReader *pRead); static int32_t walSkipFetchBodyNew(SWalReader *pRead); SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { - SWalReader *pRead = taosMemoryCalloc(1, sizeof(SWalReader)); - if (pRead == NULL) { + SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader)); + if (pReader == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pRead->pWal = pWal; - pRead->pIdxFile = NULL; - pRead->pLogFile = NULL; - pRead->curVersion = -1; - pRead->curFileFirstVer = -1; - pRead->curInvalid = 1; - pRead->capacity = 0; + pReader->pWal = pWal; + pReader->readerId = tGenIdPI64(); + pReader->pIdxFile = NULL; + pReader->pLogFile = NULL; + pReader->curVersion = -1; + pReader->curFileFirstVer = -1; + pReader->curInvalid = 1; + pReader->capacity = 0; if (cond) { - pRead->cond = *cond; + pReader->cond = *cond; } else { - pRead->cond.scanMeta = 0; - pRead->cond.scanUncommited = 0; - pRead->cond.enableRef = 0; + pReader->cond.scanUncommited = 0; + pReader->cond.scanNotApplied = 0; + pReader->cond.scanMeta = 0; + pReader->cond.enableRef = 0; } - taosThreadMutexInit(&pRead->mutex, NULL); + taosThreadMutexInit(&pReader->mutex, NULL); - /*if (pRead->cond.enableRef) {*/ - /*walOpenRef(pWal);*/ - /*}*/ - - pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead)); - if (pRead->pHead == NULL) { + pReader->pHead = taosMemoryMalloc(sizeof(SWalCkHead)); + if (pReader->pHead == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; - taosMemoryFree(pRead); + taosMemoryFree(pReader); return NULL; } - return pRead; + /*if (pReader->cond.enableRef) {*/ + /* taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/ + /*}*/ + + return pReader; } -void walCloseReader(SWalReader *pRead) { - taosCloseFile(&pRead->pIdxFile); - taosCloseFile(&pRead->pLogFile); - taosMemoryFreeClear(pRead->pHead); - taosMemoryFree(pRead); +void walCloseReader(SWalReader *pReader) { + taosCloseFile(&pReader->pIdxFile); + taosCloseFile(&pReader->pLogFile); + /*if (pReader->cond.enableRef) {*/ + /*taosHashRemove(pReader->pWal->pRefHash, &pReader->readerId, sizeof(int64_t));*/ + /*}*/ + taosMemoryFreeClear(pReader->pHead); + taosMemoryFree(pReader); } -int32_t walNextValidMsg(SWalReader *pRead) { - int64_t fetchVer = pRead->curVersion; - int64_t lastVer = walGetLastVer(pRead->pWal); - int64_t committedVer = walGetCommittedVer(pRead->pWal); - int64_t appliedVer = walGetAppliedVer(pRead->pWal); - int64_t endVer = pRead->cond.scanUncommited ? lastVer : committedVer; +int32_t walNextValidMsg(SWalReader *pReader) { + int64_t fetchVer = pReader->curVersion; + int64_t lastVer = walGetLastVer(pReader->pWal); + int64_t committedVer = walGetCommittedVer(pReader->pWal); + int64_t appliedVer = walGetAppliedVer(pReader->pWal); + int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; endVer = TMIN(appliedVer, endVer); wDebug("vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld", - pRead->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); - pRead->curStopped = 0; + pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); + pReader->curStopped = 0; while (fetchVer <= endVer) { - if (walFetchHeadNew(pRead, fetchVer) < 0) { + if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; } - if (pRead->pHead->head.msgType == TDMT_VND_SUBMIT || - (IS_META_MSG(pRead->pHead->head.msgType) && pRead->cond.scanMeta)) { - if (walFetchBodyNew(pRead) < 0) { + if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT || + (IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) { + if (walFetchBodyNew(pReader) < 0) { return -1; } return 0; } else { - if (walSkipFetchBodyNew(pRead) < 0) { + if (walSkipFetchBodyNew(pReader) < 0) { return -1; } fetchVer++; - ASSERT(fetchVer == pRead->curVersion); + ASSERT(fetchVer == pReader->curVersion); } } - pRead->curStopped = 1; + pReader->curStopped = 1; return -1; } -static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64_t ver) { +static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { int64_t ret = 0; - TdFilePtr pIdxTFile = pRead->pIdxFile; - TdFilePtr pLogTFile = pRead->pLogFile; + TdFilePtr pIdxTFile = pReader->pIdxFile; + TdFilePtr pLogTFile = pReader->pLogFile; // seek position int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry); ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET); if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, - offset, terrstr()); + wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, + ver, offset, terrstr()); return -1; } SWalIdxEntry entry = {0}; if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) { if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, failed to read idx file, since %s", pRead->pWal->cfg.vgId, terrstr()); + wError("vgId:%d, failed to read idx file, since %s", pReader->pWal->cfg.vgId, terrstr()); } else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %" PRIu64, - pRead->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry)); + pReader->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry)); } return -1; } @@ -130,79 +135,79 @@ static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64 ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, - entry.offset, terrstr()); + wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, + ver, entry.offset, terrstr()); return -1; } return ret; } -static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) { +static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) { char fnameStr[WAL_FILE_LEN]; - taosCloseFile(&pRead->pIdxFile); - taosCloseFile(&pRead->pLogFile); + taosCloseFile(&pReader->pIdxFile); + taosCloseFile(&pReader->pLogFile); - walBuildLogName(pRead->pWal, fileFirstVer, fnameStr); - TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ); - if (pLogTFile == NULL) { + walBuildLogName(pReader->pWal, fileFirstVer, fnameStr); + TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ); + if (pLogFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, cannot open file %s, since %s", pRead->pWal->cfg.vgId, fnameStr, terrstr()); + wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr()); return -1; } - pRead->pLogFile = pLogTFile; + pReader->pLogFile = pLogFile; - walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr); - TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ); - if (pIdxTFile == NULL) { + walBuildIdxName(pReader->pWal, fileFirstVer, fnameStr); + TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ); + if (pIdxFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, cannot open file %s, since %s", pRead->pWal->cfg.vgId, fnameStr, terrstr()); + wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr()); return -1; } - pRead->pIdxFile = pIdxTFile; + pReader->pIdxFile = pIdxFile; return 0; } -int32_t walReadSeekVerImpl(SWalReader *pRead, int64_t ver) { - SWal *pWal = pRead->pWal; +int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { + SWal *pWal = pReader->pWal; + // bsearch in fileSet SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; - // bsearch in fileSet SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); ASSERT(pRet != NULL); - if (pRead->curFileFirstVer != pRet->firstVer) { + if (pReader->curFileFirstVer != pRet->firstVer) { // error code was set inner - if (walReadChangeFile(pRead, pRet->firstVer) < 0) { + if (walReadChangeFile(pReader, pRet->firstVer) < 0) { return -1; } } // error code was set inner - if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) { + if (walReadSeekFilePos(pReader, pRet->firstVer, ver) < 0) { return -1; } - wDebug("wal version reset from %ld(invalid: %d) to %ld", pRead->curVersion, pRead->curInvalid, ver); + wDebug("wal version reset from %ld(invalid: %d) to %ld", pReader->curVersion, pReader->curInvalid, ver); - pRead->curVersion = ver; + pReader->curVersion = ver; return 0; } -int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { - SWal *pWal = pRead->pWal; - if (!pRead->curInvalid && ver == pRead->curVersion) { +int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { + SWal *pWal = pReader->pWal; + if (!pReader->curInvalid && ver == pReader->curVersion) { wDebug("wal version %ld match, no need to reset", ver); return 0; } - pRead->curInvalid = 1; - pRead->curVersion = ver; + pReader->curInvalid = 1; + pReader->curVersion = ver; if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { - wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, + wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer); terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; @@ -210,7 +215,7 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { if (ver < pWal->vers.snapshotVer) { } - if (walReadSeekVerImpl(pRead, ver) < 0) { + if (walReadSeekVerImpl(pReader, ver) < 0) { return -1; } diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c new file mode 100644 index 0000000000000000000000000000000000000000..bd0f6fb1a8106d47b7c874f7e7ac3a99f9384911 --- /dev/null +++ b/source/libs/wal/src/walRef.c @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "cJSON.h" +#include "os.h" +#include "taoserror.h" +#include "tutil.h" +#include "walInt.h" + +SWalRef *walOpenRef(SWal *pWal) { + SWalRef *pRef = taosMemoryCalloc(1, sizeof(SWalRef)); + if (pRef == NULL) { + return NULL; + } + pRef->refId = tGenIdPI64(); + pRef->refVer = -1; + pRef->refFile = -1; + pRef->pWal = pWal; + taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *)); + return pRef; +} + +void walCloseRef(SWal *pWal, int64_t refId) { + SWalRef *pRef = *(SWalRef **)taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t)); + taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t)); + taosMemoryFree(pRef); +} + +int32_t walRefVer(SWalRef *pRef, int64_t ver) { + SWal *pWal = pRef->pWal; + if (pRef->refVer != ver) { + taosThreadMutexLock(&pWal->mutex); + if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) { + taosThreadMutexUnlock(&pWal->mutex); + terrno = TSDB_CODE_WAL_INVALID_VER; + return -1; + } + + pRef->refVer = ver; + // bsearch in fileSet + SWalFileInfo tmpInfo; + tmpInfo.firstVer = ver; + SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + ASSERT(pRet != NULL); + pRef->refFile = pRet->firstVer; + + taosThreadMutexUnlock(&pWal->mutex); + } + + return 0; +} + +void walUnrefVer(SWalRef *pRef) { + pRef->refId = -1; + pRef->refFile = -1; +} + +SWalRef *walRefCommittedVer(SWal *pWal) { + SWalRef *pRef = walOpenRef(pWal); + if (pRef == NULL) { + return NULL; + } + taosThreadMutexLock(&pWal->mutex); + + int64_t ver = walGetCommittedVer(pWal); + + pRef->refVer = ver; + // bsearch in fileSet + SWalFileInfo tmpInfo; + tmpInfo.firstVer = ver; + SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + ASSERT(pRet != NULL); + pRef->refFile = pRet->firstVer; + + taosThreadMutexUnlock(&pWal->mutex); + return pRef; +} diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index d6348cc5dd78776b8a4d17dbb6492e0a63f6ba37..81500d80882f930e253771b835ac0f9e63f0f57b 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -26,7 +26,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { pIter = taosHashIterate(pWal->pRefHash, pIter); if (pIter == NULL) break; SWalRef *pRef = (SWalRef *)pIter; - if (pRef->ver != -1) { + if (pRef->refVer != -1) { taosHashCancelIterate(pWal->pRefHash, pIter); return -1; } @@ -215,22 +215,23 @@ int32_t walRollback(SWal *pWal, int64_t ver) { static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { if (taosArrayGetSize(pWal->fileInfoSet) == 0) { - /*pWal->vers.firstVer = index;*/ if (walRollImpl(pWal) < 0) { return -1; } - } else { - int64_t passed = walGetSeq() - pWal->lastRollSeq; - if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) { - if (walRollImpl(pWal) < 0) { - return -1; - } - } else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) { - if (walRollImpl(pWal) < 0) { - return -1; - } + return 0; + } + + int64_t passed = walGetSeq() - pWal->lastRollSeq; + if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) { + if (walRollImpl(pWal) < 0) { + return -1; + } + } else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) { + if (walRollImpl(pWal) < 0) { + return -1; } } + return 0; } @@ -260,6 +261,16 @@ int32_t walEndSnapshot(SWal *pWal) { pWal->vers.snapshotVer = ver; int ts = taosGetTimestampSec(); + int64_t minVerToDelete = ver; + void *pIter = NULL; + while (1) { + pIter = taosHashIterate(pWal->pRefHash, pIter); + if (pIter == NULL) break; + SWalRef *pRef = *(SWalRef **)pIter; + if (pRef->refVer == -1) continue; + minVerToDelete = TMIN(minVerToDelete, pRef->refVer); + } + int deleteCnt = 0; int64_t newTotSize = pWal->totSize; SWalFileInfo tmp; diff --git a/tests/system-test/7-tmq/TD-17803.py b/tests/system-test/7-tmq/TD-17803.py new file mode 100644 index 0000000000000000000000000000000000000000..771ff83a29a02867c319a4b7ee6267b24eba24a5 --- /dev/null +++ b/tests/system-test/7-tmq/TD-17803.py @@ -0,0 +1,198 @@ +from distutils.log import error +import taos +import sys +import time +import socket +import os +import threading +import subprocess +import platform + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + + + +class TDTestCase: + def __init__(self): + self.snapshot = 0 + self.replica = 3 + self.vgroups = 3 + self.ctbNum = 2 + self.rowsPerTbl = 2 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def checkFileContent(self, consumerId, queryString): + buildPath = tdCom.getBuildPath() + cfgPath = tdCom.getClientCfgPath() + dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId) + cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) + tdLog.info(cmdStr) + os.system(cmdStr) + + consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) + tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) + + consumeFile = open(consumeRowsFile, mode='r') + queryFile = open(dstFile, mode='r') + + # skip first line for it is schema + queryFile.readline() + + while True: + dst = queryFile.readline() + src = consumeFile.readline() + + if dst: + if dst != src: + tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId) + else: + break + return + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 2, + 'rowsPerTbl': 1000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=self.replica) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # tmqCom.asyncInsertDataByInterlace(paraDict) + tdLog.printNoPrefix("11111111111111111111111") + tmqCom.create_ntable(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=1) + tdLog.printNoPrefix("222222222222222") + tmqCom.insert_rows_into_ntbl(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"], startTs=paraDict["startTs"], tblNum=1, rows=2) # tdLog.info("restart taosd to ensure that the data falls into the disk") + + tdLog.printNoPrefix("333333333333333333333") + tdSql.query("drop database %s"%paraDict["dbName"]) + tdLog.printNoPrefix("44444444444444444") + return + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + + # create and start thread + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 1000, + 'batchNum': 100, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + topicList = topicFromStb1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + pollDelay = 100 + showMsg = 1 + showRow = 1 + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tdLog.info("start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() + + tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) + + if totalConsumeRows != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + # tmqCom.checkFileContent(consumerId, queryString) + + tmqCom.waitSubscriptionExit(tdSql, topicFromStb1) + tdSql.query("drop topic %s"%topicFromStb1) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + self.prepareTestEnv() + # self.tmqCase1() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())