diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0d0eb841bc31c4314e07ec2ebc3b5154922f8b17..eb12bb407386aee2456548005950ae3e1ca6785f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1154,6 +1154,10 @@ typedef struct { int32_t numOfRetensions; SArray* pRetensions; // SRetention void* pTsma; + int32_t walRetentionPeriod; + int64_t walRetentionSize; + int32_t walRollPeriod; + int64_t walSegmentSize; } SCreateVnodeReq; int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 220c4f73e0867b9ca71ef4adb2a3c9d5ac9ffa21..5b8d70fb7c127d476566f54623c43783cd5a240a 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -103,8 +103,8 @@ typedef struct SWal { int32_t fsyncSeq; // meta SWalVer vers; - TdFilePtr pWriteLogTFile; - TdFilePtr pWriteIdxTFile; + TdFilePtr pLogFile; + TdFilePtr pIdxFile; int32_t writeCur; SArray *fileInfoSet; // SArray // status diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f87d336ec2107d83cea45b615770039f235d0852..6bdb6a4e655acfa177d45db716f1eb67556d3515 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3750,6 +3750,10 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR uint32_t tsmaLen = (uint32_t)(htonl(((SMsgHead *)pReq->pTsma)->contLen)); if (tEncodeBinary(&encoder, (const uint8_t *)pReq->pTsma, tsmaLen) < 0) return -1; } + if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1; + if (tEncodeI64(&encoder, pReq->walRetentionSize) < 0) return -1; + if (tEncodeI32(&encoder, pReq->walRollPeriod) < 0) return -1; + if (tEncodeI64(&encoder, pReq->walSegmentSize) < 0) return -1; tEndEncode(&encoder); @@ -3818,6 +3822,11 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * if (tDecodeBinary(&decoder, (uint8_t **)&pReq->pTsma, NULL) < 0) return -1; } + if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->walRetentionSize) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->walRollPeriod) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->walSegmentSize) < 0) return -1; + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0471e2b85043e0286909d3174e8c937dd4e22f08..cb061e6d1c4a3a2148996cc781b437f8af0fcb9f 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -160,6 +160,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { } pCfg->walCfg.vgId = pCreate->vgId; + pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod; + pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod; + pCfg->walCfg.rollPeriod = pCreate->walRollPeriod; + pCfg->walCfg.retentionSize = pCreate->walRetentionSize; + pCfg->walCfg.segSize = pCreate->walSegmentSize; + pCfg->walCfg.level = pCreate->walLevel; + pCfg->hashBegin = pCreate->hashBegin; pCfg->hashEnd = pCreate->hashEnd; pCfg->hashMethod = pCreate->hashMethod; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index c9997fa3d5860e92b6b92c3d33d35c32080be866..0ff9b4102d5623efd8b406367b282fda150d1190 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -302,9 +302,13 @@ typedef struct { int8_t strict; int8_t hashMethod; // default is 1 int8_t cacheLast; + int8_t schemaless; int32_t numOfRetensions; SArray* pRetensions; - int8_t schemaless; + int32_t walRetentionPeriod; + int64_t walRetentionSize; + int32_t walRollPeriod; + int64_t walSegmentSize; } SDbCfg; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 064ef9b40a772790ff01284cee9103dfe642f78b..86787fcd012bfc6bec3448940940e485e630f404 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -120,6 +120,10 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { SDB_SET_INT8(pRaw, dataPos, pRetension->keepUnit, _OVER) } SDB_SET_INT8(pRaw, dataPos, pDb->cfg.schemaless, _OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRetentionPeriod, _OVER) + SDB_SET_INT64(pRaw, dataPos, pDb->cfg.walRetentionSize, _OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRollPeriod, _OVER) + SDB_SET_INT64(pRaw, dataPos, pDb->cfg.walSegmentSize, _OVER) SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) @@ -199,6 +203,10 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { } } SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.schemaless, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRetentionPeriod, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pDb->cfg.walRetentionSize, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRollPeriod, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pDb->cfg.walSegmentSize, _OVER) SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER) taosInitRWLatch(&pDb->lock); @@ -318,6 +326,10 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) { terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; return -1; } + if (pCfg->walRetentionPeriod < TSDB_DB_MIN_WAL_RETENTION_PERIOD) return -1; + if (pCfg->walRetentionSize < TSDB_DB_MIN_WAL_RETENTION_SIZE) return -1; + if (pCfg->walRollPeriod < TSDB_DB_MIN_WAL_ROLL_PERIOD) return -1; + if (pCfg->walSegmentSize < TSDB_DB_MIN_WAL_SEGMENT_SIZE) return -1; terrno = 0; return terrno; @@ -345,6 +357,12 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->cacheLastSize <= 0) pCfg->cacheLastSize = TSDB_DEFAULT_CACHE_SIZE; if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0; if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF; + if (pCfg->walRetentionPeriod < 0 && pCfg->walRetentionPeriod != -1) + pCfg->walRetentionPeriod = TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD; + if (pCfg->walRetentionSize < 0 && pCfg->walRetentionSize != -1) + pCfg->walRetentionSize = TSDB_DEFAULT_DB_WAL_RETENTION_SIZE; + if (pCfg->walRollPeriod < 0) pCfg->walRollPeriod = TSDB_DEFAULT_DB_WAL_ROLL_PERIOD; + if (pCfg->walSegmentSize < 0) pCfg->walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE; } static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { @@ -457,6 +475,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, .cacheLast = pCreate->cacheLast, .hashMethod = 1, .schemaless = pCreate->schemaless, + .walRetentionPeriod = pCreate->walRetentionPeriod, + .walRetentionSize = pCreate->walRetentionSize, + .walRollPeriod = pCreate->walRollPeriod, + .walSegmentSize = pCreate->walSegmentSize, }; dbObj.cfg.numOfRetensions = pCreate->numOfRetensions; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 3eb3a6cd1fd1f3423fe50829d307a1dfea815272..4625b2ab01544c3970e8509aa8b992e725812a45 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -230,6 +230,10 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg createReq.standby = standby; createReq.isTsma = pVgroup->isTsma; createReq.pTsma = pVgroup->pTsma; + createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod; + createReq.walRetentionSize = pDb->cfg.walRetentionSize; + createReq.walRollPeriod = pDb->cfg.walRollPeriod; + createReq.walSegmentSize = pDb->cfg.walSegmentSize; for (int32_t v = 0; v < pVgroup->replica; ++v) { SReplica *pReplica = &createReq.replicas[v]; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 118e3a5d43d159abf7f3ed8a901c2707e7987cdf..2adfc92ab1f7f0f88ae9dd4f14147c278b3fefdb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -583,7 +583,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.execTb.suid = req.suid; SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); - tqDebug("vgId:%d, tq try get suid:%" PRId64, pTq->pVnode->config.vgId, req.suid); + tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid); for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid); diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index eac1fd1a74dc9f83a15492c4e686b08d572734aa..e38fe9876b31be7d171a748c29a18ea5fc31c770 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -40,8 +40,8 @@ const SVnodeCfg vnodeCfgDefault = {.vgId = -1, .vgId = -1, .fsyncPeriod = 0, .retentionPeriod = -1, - .rollPeriod = -1, - .segSize = -1, + .rollPeriod = 0, + .segSize = 0, .retentionSize = -1, .level = TAOS_WAL_WRITE, }, diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 85238e87b9f5c5549f2a1cf82435dca8fd85b5a4..047354c4aa44c5faa4bf50ec087ab939b8df0855 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -101,8 +101,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { // open meta walResetVer(&pWal->vers); - pWal->pWriteLogTFile = NULL; - pWal->pWriteIdxTFile = NULL; + pWal->pLogFile = NULL; + pWal->pIdxFile = NULL; pWal->writeCur = -1; pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo)); if (pWal->fileInfoSet == NULL) { @@ -179,10 +179,10 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { void walClose(SWal *pWal) { taosThreadMutexLock(&pWal->mutex); - taosCloseFile(&pWal->pWriteLogTFile); - pWal->pWriteLogTFile = NULL; - taosCloseFile(&pWal->pWriteIdxTFile); - pWal->pWriteIdxTFile = NULL; + taosCloseFile(&pWal->pLogFile); + pWal->pLogFile = NULL; + taosCloseFile(&pWal->pIdxFile); + pWal->pIdxFile = NULL; walSaveMeta(pWal); taosArrayDestroy(pWal->fileInfoSet); pWal->fileInfoSet = NULL; @@ -223,7 +223,7 @@ static void walFsyncAll() { if (walNeedFsync(pWal)) { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); - int32_t code = taosFsyncFile(pWal->pWriteLogTFile); + int32_t code = taosFsyncFile(pWal->pLogFile); if (code != 0) { wError("vgId:%d, file:%" PRId64 ".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(code)); diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 78d45c84e2699770bb3453521c72c0d9dfbc6ac9..87ab155065021cb902739ef18c8925afc04c2f07 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -22,8 +22,8 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) { int64_t code = 0; - TdFilePtr pIdxTFile = pWal->pWriteIdxTFile; - TdFilePtr pLogTFile = pWal->pWriteLogTFile; + TdFilePtr pIdxTFile = pWal->pIdxFile; + TdFilePtr pLogTFile = pWal->pLogFile; // seek position int64_t idxOff = walGetVerIdxOffset(pWal, ver); @@ -68,8 +68,8 @@ int walInitWriteFile(SWal* pWal) { return -1; } // switch file - pWal->pWriteIdxTFile = pIdxTFile; - pWal->pWriteLogTFile = pLogTFile; + pWal->pIdxFile = pIdxTFile; + pWal->pLogFile = pLogTFile; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; return 0; } @@ -78,15 +78,15 @@ int walChangeWrite(SWal* pWal, int64_t ver) { int code; TdFilePtr pIdxTFile, pLogTFile; char fnameStr[WAL_FILE_LEN]; - if (pWal->pWriteLogTFile != NULL) { - code = taosCloseFile(&pWal->pWriteLogTFile); + if (pWal->pLogFile != NULL) { + code = taosCloseFile(&pWal->pLogFile); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } } - if (pWal->pWriteIdxTFile != NULL) { - code = taosCloseFile(&pWal->pWriteIdxTFile); + if (pWal->pIdxFile != NULL) { + code = taosCloseFile(&pWal->pIdxFile); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -106,7 +106,7 @@ int walChangeWrite(SWal* pWal, int64_t ver) { pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pIdxTFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - pWal->pWriteIdxTFile = NULL; + pWal->pIdxFile = NULL; return -1; } walBuildLogName(pWal, fileFirstVer, fnameStr); @@ -114,12 +114,12 @@ int walChangeWrite(SWal* pWal, int64_t ver) { if (pLogTFile == NULL) { taosCloseFile(&pIdxTFile); terrno = TAOS_SYSTEM_ERROR(errno); - pWal->pWriteLogTFile = NULL; + pWal->pLogFile = NULL; return -1; } - pWal->pWriteLogTFile = pLogTFile; - pWal->pWriteIdxTFile = pIdxTFile; + pWal->pLogFile = pLogTFile; + pWal->pIdxFile = pIdxTFile; pWal->writeCur = idx; return fileFirstVer; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 81500d80882f930e253771b835ac0f9e63f0f57b..d869e6e2ceee749657a53970c1981d97ca07db1a 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -32,8 +32,8 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { } } - taosCloseFile(&pWal->pWriteLogTFile); - taosCloseFile(&pWal->pWriteIdxTFile); + taosCloseFile(&pWal->pLogFile); + taosCloseFile(&pWal->pIdxFile); if (pWal->vers.firstVer != -1) { int32_t fileSetSize = taosArrayGetSize(pWal->fileInfoSet); @@ -261,14 +261,13 @@ int32_t walEndSnapshot(SWal *pWal) { pWal->vers.snapshotVer = ver; int ts = taosGetTimestampSec(); - int64_t minVerToDelete = ver; - void *pIter = NULL; + void *pIter = NULL; while (1) { pIter = taosHashIterate(pWal->pRefHash, pIter); if (pIter == NULL) break; SWalRef *pRef = *(SWalRef **)pIter; if (pRef->refVer == -1) continue; - minVerToDelete = TMIN(minVerToDelete, pRef->refVer); + ver = TMIN(ver, pRef->refVer); } int deleteCnt = 0; @@ -277,35 +276,37 @@ int32_t walEndSnapshot(SWal *pWal) { tmp.firstVer = ver; // find files safe to delete SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); - if (ver >= pInfo->lastVer) { - pInfo++; - } - // iterate files, until the searched result - for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { - if ((pWal->cfg.retentionSize != -1 && newTotSize > pWal->cfg.retentionSize) || - (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) { - // delete according to file size or close time - deleteCnt++; - newTotSize -= iter->fileSize; + if (pInfo) { + if (ver >= pInfo->lastVer) { + pInfo++; + } + // iterate files, until the searched result + for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { + if ((pWal->cfg.retentionSize != -1 && newTotSize > pWal->cfg.retentionSize) || + (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) { + // delete according to file size or close time + deleteCnt++; + newTotSize -= iter->fileSize; + } + } + char fnameStr[WAL_FILE_LEN]; + // remove file + for (int i = 0; i < deleteCnt; i++) { + pInfo = taosArrayGet(pWal->fileInfoSet, i); + walBuildLogName(pWal, pInfo->firstVer, fnameStr); + taosRemoveFile(fnameStr); + walBuildIdxName(pWal, pInfo->firstVer, fnameStr); + taosRemoveFile(fnameStr); } - } - char fnameStr[WAL_FILE_LEN]; - // remove file - for (int i = 0; i < deleteCnt; i++) { - pInfo = taosArrayGet(pWal->fileInfoSet, i); - walBuildLogName(pWal, pInfo->firstVer, fnameStr); - taosRemoveFile(fnameStr); - walBuildIdxName(pWal, pInfo->firstVer, fnameStr); - taosRemoveFile(fnameStr); - } - // make new array, remove files - taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); - if (taosArrayGetSize(pWal->fileInfoSet) == 0) { - pWal->writeCur = -1; - pWal->vers.firstVer = -1; - } else { - pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; + // make new array, remove files + taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); + if (taosArrayGetSize(pWal->fileInfoSet) == 0) { + pWal->writeCur = -1; + pWal->vers.firstVer = -1; + } else { + pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; + } } pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; pWal->totSize = newTotSize; @@ -324,34 +325,34 @@ END: int32_t walRollImpl(SWal *pWal) { int32_t code = 0; - if (pWal->pWriteIdxTFile != NULL) { - code = taosCloseFile(&pWal->pWriteIdxTFile); + if (pWal->pIdxFile != NULL) { + code = taosCloseFile(&pWal->pIdxFile); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(errno); goto END; } } - if (pWal->pWriteLogTFile != NULL) { - code = taosCloseFile(&pWal->pWriteLogTFile); + if (pWal->pLogFile != NULL) { + code = taosCloseFile(&pWal->pLogFile); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(errno); goto END; } } - TdFilePtr pIdxTFile, pLogTFile; + TdFilePtr pIdxFile, pLogFile; // create new file - int64_t newFileFirstVersion = pWal->vers.lastVer + 1; + int64_t newFileFirstVer = pWal->vers.lastVer + 1; char fnameStr[WAL_FILE_LEN]; - walBuildIdxName(pWal, newFileFirstVersion, fnameStr); - pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); - if (pIdxTFile == NULL) { + walBuildIdxName(pWal, newFileFirstVer, fnameStr); + pIdxFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pIdxFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); code = -1; goto END; } - walBuildLogName(pWal, newFileFirstVersion, fnameStr); - pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); - if (pLogTFile == NULL) { + walBuildLogName(pWal, newFileFirstVer, fnameStr); + pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pLogFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); code = -1; goto END; @@ -363,8 +364,8 @@ int32_t walRollImpl(SWal *pWal) { } // switch file - pWal->pWriteIdxTFile = pIdxTFile; - pWal->pWriteLogTFile = pLogTFile; + pWal->pIdxFile = pIdxFile; + pWal->pLogFile = pLogFile; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; ASSERT(pWal->writeCur >= 0); @@ -378,10 +379,10 @@ END: static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { SWalIdxEntry entry = {.ver = ver, .offset = offset}; - int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END); + int64_t idxOffset = taosLSeekFile(pWal->pIdxFile, 0, SEEK_END); wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset, idxOffset); - int64_t size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry)); + int64_t size = taosWriteFile(pWal->pIdxFile, &entry, sizeof(SWalIdxEntry)); if (size != sizeof(SWalIdxEntry)) { terrno = TAOS_SYSTEM_ERROR(errno); // TODO truncate @@ -407,7 +408,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); - if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) { + if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) { // TODO ftruncate terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), @@ -416,7 +417,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy goto END; } - if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) { + if (taosWriteFile(pWal->pLogFile, (char *)body, bodyLen) != bodyLen) { // TODO ftruncate terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), @@ -456,14 +457,14 @@ int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const vo return -1; } - if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) { + if (pWal->pIdxFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) { if (walInitWriteFile(pWal) < 0) { taosThreadMutexUnlock(&pWal->mutex); return -1; } } - ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0); + ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0); if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) { taosThreadMutexUnlock(&pWal->mutex); @@ -494,14 +495,14 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync return -1; } - if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) { + if (pWal->pIdxFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) { if (walInitWriteFile(pWal) < 0) { taosThreadMutexUnlock(&pWal->mutex); return -1; } } - ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0); + ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0); if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) { taosThreadMutexUnlock(&pWal->mutex); @@ -524,7 +525,7 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in void walFsync(SWal *pWal, bool forceFsync) { if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); - if (taosFsyncFile(pWal->pWriteLogTFile) < 0) { + if (taosFsyncFile(pWal->pLogFile) < 0) { wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), strerror(errno)); } diff --git a/tests/system-test/7-tmq/db.py b/tests/system-test/7-tmq/db.py index fd793fd841f76a402ba35fed68013167133d9a62..1fd0638d172fe6dfb6f19c1617e1eebc38ccac41 100644 --- a/tests/system-test/7-tmq/db.py +++ b/tests/system-test/7-tmq/db.py @@ -118,7 +118,7 @@ class TDTestCase: if dropFlag == 1: tsql.execute("drop database if exists %s"%(dbName)) - tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica)) + tsql.execute("create database if not exists %s vgroups %d replica %d wal_retention_period -1 wal_retention_size -1"%(dbName, vgroups, replica)) tdLog.debug("complete to create database %s"%(dbName)) return diff --git a/tests/system-test/7-tmq/tmqDelete-1ctb.py b/tests/system-test/7-tmq/tmqDelete-1ctb.py index a2a429771cc4fddcbf8ba86c64e0d5cfc90ff7cd..bedb36e505fa0f0d136d33aeeb472287d7566c54 100644 --- a/tests/system-test/7-tmq/tmqDelete-1ctb.py +++ b/tests/system-test/7-tmq/tmqDelete-1ctb.py @@ -52,7 +52,7 @@ class TDTestCase: paraDict['rowsPerTbl'] = self.rowsPerTbl tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1) tdLog.info("create stb") tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) tdLog.info("create ctb") diff --git a/tests/system-test/7-tmq/tmqDelete-multiCtb.py b/tests/system-test/7-tmq/tmqDelete-multiCtb.py index fa32efbd0b6b44d8077b182b5dd6c7dbe81aab2c..94ca16bc6f4b6abf260f291380e1c1f0079d5fa1 100644 --- a/tests/system-test/7-tmq/tmqDelete-multiCtb.py +++ b/tests/system-test/7-tmq/tmqDelete-multiCtb.py @@ -52,7 +52,7 @@ class TDTestCase: paraDict['rowsPerTbl'] = self.rowsPerTbl tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1) tdLog.info("create stb") tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) tdLog.info("create ctb") diff --git a/tests/system-test/7-tmq/tmqDnodeRestart.py b/tests/system-test/7-tmq/tmqDnodeRestart.py index 5117ee3d24c289e695eb3113e643e94a6eb01e53..9a11106e3ea464415815dc020870f60cbaf21fb7 100644 --- a/tests/system-test/7-tmq/tmqDnodeRestart.py +++ b/tests/system-test/7-tmq/tmqDnodeRestart.py @@ -53,7 +53,7 @@ class TDTestCase: paraDict['rowsPerTbl'] = self.rowsPerTbl tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1) tdLog.info("create stb") tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) tdLog.info("create ctb") diff --git a/tests/system-test/7-tmq/tmqUpdateWithConsume.py b/tests/system-test/7-tmq/tmqUpdateWithConsume.py index 4f21beffc40e825ce143bf23060f566048ec16b4..2dd3a061c62bf5ac63a135e99424ac549ef83a54 100644 --- a/tests/system-test/7-tmq/tmqUpdateWithConsume.py +++ b/tests/system-test/7-tmq/tmqUpdateWithConsume.py @@ -52,7 +52,7 @@ class TDTestCase: paraDict['rowsPerTbl'] = self.rowsPerTbl tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1, wal_retention_size=-1, wal_retention_period=-1) tdLog.info("create stb") tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) tdLog.info("create ctb")