diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 46ad90c14c44c0e02e9ae48471dbcfb8d842141b..bbe4db89dceae7f7b9ad00af4a88af62cb79ab7b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1840,3 +1840,4 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p #endif #endif /*_TD_COMMON_TAOS_MSG_H_*/ + \ No newline at end of file diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index ab1298785ae8030c43566e3fcd7dcc4d600c27e1..b6054c02b4ae54927c5f0e7ff1d41adc33243e6b 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -30,7 +30,7 @@ extern "C" { #include "tmsg.h" #include "transport.h" -struct SCatalog; +typedef struct SCatalog SCatalog; enum { CTG_DBG_DB_NUM = 1, @@ -64,6 +64,7 @@ typedef struct SCatalogCfg { typedef struct SSTableMetaVersion { char dbFName[TSDB_DB_FNAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN]; + uint64_t dbId; uint64_t suid; int16_t sversion; int16_t tversion; @@ -84,7 +85,7 @@ int32_t catalogInit(SCatalogCfg *cfg); * @param catalogHandle (output, NO need to free it) * @return error code */ -int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle); +int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle); /** * Free a cluster's all catalog info, usually it's not necessary, until the application is closing. @@ -92,9 +93,9 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle); * @param pCatalog (input, NO more usage) * @return error code */ -void catalogFreeHandle(struct SCatalog* pCatalog); +void catalogFreeHandle(SCatalog* pCatalog); -int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); +int32_t catalogGetDBVgVersion(SCatalog* pCatalog, const char* dbName, int32_t* version); /** * Get a DB's all vgroup info. @@ -106,13 +107,13 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList); +int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList); -int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgroupInfo* dbInfo); +int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo); -int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId); +int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId); -int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid); +int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid); /** * Get a table's meta data. @@ -123,7 +124,7 @@ int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, c * @param pTableMeta(output, table meta data, NEED to free it by calller) * @return error code */ -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); +int32_t catalogGetTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); /** * Get a super table's meta data. @@ -134,13 +135,13 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, cons * @param pTableMeta(output, table meta data, NEED to free it by calller) * @return error code */ -int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); +int32_t catalogGetSTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); -int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg); +int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg); /** - * Force renew a table's local cached meta data. + * Force refresh a table's local cached meta data. * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) @@ -148,10 +149,10 @@ int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ - int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable); + int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable); /** - * Force renew a table's local cached meta data and get the new one. + * Force refresh a table's local cached meta data and get the new one. * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) @@ -160,7 +161,7 @@ int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ - int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable); + int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable); @@ -173,7 +174,7 @@ int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ -int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList); +int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList); /** * Get a table's vgroup from its name's hash value. @@ -184,7 +185,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pTransporter, * @param vgInfo (output, vgroup info) * @return error code */ -int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pName, SVgroupInfo* vgInfo); +int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pName, SVgroupInfo* vgInfo); /** @@ -196,14 +197,14 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter * @param pRsp (output, response data) * @return error code */ -int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); +int32_t catalogGetAllMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); -int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList); +int32_t catalogGetQnodeList(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList); -int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num); +int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num); -int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num); +int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num); /** diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 549f36a8980c5db945b2b471f94911e9c9b8d1f8..1b70c78af9d27344cf45fdb915705ad926aa21e7 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -80,16 +80,16 @@ typedef struct STableMeta { SSchema schema[]; } STableMeta; -typedef struct SDBVgroupInfo { +typedef struct SDBVgInfo { int32_t vgVersion; int8_t hashMethod; SHashObj *vgHash; //key:vgId, value:SVgroupInfo -} SDBVgroupInfo; +} SDBVgInfo; typedef struct SUseDbOutput { char db[TSDB_DB_FNAME_LEN]; uint64_t dbId; - SDBVgroupInfo *dbVgroup; + SDBVgInfo *dbVgroup; } SUseDbOutput; enum { diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 3e1af765b0ae4e725025d651a2cb6c8360aeaf47..3f60c5c6e1cb53e52940e2472aa4cfce56a9e3e7 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -43,7 +43,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog if (rsp->vgVersion < 0) { code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); } else { - SDBVgroupInfo vgInfo = {0}; + SDBVgInfo vgInfo = {0}; vgInfo.vgVersion = rsp->vgVersion; vgInfo.hashMethod = rsp->hashMethod; vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); @@ -68,10 +68,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog } } - code = catalogUpdateDBVgroup(pCatalog, rsp->db, rsp->uid, &vgInfo); - if (code) { - taosHashCleanup(vgInfo.vgHash); - } + catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo); } if (code) { @@ -94,13 +91,14 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo rsp->numOfColumns = ntohl(rsp->numOfColumns); rsp->suid = be64toh(rsp->suid); + rsp->dbId = be64toh(rsp->dbId); if (rsp->numOfColumns < 0) { schemaNum = 0; tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); - catalogRemoveSTableMeta(pCatalog, rsp->dbFName, rsp->stbName, rsp->suid); + catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid); } else { tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index c4d14ef69709c1bcb64a9378382261bf38412873..dc524ac9f5a60ecf1e85d74fad24b2c88ab7588b 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -382,7 +382,7 @@ static void *dnodeOpenVnodeFunc(void *param) { pMgmt->openVnodes, pMgmt->totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId}; + SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; SVnode * pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); @@ -594,6 +594,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { vnodeCfg.pDnode = pDnode; vnodeCfg.pTfs = pDnode->pTfs; + vnodeCfg.dbId = wrapperCfg.dbUid; SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); if (pImpl == NULL) { dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 33e51da28abef66944b519c4b1938f7542ee469c..d71d22698cf7e7141bb087cb1053c92984f581a5 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1469,3 +1469,4 @@ static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + \ No newline at end of file diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 75497726132b83ad2ebd58fab3c5c6b535d36771..520a4d026a2113d84a70966fb647cc85cdc01b90 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -42,6 +42,7 @@ typedef struct STqCfg { typedef struct SVnodeCfg { int32_t vgId; + uint64_t dbId; SDnode *pDnode; STfs *pTfs; uint64_t wsize; diff --git a/source/dnode/vnode/src/vnd/vnodeMain.c b/source/dnode/vnode/src/vnd/vnodeMain.c index 6bbf3b959d667a54868cca6e99c2361cf5b902dc..c748907d6c41a02ecbd2be3f70f04ab2ea9b2738 100644 --- a/source/dnode/vnode/src/vnd/vnodeMain.c +++ b/source/dnode/vnode/src/vnd/vnodeMain.c @@ -29,6 +29,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { cfg.vgId = pVnodeCfg->vgId; cfg.pDnode = pVnodeCfg->pDnode; cfg.pTfs = pVnodeCfg->pTfs; + cfg.dbId = pVnodeCfg->dbId; } // Validate options diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index a45a9d5a7231310dae3b09fb7aafe65dc553a17f..f541834bec0c308c246e2de1839e28488a0a0cec 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -93,6 +93,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { if (pTbCfg->type == META_CHILD_TABLE) { pStbCfg = metaGetTbInfoByUid(pVnode->pMeta, pTbCfg->ctbCfg.suid); if (pStbCfg == NULL) { + code = TSDB_CODE_VND_TB_NOT_EXIST; goto _exit; } @@ -116,9 +117,11 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { msgLen = sizeof(STableMetaRsp) + sizeof(SSchema) * (nCols + nTagCols); pTbMetaMsg = (STableMetaRsp *)rpcMallocCont(msgLen); if (pTbMetaMsg == NULL) { + code = TSDB_CODE_VND_OUT_OF_MEMORY; goto _exit; } + pTbMetaMsg->dbId = htobe64(pVnode->config.dbId); memcpy(pTbMetaMsg->dbFName, pReq->dbFName, sizeof(pTbMetaMsg->dbFName)); strcpy(pTbMetaMsg->tbName, pReq->tbName); if (pTbCfg->type == META_CHILD_TABLE) { diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 8f243743872b11ea0f44cab953901b08bc516514..6bfc8d6ca3fd3fe7679bedf04a5066c867784746 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -27,7 +27,7 @@ extern "C" { #define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6 #define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100 #define CTG_DEFAULT_CACHE_DB_NUMBER 20 -#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 10000 +#define CTG_DEFAULT_CACHE_TBLMETA_NUMBER 1000 #define CTG_DEFAULT_RENT_SECOND 10 #define CTG_DEFAULT_RENT_SLOT_SIZE 10 @@ -47,9 +47,19 @@ enum { CTG_RENT_STABLE, }; +enum { + CTG_ACT_UPDATE_VG = 0, + CTG_ACT_UPDATE_TBL, + CTG_ACT_REMOVE_DB, + CTG_ACT_REMOVE_STB, + CTG_ACT_REMOVE_TBL, + CTG_ACT_MAX +}; + typedef struct SCtgDebug { bool lockDebug; bool cacheDebug; + bool apiDebug; uint32_t showCachePeriodSec; } SCtgDebug; @@ -65,7 +75,7 @@ typedef struct SCtgDBCache { SRWLatch vgLock; uint64_t dbId; int8_t deleted; - SDBVgroupInfo *vgInfo; + SDBVgInfo *vgInfo; SCtgTbMetaCache tbCache; } SCtgDBCache; @@ -85,7 +95,6 @@ typedef struct SCtgRentMgmt { typedef struct SCatalog { uint64_t clusterId; - SRWLatch dbLock; SHashObj *dbCache; //key:dbname, value:SCtgDBCache SCtgRentMgmt dbRent; SCtgRentMgmt stbRent; @@ -109,15 +118,57 @@ typedef struct SCatalogStat { SCtgCacheStat cache; } SCatalogStat; +typedef struct SCtgUpdateVgMsg { + SCatalog* pCtg; + char dbFName[TSDB_DB_FNAME_LEN]; + uint64_t dbId; + SDBVgInfo* dbInfo; +} SCtgUpdateVgMsg; + +typedef struct SCtgUpdateTblMsg { + SCatalog* pCtg; + STableMetaOutput* output; +} SCtgUpdateTblMsg; + +typedef struct SCtgRemoveDBMsg { + SCatalog* pCtg; + char dbFName[TSDB_DB_FNAME_LEN]; + uint64_t dbId; +} SCtgRemoveDBMsg; + +typedef struct SCtgRemoveStbMsg { + SCatalog* pCtg; + char dbFName[TSDB_DB_FNAME_LEN]; + char stbName[TSDB_TABLE_NAME_LEN]; + uint64_t dbId; + uint64_t suid; +} SCtgRemoveStbMsg; + +typedef struct SCtgMetaAction { + int32_t act; + void *data; +} SCtgMetaAction; + +typedef struct SCtgQNode { + SCtgMetaAction action; + struct SCtgQNode *next; +} SCtgQNode; + typedef struct SCatalogMgmt { bool exit; SRWLatch lock; + SRWLatch qlock; + SCtgQNode *head; + SCtgQNode *tail; + tsem_t sem; + pthread_t updateThread; SHashObj *pCluster; //key: clusterId, value: SCatalog* SCatalogStat stat; SCatalogCfg cfg; } SCatalogMgmt; typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); +typedef int32_t (*ctgActFunc)(SCtgMetaAction *); #define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE) #define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE) @@ -130,17 +181,21 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); #define CTG_SET_STABLE(isSTable, tbType) do { (isSTable) = ((tbType) == TSDB_SUPER_TABLE) ? 1 : ((tbType) > TSDB_SUPER_TABLE ? 0 : -1); } while (0) #define CTG_TBTYPE_MATCH(isSTable, tbType) (CTG_IS_UNKNOWN_STABLE(isSTable) || (CTG_IS_STABLE(isSTable) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STABLE(isSTable) && (tbType) != TSDB_SUPER_TABLE)) +#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema)) + #define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST) +#define CTG_DB_NOT_EXIST(code) (code == TSDB_CODE_MND_DB_NOT_EXIST) -#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCatalog, __VA_ARGS__) -#define ctgError(param, ...) qError("CTG:%p " param, pCatalog, __VA_ARGS__) -#define ctgWarn(param, ...) qWarn("CTG:%p " param, pCatalog, __VA_ARGS__) -#define ctgInfo(param, ...) qInfo("CTG:%p " param, pCatalog, __VA_ARGS__) -#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCatalog, __VA_ARGS__) -#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCatalog, __VA_ARGS__) +#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__) +#define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__) +#define ctgWarn(param, ...) qWarn("CTG:%p " param, pCtg, __VA_ARGS__) +#define ctgInfo(param, ...) qInfo("CTG:%p " param, pCtg, __VA_ARGS__) +#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCtg, __VA_ARGS__) +#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCtg, __VA_ARGS__) #define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0) #define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { qDebug(__VA_ARGS__); } } while (0) +#define CTG_API_DEBUG(...) do { if (gCTGDebug.apiDebug) { qDebug(__VA_ARGS__); } } while (0) #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 @@ -181,8 +236,8 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); #define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) -#define CTG_API_ENTER() do { CTG_LOCK(CTG_READ, &ctgMgmt.lock); if (atomic_load_8(&ctgMgmt.exit)) { CTG_RET(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0) -#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_RET(__code); } while (0) +#define CTG_API_ENTER() do { CTG_API_DEBUG("enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &ctgMgmt.lock); if (atomic_load_8(&ctgMgmt.exit)) { CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_RET(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0) +#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_API_DEBUG("leave %s", __FUNCTION__); CTG_RET(__code); } while (0) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 6ecff87a895cfc61b4024eb62c412c61c678fe71..1a4b498c58505ea35e834d5de2e2a22ab7fa4120 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -22,6 +22,8 @@ SCatalogMgmt ctgMgmt = {0}; SCtgDebug gCTGDebug = {0}; +ctgActFunc ctgActFuncs[] = {ctgActUpdateVg, ctgActUpdateTbl, ctgActRemoveDB, ctgActRemoveStb, ctgActRemoveTbl}; + int32_t ctgDbgGetTbMetaNum(SCtgDBCache *dbCache) { return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0; } @@ -44,25 +46,25 @@ int32_t ctgDbgGetRentNum(SCtgRentMgmt *rent) { return num; } -int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type) { - if (NULL == pCatalog || NULL == pCatalog->dbCache) { +int32_t ctgDbgGetClusterCacheNum(SCatalog* pCtg, int32_t type) { + if (NULL == pCtg || NULL == pCtg->dbCache) { return 0; } switch (type) { case CTG_DBG_DB_NUM: - return (int32_t)taosHashGetSize(pCatalog->dbCache); + return (int32_t)taosHashGetSize(pCtg->dbCache); case CTG_DBG_DB_RENT_NUM: - return ctgDbgGetRentNum(&pCatalog->dbRent); + return ctgDbgGetRentNum(&pCtg->dbRent); case CTG_DBG_STB_RENT_NUM: - return ctgDbgGetRentNum(&pCatalog->stbRent); + return ctgDbgGetRentNum(&pCtg->stbRent); default: break; } SCtgDBCache *dbCache = NULL; int32_t num = 0; - void *pIter = taosHashIterate(pCatalog->dbCache, NULL); + void *pIter = taosHashIterate(pCtg->dbCache, NULL); while (pIter) { dbCache = (SCtgDBCache *)pIter; switch (type) { @@ -76,7 +78,7 @@ int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type) { ctgError("invalid type:%d", type); break; } - pIter = taosHashIterate(pCatalog->dbCache, pIter); + pIter = taosHashIterate(pCtg->dbCache, pIter); } return num; @@ -108,79 +110,18 @@ void ctgDbgShowDBCache(SHashObj *dbHash) { -void ctgDbgShowClusterCache(struct SCatalog* pCatalog) { - if (NULL == pCatalog) { +void ctgDbgShowClusterCache(SCatalog* pCtg) { + if (NULL == pCtg) { return; } - CTG_CACHE_DEBUG("## cluster %"PRIx64" %p cache Info ##", pCatalog->clusterId, pCatalog); - CTG_CACHE_DEBUG("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_DB_NUM), ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_META_NUM), - ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_STB_NUM), ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_DB_RENT_NUM), ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_STB_RENT_NUM)); - - ctgDbgShowDBCache(pCatalog->dbCache); -} - -int32_t ctgInitDBCache(struct SCatalog* pCatalog) { - if (NULL == pCatalog->dbCache) { - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - if (NULL == cache) { - ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache, NULL, cache)) { - taosHashCleanup(cache); - } - } - - return TSDB_CODE_SUCCESS; -} - - -int32_t ctgInitTbMetaCache(struct SCatalog* pCatalog, SCtgDBCache *dbCache) { - if (NULL == dbCache->tbCache.metaCache) { - if (dbCache->deleted) { - ctgInfo("db is dropping, dbId:%"PRIx64, dbCache->dbId); - CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); - } - - SHashObj *metaCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == metaCache) { - ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.metaCache, NULL, metaCache)) { - taosHashCleanup(metaCache); - } - } - - return TSDB_CODE_SUCCESS; -} - -int32_t ctgInitStbCache(struct SCatalog* pCatalog, SCtgDBCache *dbCache) { - if (NULL == dbCache->tbCache.stbCache) { - if (dbCache->deleted) { - ctgInfo("db is dropping, dbId:%"PRIx64, dbCache->dbId); - CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); - } + CTG_CACHE_DEBUG("## cluster %"PRIx64" %p cache Info ##", pCtg->clusterId, pCtg); + CTG_CACHE_DEBUG("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), + ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM)); - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); - if (NULL == cache) { - ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.stbCache, NULL, cache)) { - taosHashCleanup(cache); - } - } - - return TSDB_CODE_SUCCESS; + ctgDbgShowDBCache(pCtg->dbCache); } - - void ctgFreeMetaRent(SCtgRentMgmt *mgmt) { if (NULL == mgmt->slots) { return; @@ -214,89 +155,129 @@ void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) { CTG_UNLOCK(CTG_WRITE, &cache->metaLock); } +void ctgFreeVgInfo(SDBVgInfo *vgInfo) { + if (NULL == vgInfo) { + return; + } + + if (vgInfo->vgHash) { + taosHashCleanup(vgInfo->vgHash); + vgInfo->vgHash = NULL; + } + + tfree(vgInfo); +} + void ctgFreeDbCache(SCtgDBCache *dbCache) { if (NULL == dbCache) { return; } - atomic_store_8(&dbCache->deleted, 1); - CTG_LOCK(CTG_WRITE, &dbCache->vgLock); - if (dbCache->vgInfo) { - - if (dbCache->vgInfo->vgHash) { - taosHashCleanup(dbCache->vgInfo->vgHash); - dbCache->vgInfo->vgHash = NULL; - } - - tfree(dbCache->vgInfo); - } + ctgFreeVgInfo (dbCache->vgInfo); CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); ctgFreeTableMetaCache(&dbCache->tbCache); } -void ctgFreeHandle(struct SCatalog* pCatalog) { - ctgFreeMetaRent(&pCatalog->dbRent); - ctgFreeMetaRent(&pCatalog->stbRent); - if (pCatalog->dbCache) { - void *pIter = taosHashIterate(pCatalog->dbCache, NULL); +void ctgFreeHandle(SCatalog* pCtg) { + ctgFreeMetaRent(&pCtg->dbRent); + ctgFreeMetaRent(&pCtg->stbRent); + + if (pCtg->dbCache) { + void *pIter = taosHashIterate(pCtg->dbCache, NULL); while (pIter) { SCtgDBCache *dbCache = pIter; + atomic_store_8(&dbCache->deleted, 1); + ctgFreeDbCache(dbCache); - - pIter = taosHashIterate(pCatalog->dbCache, pIter); + + pIter = taosHashIterate(pCtg->dbCache, pIter); } - taosHashCleanup(pCatalog->dbCache); + taosHashCleanup(pCtg->dbCache); } - free(pCatalog); + free(pCtg); } -int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbFName, SCtgDBCache **dbCache, bool *inCache) { - if (NULL == pCatalog->dbCache) { - *inCache = false; - ctgWarn("empty db cache, dbFName:%s", dbFName); + +int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) { + CTG_LOCK(CTG_READ, &dbCache->vgLock); + + if (NULL == dbCache->vgInfo) { + CTG_UNLOCK(CTG_READ, &dbCache->vgLock); + + ctgDebug("db vgInfo is empty, dbId:%"PRIx64, dbCache->dbId); return TSDB_CODE_SUCCESS; } - SCtgDBCache *cache = NULL; + return TSDB_CODE_SUCCESS; +} + +int32_t ctgWAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) { + CTG_LOCK(CTG_WRITE, &dbCache->vgLock); + + if (dbCache->deleted) { + ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId); + CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); + CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); + } + + return TSDB_CODE_SUCCESS; +} - while (true) { - cache = taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName)); +void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) { + taosHashRelease(pCtg->dbCache, dbCache); +} - if (NULL == cache) { - *inCache = false; - ctgWarn("not in db vgroup cache, dbFName:%s", dbFName); - return TSDB_CODE_SUCCESS; - } +void ctgReleaseVgInfo(SCtgDBCache *dbCache) { + CTG_UNLOCK(CTG_READ, &dbCache->vgLock); +} - CTG_LOCK(CTG_READ, &cache->vgLock); - if (NULL == cache->vgInfo) { - CTG_UNLOCK(CTG_READ, &cache->vgLock); - taosHashRelease(pCatalog->dbCache, cache); - ctgWarn("db cache vgInfo is NULL, dbFName:%s", dbFName); - - continue; - } +void ctgWReleaseVgInfo(SCtgDBCache *dbCache) { + CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); +} - break; + +int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool *inCache) { + if (NULL == pCtg->dbCache) { + *pCache = NULL; + *inCache = false; + ctgWarn("empty db cache, dbFName:%s", dbFName); + return TSDB_CODE_SUCCESS; + } + + SCtgDBCache *dbCache = NULL; + ctgAcquireDBCache(pCtg, dbFName, &dbCache); + if (NULL == dbCache) { + *pCache = NULL; + *inCache = false; + return TSDB_CODE_SUCCESS; + } + + ctgAcquireVgInfo(pCtg, dbCache); + if (NULL == dbCache->vgInfo) { + ctgReleaseDBCache(pCtg, dbCache); + + *pCache = NULL; + *inCache = false; + return TSDB_CODE_SUCCESS; } - *dbCache = cache; + *pCache = dbCache; *inCache = true; - ctgDebug("Got db vgroup from cache, dbFName:%s", dbFName); + ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName); return TSDB_CODE_SUCCESS; } -int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) { +int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) { char *msg = NULL; int32_t msgLen = 0; @@ -318,6 +299,11 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { + if (CTG_DB_NOT_EXIST(rpcRsp.code)) { + ctgDebug("db not exist in mnode, dbFName:%s", input->db); + return rpcRsp.code; + } + ctgError("error rsp for use db, code:%s, db:%s", tstrerror(rpcRsp.code), input->db); CTG_ERR_RET(rpcRsp.code); } @@ -331,28 +317,27 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp return TSDB_CODE_SUCCESS; } -int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, char *dbFName, char* tbName, int32_t *exist) { - if (NULL == pCatalog->dbCache) { +int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) { + if (NULL == pCtg->dbCache) { *exist = 0; ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tbName); return TSDB_CODE_SUCCESS; } - SCtgDBCache *dbCache = taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName)); + SCtgDBCache *dbCache = NULL; + ctgAcquireDBCache(pCtg, dbFName, &dbCache); if (NULL == dbCache) { *exist = 0; - ctgWarn("db not exist in cache, dbFName:%s", dbFName); return TSDB_CODE_SUCCESS; } - size_t sz = 0; CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); STableMeta *tbMeta = taosHashGet(dbCache->tbCache.metaCache, tbName, strlen(tbName)); CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); if (NULL == tbMeta) { - taosHashRelease(pCatalog->dbCache, dbCache); + taosHashRelease(pCtg->dbCache, dbCache); *exist = 0; ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", dbFName, tbName); @@ -361,7 +346,7 @@ int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, char *dbFName, cha *exist = 1; - taosHashRelease(pCatalog->dbCache, dbCache); + taosHashRelease(pCtg->dbCache, dbCache); ctgDebug("tbmeta is in cache, dbFName:%s, tbName:%s", dbFName, tbName); @@ -369,29 +354,23 @@ int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, char *dbFName, cha } -int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) { - if (NULL == pCatalog->dbCache) { +int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) { + if (NULL == pCtg->dbCache) { *exist = 0; ctgWarn("empty tbmeta cache, tbName:%s", pTableName->tname); return TSDB_CODE_SUCCESS; } - char db[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(pTableName, db); + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pTableName, dbFName); *pTableMeta = NULL; - SCtgDBCache *dbCache = taosHashAcquire(pCatalog->dbCache, db, strlen(db)); + SCtgDBCache *dbCache = NULL; + ctgAcquireDBCache(pCtg, dbFName, &dbCache); if (NULL == dbCache) { *exist = 0; - ctgWarn("no db cache, dbFName:%s, tbName:%s", db, pTableName->tname); - return TSDB_CODE_SUCCESS; - } - - if (NULL == dbCache->tbCache.metaCache) { - *exist = 0; - taosHashRelease(pCatalog->dbCache, dbCache); - ctgWarn("empty tbmeta cache, dbFName:%s, tbName:%s", db, pTableName->tname); + ctgWarn("no db cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname); return TSDB_CODE_SUCCESS; } @@ -402,8 +381,8 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN if (NULL == *pTableMeta) { *exist = 0; - taosHashRelease(pCatalog->dbCache, dbCache); - ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", db, pTableName->tname); + ctgReleaseDBCache(pCtg, dbCache); + ctgDebug("tbl not in cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname); return TSDB_CODE_SUCCESS; } @@ -412,8 +391,8 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN tbMeta = *pTableMeta; if (tbMeta->tableType != TSDB_CHILD_TABLE) { - taosHashRelease(pCatalog->dbCache, dbCache); - ctgDebug("Got tbmeta from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, db, pTableName->tname); + ctgReleaseDBCache(pCtg, dbCache); + ctgDebug("Got tbl from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, dbFName, pTableName->tname); return TSDB_CODE_SUCCESS; } @@ -422,8 +401,8 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &tbMeta->suid, sizeof(tbMeta->suid)); if (NULL == stbMeta || NULL == *stbMeta) { CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); - taosHashRelease(pCatalog->dbCache, dbCache); - ctgError("stable not in stbCache, suid:%"PRIx64, tbMeta->suid); + ctgReleaseDBCache(pCtg, dbCache); + ctgError("stb not in stbCache, suid:%"PRIx64, tbMeta->suid); tfree(*pTableMeta); *exist = 0; return TSDB_CODE_SUCCESS; @@ -431,17 +410,17 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN if ((*stbMeta)->suid != tbMeta->suid) { CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); - taosHashRelease(pCatalog->dbCache, dbCache); + ctgReleaseDBCache(pCtg, dbCache); tfree(*pTableMeta); ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } - int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema); + int32_t metaSize = CTG_META_SIZE(*stbMeta); *pTableMeta = realloc(*pTableMeta, metaSize); if (NULL == *pTableMeta) { CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); - taosHashRelease(pCatalog->dbCache, dbCache); + ctgReleaseDBCache(pCtg, dbCache); ctgError("realloc size[%d] failed", metaSize); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -450,15 +429,15 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); - taosHashRelease(pCatalog->dbCache, dbCache); + ctgReleaseDBCache(pCtg, dbCache); - ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", db, pTableName->tname); + ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname); return TSDB_CODE_SUCCESS; } -int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableName, int32_t *tbType) { - if (NULL == pCatalog->dbCache) { +int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_t *tbType) { + if (NULL == pCtg->dbCache) { ctgWarn("empty db cache, tbName:%s", pTableName->tname); return TSDB_CODE_SUCCESS; } @@ -466,7 +445,7 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN char dbName[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, dbName); - SCtgDBCache *dbCache = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName)); + SCtgDBCache *dbCache = taosHashAcquire(pCtg->dbCache, dbName, strlen(dbName)); if (NULL == dbCache) { ctgInfo("db not in cache, dbFName:%s", dbName); return TSDB_CODE_SUCCESS; @@ -474,26 +453,29 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname)); - CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); if (NULL == pTableMeta) { - ctgWarn("tbmeta not in cache, dbFName:%s, tbName:%s", dbName, pTableName->tname); - taosHashRelease(pCatalog->dbCache, dbCache); + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + ctgWarn("tbl not in cache, dbFName:%s, tbName:%s", dbName, pTableName->tname); + taosHashRelease(pCtg->dbCache, dbCache); return TSDB_CODE_SUCCESS; } *tbType = atomic_load_8(&pTableMeta->tableType); - taosHashRelease(dbCache->tbCache.metaCache, dbCache); - taosHashRelease(pCatalog->dbCache, dbCache); + taosHashRelease(dbCache->tbCache.metaCache, pTableMeta); + + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + + taosHashRelease(pCtg->dbCache, dbCache); ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbName, pTableName->tname, *tbType); return TSDB_CODE_SUCCESS; } -int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, char *dbFName, char* tbName, STableMetaOutput* output) { +int32_t ctgGetTableMetaFromMnodeImpl(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, char *dbFName, char* tbName, STableMetaOutput* output) { SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; @@ -539,15 +521,15 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransport return TSDB_CODE_SUCCESS; } -int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) { +int32_t ctgGetTableMetaFromMnode(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) { char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); - return ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, dbFName, (char *)pTableName->tname, output); + return ctgGetTableMetaFromMnodeImpl(pCtg, pTransporter, pMgmtEps, dbFName, (char *)pTableName->tname, output); } -int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { - if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) { +int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { + if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -607,12 +589,12 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) { return TSDB_CODE_SUCCESS; } -int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) { +int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList) { SHashObj *vgroupHash = NULL; SVgroupInfo *vgInfo = NULL; SArray *vgList = NULL; int32_t code = 0; - int32_t vgNum = taosHashGetSize(dbInfo->vgHash); + int32_t vgNum = taosHashGetSize(vgHash); vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo)); if (NULL == vgList) { @@ -620,23 +602,23 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet * CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - void *pIter = taosHashIterate(dbInfo->vgHash, NULL); + void *pIter = taosHashIterate(vgHash, NULL); while (pIter) { vgInfo = pIter; if (NULL == taosArrayPush(vgList, vgInfo)) { ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId); + taosHashCancelIterate(vgHash, pIter); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - pIter = taosHashIterate(dbInfo->vgHash, pIter); + pIter = taosHashIterate(vgHash, pIter); vgInfo = NULL; } - *vgroupList = vgList; - vgList = NULL; + *pList = vgList; - ctgDebug("Got vg list from DB, vgNum:%d", vgNum); + ctgDebug("Got vgList from cache, vgNum:%d", vgNum); return TSDB_CODE_SUCCESS; @@ -649,7 +631,7 @@ _return: CTG_RET(code); } -int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) { +int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) { int32_t code = 0; int32_t vgNum = taosHashGetSize(dbInfo->vgHash); @@ -693,7 +675,6 @@ int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbIn CTG_RET(code); } -#if 1 int32_t ctgSTableVersionCompare(const void* key1, const void* key2) { if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) { return -1; @@ -713,29 +694,6 @@ int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { return 0; } } -#else - -int32_t ctgSTableVersionCompare(const void* key1, const void* key2) { - if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) { - return -1; - } else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) { - return 1; - } else { - return 0; - } -} - -int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { - if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) { - return -1; - } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) { - return 1; - } else { - return 0; - } -} - -#endif int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { mgmt->slotRIdx = 0; @@ -932,38 +890,55 @@ int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t si return TSDB_CODE_SUCCESS; } -int32_t ctgAddDBCache(struct SCatalog *pCatalog, const char *dbFName, SCtgDBCache *dbCache) { +int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { int32_t code = 0; - code = taosHashPut(pCatalog->dbCache, dbFName, strlen(dbFName), dbCache, sizeof(SCtgDBCache)); + SCtgDBCache newDBCache = {0}; + newDBCache.dbId = dbId; + + newDBCache.tbCache.metaCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == newDBCache.tbCache.metaCache) { + ctgError("taosHashInit %d metaCache failed", ctgMgmt.cfg.maxTblCacheNum); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + newDBCache.tbCache.stbCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); + if (NULL == newDBCache.tbCache.stbCache) { + ctgError("taosHashInit %d stbCache failed", ctgMgmt.cfg.maxTblCacheNum); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache)); if (code) { if (HASH_NODE_EXIST(code)) { ctgDebug("db already in cache, dbFName:%s", dbFName); - return TSDB_CODE_SUCCESS; + goto _return; } ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - SDbVgVersion vgVersion = {.dbId = dbCache->dbId, .vgVersion = dbCache->vgInfo ? dbCache->vgInfo->vgVersion : -1}; + SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1}; strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); - ctgDebug("dbCache added, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbCache->dbId); + ctgDebug("db added to cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId); - CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion))); + CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion))); + + ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbId); return TSDB_CODE_SUCCESS; _return: - ctgFreeDbCache(dbCache); + ctgFreeDbCache(&newDBCache); CTG_RET(code); } -void ctgRemoveAndFreeTableMeta(struct SCatalog* pCatalog, SCtgTbMetaCache *cache) { +void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) { CTG_LOCK(CTG_WRITE, &cache->stbLock); if (cache->stbCache) { void *pIter = taosHashIterate(cache->stbCache, NULL); @@ -971,7 +946,7 @@ void ctgRemoveAndFreeTableMeta(struct SCatalog* pCatalog, SCtgTbMetaCache *cache uint64_t *suid = NULL; taosHashGetKey(pIter, (void **)&suid, NULL); - if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCatalog->stbRent, *suid, ctgSTableVersionCompare)) { + if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgSTableVersionCompare)) { ctgDebug("stb removed from rent, suid:%"PRIx64, *suid); } @@ -979,232 +954,246 @@ void ctgRemoveAndFreeTableMeta(struct SCatalog* pCatalog, SCtgTbMetaCache *cache } } CTG_UNLOCK(CTG_WRITE, &cache->stbLock); - - ctgFreeTableMetaCache(cache); } -int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, SCtgDBCache *dbCache, const char* dbFName) { - if (taosHashRemove(pCatalog->dbCache, dbFName, strlen(dbFName))) { +int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) { + atomic_store_8(&dbCache->deleted, 1); + + ctgRemoveStbRent(pCtg, &dbCache->tbCache); + + ctgFreeDbCache(dbCache); + + ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); + + CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionCompare)); + + ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); + + if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) { ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName); CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); } + + return TSDB_CODE_SUCCESS; +} - atomic_store_8(&dbCache->deleted, 1); +int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) { + SCtgDBCache *dbCache = NULL; + if (acquire) { + dbCache = (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName)); + } else { + dbCache = (SCtgDBCache *)taosHashGet(pCtg->dbCache, dbFName, strlen(dbFName)); + } - CTG_LOCK(CTG_WRITE, &dbCache->vgLock); - if (dbCache->vgInfo) { - ctgInfo("cleanup db vgInfo, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); - - if (dbCache->vgInfo->vgHash) { - taosHashCleanup(dbCache->vgInfo->vgHash); - } + if (NULL == dbCache) { + *pCache = NULL; + ctgDebug("db not in cache, dbFName:%s", dbFName); + return TSDB_CODE_SUCCESS; + } + + if (dbCache->deleted) { + if (acquire) { + ctgReleaseDBCache(pCtg, dbCache); + } - tfree(dbCache->vgInfo); + *pCache = NULL; + ctgDebug("db is removing from cache, dbFName:%s", dbFName); + return TSDB_CODE_SUCCESS; } - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - ctgRemoveAndFreeTableMeta(pCatalog, &dbCache->tbCache); + *pCache = dbCache; + + return TSDB_CODE_SUCCESS; +} - ctgInfo("db removed from cache, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId); +int32_t ctgAcquireDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) { + CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true)); +} - CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbCache->dbId, ctgDbVgVersionCompare)); - - ctgDebug("db removed from rent, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId); - - return TSDB_CODE_SUCCESS; +int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) { + CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false)); } -int32_t ctgAcquireDBCache(struct SCatalog* pCatalog, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) { +int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) { int32_t code = 0; SCtgDBCache *dbCache = NULL; - - CTG_LOCK(CTG_WRITE, &pCatalog->dbLock); + ctgGetDBCache(pCtg, dbFName, &dbCache); - while (true) { - dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName)); - if (dbCache) { - // TODO OPEN IT + if (dbCache) { + // TODO OPEN IT #if 0 - if (dbCache->dbId == dbId) { - *pCache = dbCache; - return TSDB_CODE_SUCCESS; - } + if (dbCache->dbId == dbId) { + *pCache = dbCache; + return TSDB_CODE_SUCCESS; + } #else - if (0 == dbId) { - *pCache = dbCache; - return TSDB_CODE_SUCCESS; - } - - if (dbId && (dbCache->dbId == 0)) { - dbCache->dbId = dbId; - *pCache = dbCache; - return TSDB_CODE_SUCCESS; - } - - if (dbCache->dbId == dbId) { - *pCache = dbCache; - return TSDB_CODE_SUCCESS; - } -#endif - code = ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName); - taosHashRelease(pCatalog->dbCache, dbCache); - dbCache = NULL; - if (code) { - if (TSDB_CODE_CTG_DB_DROPPED == code) { - continue; - } - - CTG_ERR_JRET(code); - } + if (0 == dbId) { + *pCache = dbCache; + return TSDB_CODE_SUCCESS; } - SCtgDBCache newDBCache = {0}; - newDBCache.dbId = dbId; + if (dbId && (dbCache->dbId == 0)) { + dbCache->dbId = dbId; + *pCache = dbCache; + return TSDB_CODE_SUCCESS; + } - CTG_ERR_JRET(ctgAddDBCache(pCatalog, dbFName, &newDBCache)); + if (dbCache->dbId == dbId) { + *pCache = dbCache; + return TSDB_CODE_SUCCESS; + } +#endif + CTG_ERR_RET(ctgRemoveDB(pCtg, dbCache, dbFName)); } + + CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId)); -_return: + ctgGetDBCache(pCtg, dbFName, &dbCache); - if (dbCache) { - taosHashRelease(pCatalog->dbCache, dbCache); - } + *pCache = dbCache; - CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock); - - CTG_RET(code); + return TSDB_CODE_SUCCESS; } - -int32_t ctgUpdateTbMetaImpl(struct SCatalog *pCatalog, SCtgTbMetaCache *tbCache, char *dbFName, char *tbName, STableMeta *meta, int32_t metaSize) { - CTG_LOCK(CTG_READ, &tbCache->metaLock); - if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) { - CTG_UNLOCK(CTG_READ, &tbCache->metaLock); - ctgError("taosHashPut tbmeta to cache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); +int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo** pDbInfo) { + int32_t code = 0; + SDBVgInfo* dbInfo = *pDbInfo; + + if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { + ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - CTG_UNLOCK(CTG_READ, &tbCache->metaLock); - - ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); - return TSDB_CODE_SUCCESS; -} - -int32_t ctgUpdateStbMetaImpl(struct SCatalog *pCatalog, SCtgTbMetaCache *tbCache, char *dbFName, char *tbName, STableMeta *meta, int32_t metaSize) { bool newAdded = false; - int32_t code = 0; - SSTableMetaVersion metaRent = {.suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion}; - strcpy(metaRent.dbFName, dbFName); - strcpy(metaRent.stbName, tbName); - - CTG_LOCK(CTG_WRITE, &tbCache->stbLock); + SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion}; + + SCtgDBCache *dbCache = NULL; + CTG_ERR_RET(ctgGetAddDBCache(pCtg, dbFName, dbId, &dbCache)); + if (NULL == dbCache) { + ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, dbFName, dbId); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + SDBVgInfo *vgInfo = NULL; + CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache)); - CTG_LOCK(CTG_READ, &tbCache->metaLock); - STableMeta *orig = taosHashAcquire(tbCache->metaCache, tbName, strlen(tbName)); - if (orig) { - if (orig->suid != meta->suid) { - if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) { - ctgError("stb not exist in stbCache, db:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid); - } + if (dbCache->vgInfo) { + if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) { + ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); + ctgWReleaseVgInfo(dbCache); - ctgMetaRentRemove(&pCatalog->stbRent, orig->suid, ctgSTableVersionCompare); + return TSDB_CODE_SUCCESS; } - - taosHashRelease(tbCache->metaCache, orig); + + ctgFreeVgInfo(dbCache->vgInfo); } - CTG_UNLOCK(CTG_READ, &tbCache->metaLock); - CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, tbCache, dbFName, tbName, meta, metaSize)); + dbCache->vgInfo = dbInfo; - CTG_LOCK(CTG_READ, &tbCache->metaLock); - STableMeta *tbMeta = taosHashGet(tbCache->metaCache, tbName, strlen(tbName)); - if (taosHashPutExt(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES, &newAdded) != 0) { - CTG_UNLOCK(CTG_READ, &tbCache->metaLock); - CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); - ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, meta->suid); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - CTG_UNLOCK(CTG_READ, &tbCache->metaLock); - - CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); - - ctgDebug("update stable to cache, suid:%"PRIx64, meta->suid); - - if (newAdded) { - CTG_ERR_RET(ctgMetaRentAdd(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion))); - } else { - CTG_ERR_RET(ctgMetaRentUpdate(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion), ctgSTableVersionCompare)); - } + *pDbInfo = NULL; - return TSDB_CODE_SUCCESS; + ctgDebug("db vgInfo updated, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId); -_return: + ctgWReleaseVgInfo(dbCache); - CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); + dbCache = NULL; + strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); + CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); + CTG_RET(code); } -int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) { - int32_t code = 0; - SCtgDBCache *dbCache = NULL; +int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, STableMeta *meta, int32_t metaSize) { + SCtgTbMetaCache *tbCache = &dbCache->tbCache; - if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) { - ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", output->dbFName, output->tbName); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_LOCK(CTG_READ, &tbCache->metaLock); + if (dbCache->deleted || NULL == tbCache->metaCache || NULL == tbCache->stbCache) { + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + ctgError("db is dropping, dbId:%"PRIx64, dbCache->dbId); + CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); } - CTG_ERR_RET(ctgInitDBCache(pCatalog)); - - CTG_ERR_JRET(ctgAcquireDBCache(pCatalog, output->dbFName, output->dbId, &dbCache)); - - CTG_ERR_JRET(ctgInitTbMetaCache(pCatalog, dbCache)); - CTG_ERR_JRET(ctgInitStbCache(pCatalog, dbCache)); + int8_t origType = 0; + uint64_t origSuid = 0; + bool isStb = meta->tableType == TSDB_SUPER_TABLE; + STableMeta *orig = taosHashGet(tbCache->metaCache, tbName, strlen(tbName)); + if (orig) { + origType = orig->tableType; + origSuid = orig->suid; + + if (origType == TSDB_SUPER_TABLE && ((!isStb) || origSuid != meta->suid)) { + CTG_LOCK(CTG_WRITE, &tbCache->stbLock); + if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) { + ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid); + } + CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); - if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) { - CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta))); + ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid); + + ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgSTableVersionCompare); + } } - if (CTG_IS_META_CTABLE(output->metaType)) { - goto _return; + if (isStb) { + CTG_LOCK(CTG_WRITE, &tbCache->stbLock); } - if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) { - ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags); + if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) { + if (isStb) { + CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); + } + + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + ctgError("taosHashPut tbmeta to cache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } - if (TSDB_SUPER_TABLE == output->tbMeta->tableType) { - CTG_ERR_JRET(ctgUpdateStbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->tbName, output->tbMeta, tbSize)); - } else { - CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->tbName, output->tbMeta, tbSize)); + if (!isStb) { + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + return TSDB_CODE_SUCCESS; } -_return: + if (isStb && origSuid == meta->suid) { + CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + return TSDB_CODE_SUCCESS; + } - if (dbCache) { - taosHashRelease(pCatalog->dbCache, dbCache); - CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock); + STableMeta *tbMeta = taosHashGet(tbCache->metaCache, tbName, strlen(tbName)); + if (taosHashPut(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES) != 0) { + CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, meta->suid); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + + CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); - CTG_RET(code); + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + + ctgDebug("meta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); + + SSTableMetaVersion metaRent = {.dbId = dbId, .suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion}; + strcpy(metaRent.dbFName, dbFName); + strcpy(metaRent.stbName, tbName); + CTG_ERR_RET(ctgMetaRentAdd(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion))); + + return TSDB_CODE_SUCCESS; } -int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache) { +int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) { bool inCache = false; + int32_t code = 0; if (!forceUpdate) { - CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbFName, dbCache, &inCache)); + CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache)); if (inCache) { return TSDB_CODE_SUCCESS; } - - ctgDebug("failed to get DB vgroupInfo from cache, dbName:%s, load it from mnode, update:%d", dbFName, forceUpdate); } SUseDbOutput DbOut = {0}; @@ -1214,117 +1203,136 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm input.vgVersion = CTG_DEFAULT_INVALID_VERSION; while (true) { - CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut)); - CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbFName, DbOut.dbId, DbOut.dbVgroup)); - CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbFName, dbCache, &inCache)); + CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut)); - if (!inCache) { - ctgWarn("can't get db vgroup from cache, will retry, db:%s", dbFName); - continue; + code = ctgUpdateDBVgInfo(pCtg, dbFName, DbOut.dbId, &DbOut.dbVgroup); + if (code && DbOut.dbVgroup) { + *pInfo = DbOut.dbVgroup; + return TSDB_CODE_SUCCESS; } + + CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache)); - break; + if (inCache) { + return TSDB_CODE_SUCCESS; + } } return TSDB_CODE_SUCCESS; } -int32_t ctgValidateAndRemoveStbMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid, bool *removed) { - *removed = false; - SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName)); - if (NULL == dbCache) { - ctgInfo("db not exist in dbCache, may be removed, db:%s", dbName); - return TSDB_CODE_SUCCESS; +int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) { + *pOutput = malloc(sizeof(STableMetaOutput)); + if (NULL == *pOutput) { + qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock); - if (taosHashRemove(dbCache->tbCache.stbCache, &suid, sizeof(suid))) { - CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); - taosHashRelease(pCatalog->dbCache, dbCache); - ctgInfo("stb not exist in stbCache, may be removed, db:%s, stb:%s, suid:%"PRIx64, dbName, stbName, suid); - return TSDB_CODE_SUCCESS; + memcpy(*pOutput, output, sizeof(STableMetaOutput)); + + if (output->tbMeta) { + int32_t metaSize = CTG_META_SIZE(output->tbMeta); + (*pOutput)->tbMeta = malloc(metaSize); + if (NULL == (*pOutput)->tbMeta) { + qError("malloc %d failed", sizeof(STableMetaOutput)); + tfree(*pOutput); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + memcpy((*pOutput)->tbMeta, output->tbMeta, metaSize); } - CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); - if (taosHashRemove(dbCache->tbCache.metaCache, stbName, strlen(stbName))) { - CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); - CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); - taosHashRelease(pCatalog->dbCache, dbCache); - ctgError("stb not exist in cache, db:%s, stb:%s, suid:%"PRIx64, dbName, stbName, suid); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + return TSDB_CODE_SUCCESS; +} + +void ctgPopAction(SCtgMetaAction **action) { + SCtgQNode *orig = ctgMgmt.head; - CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); + SCtgQNode *node = ctgMgmt.head->next; + ctgMgmt.head = ctgMgmt.head->next; - taosHashRelease(pCatalog->dbCache, dbCache); + tfree(orig); + + *action = &node->action; +} - *removed = true; + +int32_t ctgPushAction(SCtgMetaAction *action) { + SCtgQNode *node = calloc(1, sizeof(SCtgQNode)); + if (NULL == node) { + qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); + CTG_RET(TSDB_CODE_CTG_MEM_ERROR); + } + node->action = *action; + + CTG_LOCK(CTG_WRITE, &ctgMgmt.qlock); + ctgMgmt.tail->next = node; + ctgMgmt.tail = node; + CTG_UNLOCK(CTG_WRITE, &ctgMgmt.qlock); + + tsem_post(&ctgMgmt.sem); + return TSDB_CODE_SUCCESS; } - -int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { - if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { +int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable, STableMetaOutput **pOutput) { + if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } SVgroupInfo vgroupInfo = {0}; int32_t code = 0; - CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo)); - - STableMetaOutput voutput = {0}; - STableMetaOutput moutput = {0}; - STableMetaOutput *output = &voutput; + CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo)); + STableMetaOutput moutput = {0}; + STableMetaOutput *output = malloc(sizeof(STableMetaOutput)); + if (NULL == output) { + ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + if (CTG_IS_STABLE(isSTable)) { ctgDebug("will renew tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName)); // if get from mnode failed, will not try vnode - CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput)); + CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCtg, pTransporter, pMgmtEps, pTableName, output)); - if (CTG_IS_META_NULL(moutput.metaType)) { - CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); - } else { - output = &moutput; + if (CTG_IS_META_NULL(output->metaType)) { + CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo, output)); } } else { ctgDebug("will renew tbmeta, not supposed to be stb, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable); // if get from vnode failed or no table meta, will not try mnode - CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); + CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo, output)); - if (CTG_IS_META_TABLE(voutput.metaType) && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) { - ctgDebug("will continue to renew tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), voutput.metaType); - - CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.dbFName, voutput.tbName, &moutput)); + if (CTG_IS_META_TABLE(output->metaType) && TSDB_SUPER_TABLE == output->tbMeta->tableType) { + ctgDebug("will continue to renew tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), output->metaType); - voutput.metaType = moutput.metaType; + tfree(output->tbMeta); - tfree(voutput.tbMeta); - voutput.tbMeta = moutput.tbMeta; - moutput.tbMeta = NULL; - } else if (CTG_IS_META_BOTH(voutput.metaType)) { + CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTransporter, pMgmtEps, output->dbFName, output->tbName, output)); + } else if (CTG_IS_META_BOTH(output->metaType)) { int32_t exist = 0; - CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCatalog, voutput.dbFName, voutput.tbName, &exist)); + CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCtg, output->dbFName, output->tbName, &exist)); if (0 == exist) { - CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.dbFName, voutput.tbName, &moutput)); + CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTransporter, pMgmtEps, output->dbFName, output->tbName, &moutput)); if (CTG_IS_META_NULL(moutput.metaType)) { - SET_META_TYPE_NULL(voutput.metaType); + SET_META_TYPE_NULL(output->metaType); } - tfree(voutput.tbMeta); - voutput.tbMeta = moutput.tbMeta; + tfree(output->tbMeta); + output->tbMeta = moutput.tbMeta; moutput.tbMeta = NULL; } else { - tfree(voutput.tbMeta); + tfree(output->tbMeta); - SET_META_TYPE_CTABLE(voutput.metaType); + SET_META_TYPE_CTABLE(output->metaType); } } } @@ -1334,46 +1342,282 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); } - CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, output)); + if (pOutput) { + CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput)); + } + + SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL}; + SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + msg->output = output; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(&action)); + + return TSDB_CODE_SUCCESS; _return: - tfree(voutput.tbMeta); - tfree(moutput.tbMeta); + tfree(output->tbMeta); + tfree(msg); CTG_RET(code); } -int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { +int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) { + if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } int32_t exist = 0; + int32_t code = 0; if (!forceUpdate) { - CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); + CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &exist)); if (exist && CTG_TBTYPE_MATCH(isSTable, (*pTableMeta)->tableType)) { return TSDB_CODE_SUCCESS; } + + tfree(*pTableMeta); } else if (CTG_IS_UNKNOWN_STABLE(isSTable)) { int32_t tbType = 0; - CTG_ERR_RET(ctgGetTableTypeFromCache(pCatalog, pTableName, &tbType)); + CTG_ERR_RET(ctgGetTableTypeFromCache(pCtg, pTableName, &tbType)); CTG_SET_STABLE(isSTable, tbType); } - CTG_ERR_RET(ctgRenewTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, isSTable)); + STableMetaOutput *output = NULL; + + CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, isSTable, &output)); + + if (CTG_IS_META_TABLE(output->metaType)) { + *pTableMeta = output->tbMeta; + goto _return; + } + + if (CTG_IS_META_BOTH(output->metaType)) { + memcpy(output->tbMeta, &output->ctbMeta, sizeof(output->ctbMeta)); + + *pTableMeta = output->tbMeta; + goto _return; + } + + if ((!CTG_IS_META_CTABLE(output->metaType)) || output->tbMeta) { + ctgError("invalid metaType:%d", output->metaType); + tfree(output->tbMeta); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } - CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); + // HANDLE ONLY CHILD TABLE META + SName stbName = *pTableName; + strcpy(stbName.tname, output->tbName); + + CTG_ERR_JRET(ctgGetTableMetaFromCache(pCtg, &stbName, pTableMeta, &exist)); if (0 == exist) { - ctgError("renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s", tNameGetTableName(pTableName)); + ctgDebug("stb no longer exist, dbFName:%s, tbName:%s", output->dbFName, pTableName->tname); + CTG_ERR_JRET(TSDB_CODE_VND_TB_NOT_EXIST); + } + + memcpy(*pTableMeta, &output->ctbMeta, sizeof(output->ctbMeta)); + +_return: + + tfree(output); + + CTG_RET(code); +} + + + +int32_t ctgActUpdateVg(SCtgMetaAction *action) { + int32_t code = 0; + SCtgUpdateVgMsg *msg = action->data; + + CTG_ERR_JRET(ctgUpdateDBVgInfo(msg->pCtg, msg->dbFName, msg->dbId, &msg->dbInfo)); + +_return: + + tfree(msg->dbInfo); + tfree(msg); + + CTG_RET(code); +} + +int32_t ctgActRemoveDB(SCtgMetaAction *action) { + int32_t code = 0; + SCtgRemoveDBMsg *msg = action->data; + SCatalog* pCtg = msg->pCtg; + + SCtgDBCache *dbCache = NULL; + ctgAcquireDBCache(msg->pCtg, msg->dbFName, &dbCache); + if (NULL == dbCache) { + ctgInfo("db not exist in cache, may be removed, dbFName:%s", msg->dbFName); + goto _return; + } + + if (dbCache->dbId != msg->dbId) { + ctgInfo("dbId already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId); + ctgReleaseDBCache(msg->pCtg, dbCache); + goto _return; + } + + CTG_ERR_JRET(ctgRemoveDB(pCtg, dbCache, msg->dbFName)); + +_return: + + tfree(msg); + + CTG_RET(code); +} + + +int32_t ctgActUpdateTbl(SCtgMetaAction *action) { + int32_t code = 0; + SCtgUpdateTblMsg *msg = action->data; + SCatalog* pCtg = msg->pCtg; + STableMetaOutput* output = msg->output; + SCtgDBCache *dbCache = NULL; + + if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) { + ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", output->dbFName, output->tbName); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) { + ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + CTG_ERR_JRET(ctgGetAddDBCache(pCtg, output->dbFName, output->dbId, &dbCache)); + if (NULL == dbCache) { + ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, output->dbFName, output->dbId); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + if (CTG_IS_META_TABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) { + int32_t metaSize = CTG_META_SIZE(output->tbMeta); + + CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->tbName, output->tbMeta, metaSize)); + } + + if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) { + CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta))); + } + +_return: + + if (dbCache) { + taosHashRelease(pCtg->dbCache, dbCache); + } + + tfree(msg); + + CTG_RET(code); +} + + +int32_t ctgActRemoveStb(SCtgMetaAction *action) { + int32_t code = 0; + SCtgRemoveStbMsg *msg = action->data; + bool removed = false; + SCatalog* pCtg = msg->pCtg; + + SCtgDBCache *dbCache = NULL; + ctgGetDBCache(pCtg, msg->dbFName, &dbCache); + if (NULL == dbCache) { + return TSDB_CODE_SUCCESS; + } + + if (dbCache->dbId != msg->dbId) { + ctgDebug("dbId already modified, dbFName:%s, current:%"PRIx64", dbId:%"PRIx64", stb:%s, suid:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid); + return TSDB_CODE_SUCCESS; + } + + CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock); + if (taosHashRemove(dbCache->tbCache.stbCache, &msg->suid, sizeof(msg->suid))) { + CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); + ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); + return TSDB_CODE_SUCCESS; + } + + CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); + if (taosHashRemove(dbCache->tbCache.metaCache, msg->stbName, strlen(msg->stbName))) { + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); + ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + + CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); + + ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); + + CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgSTableVersionCompare)); + + ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); + +_return: + + tfree(msg); + + CTG_RET(code); +} + +int32_t ctgActRemoveTbl(SCtgMetaAction *action) { + +} + + + +void* ctgUpdateThreadFunc(void* param) { + setThreadName("catalog"); + + qInfo("catalog update thread started"); + + CTG_LOCK(CTG_READ, &ctgMgmt.lock); + + while (true) { + tsem_wait(&ctgMgmt.sem); + + if (atomic_load_8(&ctgMgmt.exit)) { + break; + } + + SCtgMetaAction *action = NULL; + ctgPopAction(&action); + + (*ctgActFuncs[action->act])(action); + } + + CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); + + qInfo("catalog update thread stopped"); + + return NULL; +} + + +int32_t ctgStartUpdateThread() { + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&ctgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + CTG_ERR_RET(terrno); } + pthread_attr_destroy(&thAttr); return TSDB_CODE_SUCCESS; } @@ -1394,7 +1638,7 @@ int32_t catalogInit(SCatalogCfg *cfg) { } if (ctgMgmt.cfg.maxTblCacheNum == 0) { - ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER; + ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER; } if (ctgMgmt.cfg.dbRentSec == 0) { @@ -1406,7 +1650,7 @@ int32_t catalogInit(SCatalogCfg *cfg) { } } else { ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; - ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER; + ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER; ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND; ctgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND; } @@ -1417,12 +1661,23 @@ int32_t catalogInit(SCatalogCfg *cfg) { CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } + CTG_ERR_RET(ctgStartUpdateThread()); + + tsem_init(&ctgMgmt.sem, 0, 0); + + ctgMgmt.head = calloc(1, sizeof(SCtgQNode)); + if (NULL == ctgMgmt.head) { + qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); + CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + ctgMgmt.tail = ctgMgmt.head; + qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", ctgMgmt.cfg.maxDBCacheNum, ctgMgmt.cfg.maxTblCacheNum, ctgMgmt.cfg.dbRentSec, ctgMgmt.cfg.stbRentSec); return TSDB_CODE_SUCCESS; } -int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) { +int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { if (NULL == catalogHandle) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -1455,6 +1710,18 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) { CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, ctgMgmt.cfg.dbRentSec, CTG_RENT_DB)); CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, ctgMgmt.cfg.stbRentSec, CTG_RENT_STABLE)); + clusterCtg->dbCache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == clusterCtg->dbCache) { + qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + SHashObj *metaCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == metaCache) { + qError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + code = taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES); if (code) { if (HASH_NODE_EXIST(code)) { @@ -1482,97 +1749,81 @@ _return: CTG_RET(code); } -void catalogFreeHandle(struct SCatalog* pCatalog) { - if (NULL == pCatalog) { +void catalogFreeHandle(SCatalog* pCtg) { + if (NULL == pCtg) { return; } - if (taosHashRemove(ctgMgmt.pCluster, &pCatalog->clusterId, sizeof(pCatalog->clusterId))) { - ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCatalog->clusterId); + if (taosHashRemove(ctgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) { + ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCtg->clusterId); return; } - uint64_t clusterId = pCatalog->clusterId; + uint64_t clusterId = pCtg->clusterId; - ctgFreeHandle(pCatalog); + ctgFreeHandle(pCtg); ctgInfo("handle freed, culsterId:%"PRIx64, clusterId); } -int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) { - if (NULL == pCatalog || NULL == dbName || NULL == version) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); - } - +int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version) { CTG_API_ENTER(); - if (NULL == pCatalog->dbCache) { + if (NULL == pCtg || NULL == dbFName || NULL == version) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + if (NULL == pCtg->dbCache) { *version = CTG_DEFAULT_INVALID_VERSION; - ctgInfo("empty db cache, dbName:%s", dbName); + ctgInfo("empty db cache, dbFName:%s", dbFName); CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - SCtgDBCache *db = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName)); - if (NULL == db) { + SCtgDBCache *dbCache = NULL; + ctgAcquireDBCache(pCtg, dbFName, &dbCache); + if (NULL == dbCache) { *version = CTG_DEFAULT_INVALID_VERSION; - ctgInfo("db not in cache, dbName:%s", dbName); CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - CTG_LOCK(CTG_READ, &db->vgLock); - - if (NULL == db->vgInfo) { - CTG_UNLOCK(CTG_READ, &db->vgLock); + ctgAcquireVgInfo(pCtg, dbCache); + if (NULL == dbCache->vgInfo) { + ctgReleaseDBCache(pCtg, dbCache); *version = CTG_DEFAULT_INVALID_VERSION; - ctgInfo("db not in cache, dbName:%s", dbName); CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - *version = db->vgInfo->vgVersion; - CTG_UNLOCK(CTG_READ, &db->vgLock); - - taosHashRelease(pCatalog->dbCache, db); + *version = dbCache->vgInfo->vgVersion; + + ctgReleaseVgInfo(dbCache); + ctgReleaseDBCache(pCtg, dbCache); - ctgDebug("Got db vgVersion from cache, dbName:%s, vgVersion:%d", dbName, *version); + ctgDebug("Got db vgVersion from cache, dbFName:%s, vgVersion:%d", dbFName, *version); CTG_API_LEAVE(TSDB_CODE_SUCCESS); } -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) { - if (NULL == pCatalog || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); - } - +int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) { CTG_API_ENTER(); - SCtgDBCache* dbCache = NULL; - SVgroupInfo *vgInfo = NULL; + if (NULL == pCtg || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + SCtgDBCache* dbCache = NULL; int32_t code = 0; SArray *vgList = NULL; - CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbFName, forceUpdate, &dbCache)); - - int32_t vgNum = (int32_t)taosHashGetSize(dbCache->vgInfo->vgHash); - vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo)); - if (NULL == vgList) { - ctgError("taosArrayInit %d failed", vgNum); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + SHashObj *vgHash = NULL; + SDBVgInfo *vgInfo = NULL; + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName, forceUpdate, &dbCache, &vgInfo)); + if (dbCache) { + vgHash = dbCache->vgInfo->vgHash; + } else { + vgHash = vgInfo->vgHash; } - void *pIter = taosHashIterate(dbCache->vgInfo->vgHash, NULL); - while (pIter) { - vgInfo = pIter; - - if (NULL == taosArrayPush(vgList, vgInfo)) { - ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId); - taosHashCancelIterate(dbCache->vgInfo->vgHash, pIter); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - pIter = taosHashIterate(dbCache->vgInfo->vgHash, pIter); - vgInfo = NULL; - } + CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, &vgList)); *vgroupList = vgList; vgList = NULL; @@ -1580,218 +1831,221 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* _return: if (dbCache) { - CTG_UNLOCK(CTG_READ, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); + ctgReleaseVgInfo(dbCache); + ctgReleaseDBCache(pCtg, dbCache); } - if (vgList) { - taosArrayDestroy(vgList); - vgList = NULL; + if (vgInfo) { + taosHashCleanup(vgInfo->vgHash); + tfree(vgInfo); } CTG_API_LEAVE(code); } -int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbFName, uint64_t dbId, SDBVgroupInfo* dbInfo) { - int32_t code = 0; - +int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo* dbInfo) { CTG_API_ENTER(); + + int32_t code = 0; - if (NULL == pCatalog || NULL == dbFName || NULL == dbInfo) { + if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) { CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); } - if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { - ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion); + SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG}; + SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - CTG_ERR_JRET(ctgInitDBCache(pCatalog)); - - bool newAdded = false; - SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion}; - - SCtgDBCache *dbCache = NULL; - CTG_ERR_JRET(ctgAcquireDBCache(pCatalog, dbFName, dbId, &dbCache)); - - CTG_LOCK(CTG_WRITE, &dbCache->vgLock); - if (dbCache->deleted) { - ctgInfo("db is dropping, dbFName:%s, dbId:%"PRIx64, dbFName, dbId); - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); - CTG_ERR_JRET(TSDB_CODE_CTG_DB_DROPPED); - } - - if (NULL == dbCache->vgInfo) { - dbCache->vgInfo = dbInfo; - } else { - if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) { - ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, current:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); - - goto _return; - } - - if (dbCache->vgInfo->vgHash) { - ctgDebug("cleanup db vgHash, dbFName:%s", dbFName); - taosHashCleanup(dbCache->vgInfo->vgHash); - dbCache->vgInfo->vgHash = NULL; - } - - tfree(dbCache->vgInfo); - dbCache->vgInfo = dbInfo; - } - + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + msg->dbId = dbId; + msg->dbInfo = dbInfo; dbInfo = NULL; - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); + action.data = msg; - strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); - CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); - - ctgDebug("dbCache updated, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId); + CTG_ERR_JRET(ctgPushAction(&action)); + CTG_API_LEAVE(code); + _return: - if (dbCache) { - CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock); - } - if (dbInfo) { taosHashCleanup(dbInfo->vgHash); - dbInfo->vgHash = NULL; tfree(dbInfo); } + + tfree(msg); CTG_API_LEAVE(code); } -int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbFName, uint64_t dbId) { +int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) { + CTG_API_ENTER(); + int32_t code = 0; - if (NULL == pCatalog || NULL == dbFName) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + if (NULL == pCtg || NULL == dbFName) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_API_ENTER(); - - if (NULL == pCatalog->dbCache) { + if (NULL == pCtg->dbCache) { CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName)); - if (NULL == dbCache) { - ctgInfo("db not exist in dbCache, may be removed, dbFName:%s", dbFName); - CTG_API_LEAVE(TSDB_CODE_SUCCESS); + SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB}; + SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg)); + CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR); } - if (dbCache->dbId != dbId) { - ctgInfo("db id already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, dbFName, dbCache->dbId, dbId); - taosHashRelease(pCatalog->dbCache, dbCache); - CTG_API_LEAVE(TSDB_CODE_SUCCESS); - } - - CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName)); + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + msg->dbId = dbId; -_return: + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(&action)); + + CTG_API_LEAVE(TSDB_CODE_SUCCESS); - taosHashRelease(pCatalog->dbCache, dbCache); +_return: + + tfree(action.data); CTG_API_LEAVE(code); } -int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid) { +int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) { + CTG_API_ENTER(); + int32_t code = 0; - bool removed = false; - if (NULL == pCatalog || NULL == dbName || NULL == stbName) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + if (NULL == pCtg || NULL == dbFName || NULL == stbName) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_API_ENTER(); - - if (NULL == pCatalog->dbCache) { + if (NULL == pCtg->dbCache) { CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - - CTG_ERR_RET(ctgValidateAndRemoveStbMeta(pCatalog, dbName, stbName, suid, &removed)); - if (!removed) { - CTG_API_LEAVE(TSDB_CODE_SUCCESS); + + SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB}; + SCtgRemoveStbMsg *msg = malloc(sizeof(SCtgRemoveStbMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg)); + CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR); } - - ctgInfo("stb removed from cache, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid); - CTG_ERR_JRET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare)); - - ctgDebug("stb removed from rent, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid); + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + strncpy(msg->stbName, stbName, sizeof(msg->stbName)); + msg->dbId = dbId; + msg->suid = suid; -_return: + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(&action)); + + CTG_API_LEAVE(TSDB_CODE_SUCCESS); +_return: + + tfree(action.data); + CTG_API_LEAVE(code); } -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { +int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { CTG_API_ENTER(); - CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1)); + CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1)); } -int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { +int32_t catalogGetSTableMeta(SCatalog* pCtg, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { CTG_API_ENTER(); - CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1)); + CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1)); } -int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg) { - STableMetaOutput output = {0}; +int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == rspMsg) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + STableMetaOutput *output = calloc(1, sizeof(STableMetaOutput)); + if (NULL == output) { + ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); + CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR); + } + int32_t code = 0; - CTG_API_ENTER(); + strcpy(output->dbFName, rspMsg->dbFName); + strcpy(output->tbName, rspMsg->tbName); - strcpy(output.dbFName, rspMsg->dbFName); - strcpy(output.tbName, rspMsg->tbName); + output->dbId = rspMsg->dbId; - SET_META_TYPE_TABLE(output.metaType); + SET_META_TYPE_TABLE(output->metaType); - CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output.tbMeta)); + CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output->tbMeta)); - CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output)); + SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL}; + SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + msg->pCtg = pCtg; + msg->output = output; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(&action)); + + CTG_API_LEAVE(code); + _return: - tfree(output.tbMeta); + tfree(output->tbMeta); + tfree(output); + tfree(msg); CTG_API_LEAVE(code); } -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { - if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); - } - +int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { CTG_API_ENTER(); - CTG_API_LEAVE(ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable)); + if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTransporter, pMgmtEps, pTableName, isSTable, NULL)); } -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { +int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { CTG_API_ENTER(); - CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable)); + CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable)); } -int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgroupList) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); - } - +int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) { CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgList) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } STableMeta *tbMeta = NULL; int32_t code = 0; @@ -1799,29 +2053,38 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S SCtgDBCache* dbCache = NULL; SArray *vgList = NULL; - *pVgroupList = NULL; + *pVgList = NULL; - CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, false, &tbMeta, -1)); + CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, false, &tbMeta, -1)); char db[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, db); - CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbCache)); - // TODO REMOEV THIS .... + SHashObj *vgHash = NULL; + SDBVgInfo *vgInfo = NULL; + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo)); + + if (dbCache) { + vgHash = dbCache->vgInfo->vgHash; + } else { + vgHash = vgInfo->vgHash; + } + + /* TODO REMOEV THIS .... if (0 == tbMeta->vgId) { SVgroupInfo vgroup = {0}; - catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroup); + catalogGetTableHashVgroup(pCtg, pRpc, pMgmtEps, pTableName, &vgroup); tbMeta->vgId = vgroup.vgId; } - // TODO REMOVE THIS .... + // TODO REMOVE THIS ....*/ if (tbMeta->tableType == TSDB_SUPER_TABLE) { - CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbCache->vgInfo, pVgroupList)); + CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList)); } else { int32_t vgId = tbMeta->vgId; - if (NULL == taosHashGetClone(dbCache->vgInfo->vgHash, &vgId, sizeof(vgId), &vgroupInfo)) { + if (NULL == taosHashGetClone(vgHash, &vgId, sizeof(vgId), &vgroupInfo)) { ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -1837,16 +2100,22 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - *pVgroupList = vgList; + *pVgList = vgList; vgList = NULL; } _return: - tfree(tbMeta); if (dbCache) { - CTG_UNLOCK(CTG_READ, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); + ctgReleaseVgInfo(dbCache); + ctgReleaseDBCache(pCtg, dbCache); + } + + tfree(tbMeta); + + if (vgInfo) { + taosHashCleanup(vgInfo->vgHash); + tfree(vgInfo); } if (vgList) { @@ -1858,37 +2127,42 @@ _return: } -int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) { - SCtgDBCache* dbCache = NULL; - int32_t code = 0; - +int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) { CTG_API_ENTER(); + SCtgDBCache* dbCache = NULL; + int32_t code = 0; char db[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, db); - CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbCache)); + SDBVgInfo *vgInfo = NULL; + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTransporter, pMgmtEps, db, false, &dbCache, &vgInfo)); - CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbCache->vgInfo, pTableName, pVgroup)); + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup)); _return: if (dbCache) { - CTG_UNLOCK(CTG_READ, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); + ctgReleaseVgInfo(dbCache); + ctgReleaseDBCache(pCtg, dbCache); + } + + if (vgInfo) { + taosHashCleanup(vgInfo->vgHash); + tfree(vgInfo); } CTG_API_LEAVE(code); } -int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) { - if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); - } - +int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) { CTG_API_ENTER(); + if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + int32_t code = 0; pRsp->pTableMeta = NULL; @@ -1909,7 +2183,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const S SName *name = taosArrayGet(pReq->pTableName, i); STableMeta *pTableMeta = NULL; - CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, name, false, &pTableMeta, -1)); + CTG_ERR_JRET(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, name, false, &pTableMeta, -1)); if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) { ctgError("taosArrayPush failed, idx:%d", i); @@ -1937,40 +2211,42 @@ _return: CTG_API_LEAVE(code); } -int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeList) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); - } - +int32_t catalogGetQnodeList(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) { CTG_API_ENTER(); + if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeList) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + //TODO CTG_API_LEAVE(TSDB_CODE_SUCCESS); } -int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num) { - if (NULL == pCatalog || NULL == stables || NULL == num) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); - } - +int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableMetaVersion **stables, uint32_t *num) { CTG_API_ENTER(); - CTG_API_LEAVE(ctgMetaRentGet(&pCatalog->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion))); -} - -int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num) { - if (NULL == pCatalog || NULL == dbs || NULL == num) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + if (NULL == pCtg || NULL == stables || NULL == num) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } + CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion))); +} + +int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num) { CTG_API_ENTER(); + + if (NULL == pCtg || NULL == dbs || NULL == num) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } - CTG_API_LEAVE(ctgMetaRentGet(&pCatalog->dbRent, (void **)dbs, num, sizeof(SDbVgVersion))); + CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion))); } void catalogDestroy(void) { + qInfo("start to destroy catalog"); + if (NULL == ctgMgmt.pCluster || atomic_load_8(&ctgMgmt.exit)) { return; } @@ -1979,13 +2255,13 @@ void catalogDestroy(void) { CTG_LOCK(CTG_WRITE, &ctgMgmt.lock); - SCatalog *pCatalog = NULL; + SCatalog *pCtg = NULL; void *pIter = taosHashIterate(ctgMgmt.pCluster, NULL); while (pIter) { - pCatalog = *(SCatalog **)pIter; + pCtg = *(SCatalog **)pIter; - if (pCatalog) { - catalogFreeHandle(pCatalog); + if (pCtg) { + catalogFreeHandle(pCtg); } pIter = taosHashIterate(ctgMgmt.pCluster, pIter); diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index d0f98e3c2ac5b87229ecfdc766fbe66854e5175a..662923141ac4279c16df35caf95a6362a062dc75 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -38,7 +38,6 @@ namespace { extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta, int32_t *exist); -extern "C" int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output); extern "C" int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type); void ctgTestSetPrepareTableMeta(); @@ -176,11 +175,11 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) { strcpy(s->name, "tag1s"); } -void ctgTestBuildDBVgroup(SDBVgroupInfo **pdbVgroup) { +void ctgTestBuildDBVgroup(SDBVgInfo **pdbVgroup) { static int32_t vgVersion = ctgTestVgVersion + 1; int32_t vgNum = 0; SVgroupInfo vgInfo = {0}; - SDBVgroupInfo *dbVgroup = (SDBVgroupInfo *)calloc(1, sizeof(SDBVgroupInfo)); + SDBVgInfo *dbVgroup = (SDBVgInfo *)calloc(1, sizeof(SDBVgInfo)); dbVgroup->vgVersion = vgVersion++; @@ -612,7 +611,7 @@ void *ctgTestGetDbVgroupThread(void *param) { int32_t n = 0; while (!ctgTestStop) { - code = catalogGetDBVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList); + code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList); if (code) { assert(0); } @@ -635,12 +634,12 @@ void *ctgTestGetDbVgroupThread(void *param) { void *ctgTestSetSameDbVgroupThread(void *param) { struct SCatalog *pCtg = (struct SCatalog *)param; int32_t code = 0; - SDBVgroupInfo *dbVgroup = NULL; + SDBVgInfo *dbVgroup = NULL; int32_t n = 0; while (!ctgTestStop) { ctgTestBuildDBVgroup(&dbVgroup); - code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup); + code = catalogUpdateDBVgInfo(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup); if (code) { assert(0); } @@ -660,12 +659,12 @@ void *ctgTestSetSameDbVgroupThread(void *param) { void *ctgTestSetDiffDbVgroupThread(void *param) { struct SCatalog *pCtg = (struct SCatalog *)param; int32_t code = 0; - SDBVgroupInfo *dbVgroup = NULL; + SDBVgInfo *dbVgroup = NULL; int32_t n = 0; while (!ctgTestStop) { ctgTestBuildDBVgroup(&dbVgroup); - code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId++, dbVgroup); + code = catalogUpdateDBVgInfo(pCtg, ctgTestDbname, ctgTestDbId++, dbVgroup); if (code) { assert(0); } @@ -716,14 +715,22 @@ void *ctgTestGetCtableMetaThread(void *param) { void *ctgTestSetCtableMetaThread(void *param) { struct SCatalog *pCtg = (struct SCatalog *)param; int32_t code = 0; - SDBVgroupInfo dbVgroup = {0}; + SDBVgInfo dbVgroup = {0}; int32_t n = 0; STableMetaOutput output = {0}; ctgTestBuildCTableMetaOutput(&output); + SCtgMetaAction action = {0}; + + action.act = CTG_ACT_UPDATE_TBL; while (!ctgTestStop) { - code = ctgUpdateTableMetaCache(pCtg, &output); + SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg)); + msg->pCtg = pCtg; + msg->output = output; + action.data = msg; + + code = ctgActUpdateTbl(&action); if (code) { assert(0); } @@ -984,7 +991,7 @@ TEST(tableMeta, superTableCase) { ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); tableMeta = NULL; - code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0); + code = catalogRefreshGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0); ASSERT_EQ(code, 0); ASSERT_EQ(tableMeta->vgId, 9); ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); @@ -1174,7 +1181,7 @@ TEST(tableDistVgroup, normalTable) { strcpy(n.dbname, "db1"); strcpy(n.tname, ctgTestTablename); - code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); + code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); @@ -1206,7 +1213,7 @@ TEST(tableDistVgroup, childTableCase) { strcpy(n.dbname, "db1"); strcpy(n.tname, ctgTestCTablename); - code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); + code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); @@ -1237,7 +1244,7 @@ TEST(tableDistVgroup, superTableCase) { strcpy(n.dbname, "db1"); strcpy(n.tname, ctgTestSTablename); - code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); + code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 10); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); @@ -1258,7 +1265,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; SVgroupInfo *pvgInfo = NULL; - SDBVgroupInfo *dbVgroup = NULL; + SDBVgInfo *dbVgroup = NULL; SArray *vgList = NULL; ctgTestInitLogFile(); @@ -1279,7 +1286,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { strcpy(n.dbname, "db1"); strcpy(n.tname, ctgTestTablename); - code = catalogGetDBVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList); + code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList); ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum); @@ -1288,7 +1295,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { ASSERT_EQ(vgInfo.vgId, 8); ASSERT_EQ(vgInfo.epset.numOfEps, 3); - code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); + code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); @@ -1297,7 +1304,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { taosArrayDestroy(vgList); ctgTestBuildDBVgroup(&dbVgroup); - code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup); + code = catalogUpdateDBVgInfo(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup); ASSERT_EQ(code, 0); code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); @@ -1305,7 +1312,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { ASSERT_EQ(vgInfo.vgId, 7); ASSERT_EQ(vgInfo.epset.numOfEps, 2); - code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); + code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); @@ -1321,7 +1328,7 @@ TEST(multiThread, getSetRmSameDbVgroup) { void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; SVgroupInfo *pvgInfo = NULL; - SDBVgroupInfo dbVgroup = {0}; + SDBVgInfo dbVgroup = {0}; SArray *vgList = NULL; ctgTestStop = false; @@ -1372,7 +1379,7 @@ TEST(multiThread, getSetRmDiffDbVgroup) { void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; SVgroupInfo *pvgInfo = NULL; - SDBVgroupInfo dbVgroup = {0}; + SDBVgInfo dbVgroup = {0}; SArray *vgList = NULL; ctgTestStop = false; @@ -1425,7 +1432,7 @@ TEST(multiThread, ctableMeta) { void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; SVgroupInfo *pvgInfo = NULL; - SDBVgroupInfo dbVgroup = {0}; + SDBVgInfo dbVgroup = {0}; SArray *vgList = NULL; ctgTestStop = false; @@ -1477,7 +1484,7 @@ TEST(rentTest, allRent) { void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; SVgroupInfo *pvgInfo = NULL; - SDBVgroupInfo dbVgroup = {0}; + SDBVgInfo dbVgroup = {0}; SArray *vgList = NULL; ctgTestStop = false; SDbVgVersion *dbs = NULL; diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index a56a6524fca771bb527bcc8acec31e0ce524513f..ffc9f4a3f6538419e80278a4bb2162439d187770 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -3647,7 +3647,7 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf) //TODO remove it int32_t setTableVgroupList(SParseContext *pCtx, SName* name, SVgroupsInfo **pVgList) { SArray* vgroupList = NULL; - int32_t code = catalogGetTableDistVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList); + int32_t code = catalogGetTableDistVgInfo(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 50ae3bfd262812be8cec8e2d33d467994875a0f1..bbe394d67f3ddf987ff896aa0918e556bbbe6fc3 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -50,7 +50,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out char dbFname[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(&name, dbFname); - int32_t code = catalogGetDBVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbFname, false, &array); + int32_t code = catalogGetDBVgInfo(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbFname, false, &array); if (code != TSDB_CODE_SUCCESS) { terrno = code; return code; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 3e14bfca094bd548fe8918cc0be9f6bab4d932ae..9f5e1b93e3d354ce2f93335ad90c75a89f44eca2 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -113,9 +113,9 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } - pOut->dbVgroup = calloc(1, sizeof(SDBVgroupInfo)); + pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo)); if (NULL == pOut->dbVgroup) { - qError("calloc %d failed", (int32_t)sizeof(SDBVgroupInfo)); + qError("calloc %d failed", (int32_t)sizeof(SDBVgInfo)); return TSDB_CODE_TSC_OUT_OF_MEMORY; }