From 7b4bb723922a0b7fb8e549dbc6a90c23b424248a Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 18 Aug 2023 16:38:26 +0800 Subject: [PATCH] fix(vnode/destroy): delete objects --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 105 ++++++++++---------- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 2 +- source/dnode/vnode/inc/vnode.h | 51 +++++----- source/dnode/vnode/src/inc/vndCos.h | 1 + source/dnode/vnode/src/vnd/vnodeCos.c | 20 ++++ source/dnode/vnode/src/vnd/vnodeOpen.c | 26 +++-- 6 files changed, 114 insertions(+), 91 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index cf57deaa22..444639315a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -144,7 +144,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); pCfg->syncCfg.replicaNum++; } - if(pCreate->selfIndex != -1){ + if (pCreate->selfIndex != -1) { pCfg->syncCfg.myIndex = pCreate->selfIndex; } for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) { @@ -157,7 +157,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->syncCfg.totalReplicaNum++; } pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum; - if(pCreate->learnerSelfIndex != -1){ + if (pCreate->learnerSelfIndex != -1) { pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex; } } @@ -201,38 +201,37 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - if(req.learnerReplica == 0) - { + if (req.learnerReplica == 0) { req.learnerSelfIndex = -1; } - dInfo("vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64 - ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64 - ", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d" - ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64 - ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d " - "learnerReplica:%d learnerSelfIndex:%d strict:%d", - req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, - (uint64_t)req.buffer * 1024 * 1024, - req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, req.tsdbPageSize * 1024, req.db, req.dbUid, - req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.isTsma, req.precision, req.compression, - req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize, - req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, - req.hashSuffix, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict); + dInfo( + "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d " + "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64 + ", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d" + ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64 + ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d " + "learnerReplica:%d learnerSelfIndex:%d strict:%d", + req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, + (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, + req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, + req.isTsma, req.precision, req.compression, req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, + req.walRetentionPeriod, req.walRetentionSize, req.walRollPeriod, req.walSegmentSize, req.hashMethod, + req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix, req.replica, req.selfIndex, req.learnerReplica, + req.learnerSelfIndex, req.strict); for (int32_t i = 0; i < req.replica; ++i) { dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port, req.replicas[i].id); } for (int32_t i = 0; i < req.learnerReplica; ++i) { dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn, - req.learnerReplicas[i].port, req.replicas[i].id); + req.learnerReplicas[i].port, req.replicas[i].id); } SReplica *pReplica = NULL; - if(req.selfIndex != -1){ + if (req.selfIndex != -1) { pReplica = &req.replicas[req.selfIndex]; - } - else{ + } else { pReplica = &req.learnerReplicas[req.learnerSelfIndex]; } if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort || @@ -313,10 +312,10 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { _OVER: if (code != 0) { vnodeClose(pImpl); - vnodeDestroy(path, pMgmt->pTfs); + vnodeDestroy(0, path, pMgmt->pTfs); } else { - dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", - req.vgId, TMSG_INFO(pMsg->msgType)); + dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId, + TMSG_INFO(pMsg->msgType)); } tFreeSCreateVnodeReq(&req); @@ -331,12 +330,12 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - if(req.learnerReplicas == 0){ + if (req.learnerReplicas == 0) { req.learnerSelfIndex = -1; } - dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", - req.vgId, TMSG_INFO(pMsg->msgType)); + dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId, + TMSG_INFO(pMsg->msgType)); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); if (pVnode == NULL) { @@ -347,7 +346,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ESyncRole role = vnodeGetRole(pVnode->pImpl); dInfo("vgId:%d, checking node role:%d", req.vgId, role); - if(role == TAOS_SYNC_ROLE_VOTER){ + if (role == TAOS_SYNC_ROLE_VOTER) { dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role); terrno = TSDB_CODE_VND_ALREADY_IS_VOTER; vmReleaseVnode(pMgmt, pVnode); @@ -355,7 +354,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } dInfo("vgId:%d, checking node catch up", req.vgId); - if(vnodeIsCatchUp(pVnode->pImpl) != 1){ + if (vnodeIsCatchUp(pVnode->pImpl) != 1) { terrno = TSDB_CODE_VND_NOT_CATCH_UP; vmReleaseVnode(pMgmt, pVnode); return -1; @@ -375,9 +374,8 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id); } - if (req.replica <= 0 || - (req.selfIndex < 0 && req.learnerSelfIndex <0)|| - req.selfIndex >= req.replica || req.learnerSelfIndex >= req.learnerReplica) { + if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica || + req.learnerSelfIndex >= req.learnerReplica) { terrno = TSDB_CODE_INVALID_MSG; dError("vgId:%d, failed to alter replica since invalid msg", vgId); vmReleaseVnode(pMgmt, pVnode); @@ -385,10 +383,9 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } SReplica *pReplica = NULL; - if(req.selfIndex != -1){ + if (req.selfIndex != -1) { pReplica = &req.replicas[req.selfIndex]; - } - else{ + } else { pReplica = &req.learnerReplicas[req.learnerSelfIndex]; } @@ -412,7 +409,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { vmCloseVnode(pMgmt, pVnode, false); int32_t diskPrimary = wrapperCfg.diskPrimary; - char path[TSDB_FILENAME_LEN] = {0}; + char path[TSDB_FILENAME_LEN] = {0}; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId); dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path); @@ -439,7 +436,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered", - req.vgId, TMSG_INFO(pMsg->msgType)); + req.vgId, TMSG_INFO(pMsg->msgType)); return 0; } @@ -510,8 +507,8 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { vmCloseVnode(pMgmt, pVnode, true); int32_t diskPrimary = wrapperCfg.diskPrimary; - char srcPath[TSDB_FILENAME_LEN] = {0}; - char dstPath[TSDB_FILENAME_LEN] = {0}; + char srcPath[TSDB_FILENAME_LEN] = {0}; + char dstPath[TSDB_FILENAME_LEN] = {0}; snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId); snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId); @@ -555,15 +552,16 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - if(alterReq.learnerReplica == 0){ + if (alterReq.learnerReplica == 0) { alterReq.learnerSelfIndex = -1; } int32_t vgId = alterReq.vgId; - dInfo("vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d " - "learnerSelfIndex:%d strict:%d", - vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, - alterReq.learnerSelfIndex, alterReq.strict); + dInfo( + "vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d " + "learnerSelfIndex:%d strict:%d", + vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, + alterReq.learnerSelfIndex, alterReq.strict); for (int32_t i = 0; i < alterReq.replica; ++i) { SReplica *pReplica = &alterReq.replicas[i]; dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port); @@ -573,8 +571,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port); } - if (alterReq.replica <= 0 || - (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex <0)|| + if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) || alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) { terrno = TSDB_CODE_INVALID_MSG; dError("vgId:%d, failed to alter replica since invalid msg", vgId); @@ -582,10 +579,9 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } SReplica *pReplica = NULL; - if(alterReq.selfIndex != -1){ + if (alterReq.selfIndex != -1) { pReplica = &alterReq.replicas[alterReq.selfIndex]; - } - else{ + } else { pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex]; } @@ -615,7 +611,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { vmCloseVnode(pMgmt, pVnode, false); int32_t diskPrimary = wrapperCfg.diskPrimary; - char path[TSDB_FILENAME_LEN] = {0}; + char path[TSDB_FILENAME_LEN] = {0}; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId); dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path); @@ -641,10 +637,11 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - dInfo("vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d " - "learnerSelfIndex:%d strict:%d", - vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, - alterReq.learnerSelfIndex, alterReq.strict); + dInfo( + "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d " + "learnerSelfIndex:%d strict:%d", + vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, + alterReq.learnerSelfIndex, alterReq.strict); return 0; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 0ff2537e4c..f4ce46411a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -208,7 +208,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) if (pVnode->dropped) { dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId); - vnodeDestroy(path, pMgmt->pTfs); + vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs); } taosMemoryFree(pVnode->path); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a7ce18198d..5ae257aef8 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -33,8 +33,8 @@ #include "tmsg.h" #include "trow.h" -#include "tdb.h" #include "storageapi.h" +#include "tdb.h" #ifdef __cplusplus extern "C" { @@ -57,7 +57,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod int32_t diskPrimary, STfs *pTfs); int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, int32_t diskPrimary, STfs *pTfs); -void vnodeDestroy(const char *path, STfs *pTfs); +void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb); void vnodePreClose(SVnode *pVnode); void vnodePostClose(SVnode *pVnode); @@ -66,13 +66,13 @@ void vnodeClose(SVnode *pVnode); int32_t vnodeSyncCommit(SVnode *pVnode); int32_t vnodeBegin(SVnode *pVnode); -int32_t vnodeStart(SVnode *pVnode); -void vnodeStop(SVnode *pVnode); -int64_t vnodeGetSyncHandle(SVnode *pVnode); -void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); -void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables); +int32_t vnodeStart(SVnode *pVnode); +void vnodeStop(SVnode *pVnode); +int64_t vnodeGetSyncHandle(SVnode *pVnode); +void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); +void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t *numOfTables, int64_t *numOfNormalTables); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); -int32_t vnodeGetTableList(void* pVnode, int8_t type, SArray* pList); +int32_t vnodeGetTableList(void *pVnode, int8_t type, SArray *pList); int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list); int32_t vnodeIsCatchUp(SVnode *pVnode); ESyncRole vnodeGetRole(SVnode *pVnode); @@ -80,7 +80,8 @@ ESyncRole vnodeGetRole(SVnode *pVnode); int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list); int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list); -int32_t vnodeGetStbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg, void* arg1), void *arg); +int32_t vnodeGetStbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg, void *arg1), + void *arg); void *vnodeGetIdx(void *pVnode); void *vnodeGetIvtIdx(void *pVnode); @@ -105,13 +106,13 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit); // meta -void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI); +void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta *pAPI); void metaReaderReleaseLock(SMetaReader *pReader); void metaReaderClear(SMetaReader *pReader); int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid); int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *uidList); -int32_t metaGetTableTagsByUids(void* pVnode, int64_t suid, SArray *uidList); +int32_t metaGetTableTagsByUids(void *pVnode, int64_t suid, SArray *uidList); int32_t metaReadNext(SMetaReader *pReader); const void *metaGetTableTagVal(const void *tag, int16_t type, STagVal *tagVal); int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName); @@ -120,20 +121,20 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName); int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid); int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType); int metaGetTableTtlByUid(void *meta, uint64_t uid, int64_t *ttlDays); -bool metaIsTableExist(void* pVnode, tb_uid_t uid); +bool metaIsTableExist(void *pVnode, tb_uid_t uid); int32_t metaGetCachedTableUidList(void *pVnode, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList, bool *acquired); int32_t metaUidFilterCachePut(void *pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, int32_t payloadLen, double selectivityRatio); tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name); int32_t metaGetCachedTbGroup(void *pVnode, tb_uid_t suid, const uint8_t *pKey, int32_t keyLen, SArray **pList); -int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, +int32_t metaPutTbGroupToCache(void *pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, int32_t payloadLen); bool metaTbInFilterCache(void *pVnode, tb_uid_t suid, int8_t type); int32_t metaPutTbToFilterCache(void *pVnode, tb_uid_t suid, int8_t type); int32_t metaSizeOfTbFilterCache(void *pVnode, int8_t type); -int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables); +int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables); // tsdb typedef struct STsdbReader STsdbReader; @@ -170,8 +171,8 @@ int64_t tsdbGetLastTimestamp(SVnode *pVnode, void *pTableList, int32_t numO //====================================================================================================================== int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, - SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly, - SHashObj **pIgnoreTables); + SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly, + SHashObj **pIgnoreTables); int32_t tsdbSetTableList2(STsdbReader *pReader, const void *pTableList, int32_t num); void tsdbReaderSetId2(STsdbReader *pReader, const char *idstr); void tsdbReaderClose2(STsdbReader *pReader); @@ -228,23 +229,23 @@ STqReader *tqReaderOpen(SVnode *pVnode); void tqReaderClose(STqReader *); void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); -int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList, const char* id); +int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList, const char *id); int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); -bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid); -bool tqCurrentBlockConsumed(const STqReader* pReader); +bool tqReaderIsQueriedTable(STqReader *pReader, uint64_t uid); +bool tqCurrentBlockConsumed(const STqReader *pReader); -int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); -bool tqNextBlockInWal(STqReader *pReader, const char *idstr); -bool tqNextBlockImpl(STqReader *pReader, const char *idstr); -SWalReader* tqGetWalReader(STqReader* pReader); -SSDataBlock* tqGetResultBlock (STqReader* pReader); +int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); +bool tqNextBlockInWal(STqReader *pReader, const char *idstr); +bool tqNextBlockImpl(STqReader *pReader, const char *idstr); +SWalReader *tqGetWalReader(STqReader *pReader); +SSDataBlock *tqGetResultBlock(STqReader *pReader); int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); -int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock** pRes, const char* idstr); +int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/inc/vndCos.h b/source/dnode/vnode/src/inc/vndCos.h index cf2c5eb441..51d214518a 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/source/dnode/vnode/src/inc/vndCos.h @@ -27,6 +27,7 @@ extern int8_t tsS3Enabled; int32_t s3Init(); void s3CleanUp(); int32_t s3PutObjectFromFile(const char *file, const char *object); +void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); bool s3Get(const char *object_name, const char *path); diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index 4c76538eb2..7e95a55077 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -85,6 +85,25 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) { return code; } +void s3DeleteObjectsByPrefix(const char *prefix_str) { + cos_pool_t *p = NULL; + cos_request_options_t *options = NULL; + int is_cname = 0; + cos_string_t bucket; + cos_status_t *s = NULL; + cos_string_t prefix; + + cos_pool_create(&p, NULL); + options = cos_request_options_create(p); + s3InitRequestOptions(options, is_cname); + cos_str_set(&bucket, tsS3BucketName); + cos_str_set(&prefix, prefix_str); + + s = cos_delete_objects_by_prefix(options, &bucket, &prefix); + log_status(s); + cos_pool_destroy(p); +} + void s3DeleteObjects(const char *object_name[], int nobject) { cos_pool_t *p = NULL; int is_cname = 0; @@ -314,6 +333,7 @@ long s3Size(const char *object_name) { int32_t s3Init() { return 0; } void s3CleanUp() {} int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; } +void s3DeleteObjectsByPrefix(const char *prefix) {} void s3DeleteObjects(const char *object_name[], int nobject) {} bool s3Exists(const char *object_name) { return false; } bool s3Get(const char *object_name, const char *path) { return false; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 65fc552365..c8b383a6a8 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -14,6 +14,7 @@ */ #include "vnd.h" +#include "vndCos.h" int32_t vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) { if (pTfs) { @@ -100,7 +101,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId); pCfg->replicaNum++; } - if(pReq->selfIndex != -1){ + if (pReq->selfIndex != -1) { pCfg->myIndex = pReq->selfIndex; } for (int i = pCfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) { @@ -114,12 +115,12 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t pCfg->totalReplicaNum++; } pCfg->totalReplicaNum += pReq->replica; - if(pReq->learnerSelfIndex != -1){ + if (pReq->learnerSelfIndex != -1) { pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex; } - vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", - pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex); + vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum, + pCfg->totalReplicaNum, pCfg->myIndex); info.config.syncCfg = *pCfg; ret = vnodeSaveInfo(dir, &info); @@ -293,9 +294,16 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s return dstVgId; } -void vnodeDestroy(const char *path, STfs *pTfs) { +void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs) { vInfo("path:%s is removed while destroy vnode", path); tfsRmdir(pTfs, path); + + int32_t nlevel = tfsGetLevel(pTfs); + if (vgId > 0 && nlevel > 1 && tsS3Enabled) { + char vnode_prefix[TSDB_FILENAME_LEN]; + snprintf(vnode_prefix, TSDB_FILENAME_LEN, "v%df", vgId); + s3DeleteObjectsByPrefix(vnode_prefix); + } } static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) { @@ -497,13 +505,9 @@ void vnodeClose(SVnode *pVnode) { // start the sync timer after the queue is ready int32_t vnodeStart(SVnode *pVnode) { return vnodeSyncStart(pVnode); } -int32_t vnodeIsCatchUp(SVnode *pVnode){ - return syncIsCatchUp(pVnode->sync); -} +int32_t vnodeIsCatchUp(SVnode *pVnode) { return syncIsCatchUp(pVnode->sync); } -ESyncRole vnodeGetRole(SVnode *pVnode){ - return syncGetRole(pVnode->sync); -} +ESyncRole vnodeGetRole(SVnode *pVnode) { return syncGetRole(pVnode->sync); } void vnodeStop(SVnode *pVnode) {} -- GitLab