提交 5f2dc140 编写于 作者: D dapan1121

feature/qnode

上级 e31e8336
......@@ -30,7 +30,7 @@ extern "C" {
#include "tmsg.h"
#include "transport.h"
struct SCatalog;
typedef struct SCatalog SCatalog;
enum {
CTG_DBG_DB_NUM = 1,
......@@ -64,6 +64,7 @@ typedef struct SCatalogCfg {
typedef struct SSTableMetaVersion {
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId;
uint64_t suid;
int16_t sversion;
int16_t tversion;
......@@ -84,7 +85,7 @@ int32_t catalogInit(SCatalogCfg *cfg);
* @param catalogHandle (output, NO need to free it)
* @return error code
*/
int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle);
int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle);
/**
* Free a cluster's all catalog info, usually it's not necessary, until the application is closing.
......@@ -92,9 +93,9 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle);
* @param pCatalog (input, NO more usage)
* @return error code
*/
void catalogFreeHandle(struct SCatalog* pCatalog);
void catalogFreeHandle(SCatalog* pCatalog);
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
int32_t catalogGetDBVgVersion(SCatalog* pCatalog, const char* dbName, int32_t* version);
/**
* Get a DB's all vgroup info.
......@@ -106,13 +107,13 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList);
int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList);
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgroupInfo* dbInfo);
int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo);
int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId);
int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId);
int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid);
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid);
/**
* Get a table's meta data.
......@@ -123,7 +124,7 @@ int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, c
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogGetTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
/**
* Get a super table's meta data.
......@@ -134,13 +135,13 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, cons
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogGetSTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg);
int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg);
/**
* Force renew a table's local cached meta data.
* Force refresh a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
......@@ -148,10 +149,10 @@ int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable);
int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable);
/**
* Force renew a table's local cached meta data and get the new one.
* Force refresh a table's local cached meta data and get the new one.
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
......@@ -160,7 +161,7 @@ int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable);
int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable);
......@@ -173,7 +174,7 @@ int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList);
int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList);
/**
* Get a table's vgroup from its name's hash value.
......@@ -184,7 +185,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pTransporter,
* @param vgInfo (output, vgroup info)
* @return error code
*/
int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pName, SVgroupInfo* vgInfo);
int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pName, SVgroupInfo* vgInfo);
/**
......@@ -196,14 +197,14 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter
* @param pRsp (output, response data)
* @return error code
*/
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetAllMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList);
int32_t catalogGetQnodeList(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList);
int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num);
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num);
int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num);
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num);
/**
......
......@@ -80,16 +80,16 @@ typedef struct STableMeta {
SSchema schema[];
} STableMeta;
typedef struct SDBVgroupInfo {
typedef struct SDBVgInfo {
int32_t vgVersion;
int8_t hashMethod;
SHashObj *vgHash; //key:vgId, value:SVgroupInfo
} SDBVgroupInfo;
} SDBVgInfo;
typedef struct SUseDbOutput {
char db[TSDB_DB_FNAME_LEN];
uint64_t dbId;
SDBVgroupInfo *dbVgroup;
SDBVgInfo *dbVgroup;
} SUseDbOutput;
enum {
......
......@@ -43,7 +43,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
if (rsp->vgVersion < 0) {
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
} else {
SDBVgroupInfo vgInfo = {0};
SDBVgInfo vgInfo = {0};
vgInfo.vgVersion = rsp->vgVersion;
vgInfo.hashMethod = rsp->hashMethod;
vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
......@@ -68,10 +68,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
}
}
code = catalogUpdateDBVgroup(pCatalog, rsp->db, rsp->uid, &vgInfo);
if (code) {
taosHashCleanup(vgInfo.vgHash);
}
catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo);
}
if (code) {
......@@ -94,13 +91,14 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
rsp->numOfColumns = ntohl(rsp->numOfColumns);
rsp->suid = be64toh(rsp->suid);
rsp->dbId = be64toh(rsp->dbId);
if (rsp->numOfColumns < 0) {
schemaNum = 0;
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
catalogRemoveSTableMeta(pCatalog, rsp->dbFName, rsp->stbName, rsp->suid);
catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid);
} else {
tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
......
......@@ -382,7 +382,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
pMgmt->openVnodes, pMgmt->totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId};
SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
SVnode * pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
......@@ -594,6 +594,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
vnodeCfg.pDnode = pDnode;
vnodeCfg.pTfs = pDnode->pTfs;
vnodeCfg.dbId = wrapperCfg.dbUid;
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
if (pImpl == NULL) {
dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
......
......@@ -42,6 +42,7 @@ typedef struct STqCfg {
typedef struct SVnodeCfg {
int32_t vgId;
uint64_t dbId;
SDnode *pDnode;
STfs *pTfs;
uint64_t wsize;
......
......@@ -29,6 +29,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
cfg.vgId = pVnodeCfg->vgId;
cfg.pDnode = pVnodeCfg->pDnode;
cfg.pTfs = pVnodeCfg->pTfs;
cfg.dbId = pVnodeCfg->dbId;
}
// Validate options
......
......@@ -93,6 +93,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
if (pTbCfg->type == META_CHILD_TABLE) {
pStbCfg = metaGetTbInfoByUid(pVnode->pMeta, pTbCfg->ctbCfg.suid);
if (pStbCfg == NULL) {
code = TSDB_CODE_VND_TB_NOT_EXIST;
goto _exit;
}
......@@ -116,9 +117,11 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
msgLen = sizeof(STableMetaRsp) + sizeof(SSchema) * (nCols + nTagCols);
pTbMetaMsg = (STableMetaRsp *)rpcMallocCont(msgLen);
if (pTbMetaMsg == NULL) {
code = TSDB_CODE_VND_OUT_OF_MEMORY;
goto _exit;
}
pTbMetaMsg->dbId = htobe64(pVnode->config.dbId);
memcpy(pTbMetaMsg->dbFName, pReq->dbFName, sizeof(pTbMetaMsg->dbFName));
strcpy(pTbMetaMsg->tbName, pReq->tbName);
if (pTbCfg->type == META_CHILD_TABLE) {
......
......@@ -27,7 +27,7 @@ extern "C" {
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_CACHE_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 10000
#define CTG_DEFAULT_CACHE_TBLMETA_NUMBER 1000
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
......@@ -47,9 +47,19 @@ enum {
CTG_RENT_STABLE,
};
enum {
CTG_ACT_UPDATE_VG = 0,
CTG_ACT_UPDATE_TBL,
CTG_ACT_REMOVE_DB,
CTG_ACT_REMOVE_STB,
CTG_ACT_REMOVE_TBL,
CTG_ACT_MAX
};
typedef struct SCtgDebug {
bool lockDebug;
bool cacheDebug;
bool apiDebug;
uint32_t showCachePeriodSec;
} SCtgDebug;
......@@ -65,7 +75,7 @@ typedef struct SCtgDBCache {
SRWLatch vgLock;
uint64_t dbId;
int8_t deleted;
SDBVgroupInfo *vgInfo;
SDBVgInfo *vgInfo;
SCtgTbMetaCache tbCache;
} SCtgDBCache;
......@@ -85,7 +95,6 @@ typedef struct SCtgRentMgmt {
typedef struct SCatalog {
uint64_t clusterId;
SRWLatch dbLock;
SHashObj *dbCache; //key:dbname, value:SCtgDBCache
SCtgRentMgmt dbRent;
SCtgRentMgmt stbRent;
......@@ -109,15 +118,57 @@ typedef struct SCatalogStat {
SCtgCacheStat cache;
} SCatalogStat;
typedef struct SCtgUpdateVgMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t dbId;
SDBVgInfo* dbInfo;
} SCtgUpdateVgMsg;
typedef struct SCtgUpdateTblMsg {
SCatalog* pCtg;
STableMetaOutput* output;
} SCtgUpdateTblMsg;
typedef struct SCtgRemoveDBMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t dbId;
} SCtgRemoveDBMsg;
typedef struct SCtgRemoveStbMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId;
uint64_t suid;
} SCtgRemoveStbMsg;
typedef struct SCtgMetaAction {
int32_t act;
void *data;
} SCtgMetaAction;
typedef struct SCtgQNode {
SCtgMetaAction action;
struct SCtgQNode *next;
} SCtgQNode;
typedef struct SCatalogMgmt {
bool exit;
SRWLatch lock;
SRWLatch qlock;
SCtgQNode *head;
SCtgQNode *tail;
tsem_t sem;
pthread_t updateThread;
SHashObj *pCluster; //key: clusterId, value: SCatalog*
SCatalogStat stat;
SCatalogCfg cfg;
} SCatalogMgmt;
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
typedef int32_t (*ctgActFunc)(SCtgMetaAction *);
#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE)
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
......@@ -130,17 +181,21 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_SET_STABLE(isSTable, tbType) do { (isSTable) = ((tbType) == TSDB_SUPER_TABLE) ? 1 : ((tbType) > TSDB_SUPER_TABLE ? 0 : -1); } while (0)
#define CTG_TBTYPE_MATCH(isSTable, tbType) (CTG_IS_UNKNOWN_STABLE(isSTable) || (CTG_IS_STABLE(isSTable) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STABLE(isSTable) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST)
#define CTG_DB_NOT_EXIST(code) (code == TSDB_CODE_MND_DB_NOT_EXIST)
#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgError(param, ...) qError("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgWarn(param, ...) qWarn("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgInfo(param, ...) qInfo("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgWarn(param, ...) qWarn("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgInfo(param, ...) qInfo("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCtg, __VA_ARGS__)
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
#define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { qDebug(__VA_ARGS__); } } while (0)
#define CTG_API_DEBUG(...) do { if (gCTGDebug.apiDebug) { qDebug(__VA_ARGS__); } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
......@@ -181,8 +236,8 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define CTG_API_ENTER() do { CTG_LOCK(CTG_READ, &ctgMgmt.lock); if (atomic_load_8(&ctgMgmt.exit)) { CTG_RET(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_RET(__code); } while (0)
#define CTG_API_ENTER() do { CTG_API_DEBUG("enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &ctgMgmt.lock); if (atomic_load_8(&ctgMgmt.exit)) { CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_RET(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_API_DEBUG("leave %s", __FUNCTION__); CTG_RET(__code); } while (0)
......
......@@ -22,6 +22,8 @@ SCatalogMgmt ctgMgmt = {0};
SCtgDebug gCTGDebug = {0};
ctgActFunc ctgActFuncs[] = {ctgActUpdateVg, ctgActUpdateTbl, ctgActRemoveDB, ctgActRemoveStb, ctgActRemoveTbl};
int32_t ctgDbgGetTbMetaNum(SCtgDBCache *dbCache) {
return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0;
}
......@@ -44,25 +46,25 @@ int32_t ctgDbgGetRentNum(SCtgRentMgmt *rent) {
return num;
}
int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type) {
if (NULL == pCatalog || NULL == pCatalog->dbCache) {
int32_t ctgDbgGetClusterCacheNum(SCatalog* pCtg, int32_t type) {
if (NULL == pCtg || NULL == pCtg->dbCache) {
return 0;
}
switch (type) {
case CTG_DBG_DB_NUM:
return (int32_t)taosHashGetSize(pCatalog->dbCache);
return (int32_t)taosHashGetSize(pCtg->dbCache);
case CTG_DBG_DB_RENT_NUM:
return ctgDbgGetRentNum(&pCatalog->dbRent);
return ctgDbgGetRentNum(&pCtg->dbRent);
case CTG_DBG_STB_RENT_NUM:
return ctgDbgGetRentNum(&pCatalog->stbRent);
return ctgDbgGetRentNum(&pCtg->stbRent);
default:
break;
}
SCtgDBCache *dbCache = NULL;
int32_t num = 0;
void *pIter = taosHashIterate(pCatalog->dbCache, NULL);
void *pIter = taosHashIterate(pCtg->dbCache, NULL);
while (pIter) {
dbCache = (SCtgDBCache *)pIter;
switch (type) {
......@@ -76,7 +78,7 @@ int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type) {
ctgError("invalid type:%d", type);
break;
}
pIter = taosHashIterate(pCatalog->dbCache, pIter);
pIter = taosHashIterate(pCtg->dbCache, pIter);
}
return num;
......@@ -108,79 +110,18 @@ void ctgDbgShowDBCache(SHashObj *dbHash) {
void ctgDbgShowClusterCache(struct SCatalog* pCatalog) {
if (NULL == pCatalog) {
void ctgDbgShowClusterCache(SCatalog* pCtg) {
if (NULL == pCtg) {
return;
}
CTG_CACHE_DEBUG("## cluster %"PRIx64" %p cache Info ##", pCatalog->clusterId, pCatalog);
CTG_CACHE_DEBUG("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_DB_NUM), ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_META_NUM),
ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_STB_NUM), ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_DB_RENT_NUM), ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_STB_RENT_NUM));
ctgDbgShowDBCache(pCatalog->dbCache);
}
int32_t ctgInitDBCache(struct SCatalog* pCatalog) {
if (NULL == pCatalog->dbCache) {
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == cache) {
ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache, NULL, cache)) {
taosHashCleanup(cache);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitTbMetaCache(struct SCatalog* pCatalog, SCtgDBCache *dbCache) {
if (NULL == dbCache->tbCache.metaCache) {
if (dbCache->deleted) {
ctgInfo("db is dropping, dbId:%"PRIx64, dbCache->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
SHashObj *metaCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == metaCache) {
ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.metaCache, NULL, metaCache)) {
taosHashCleanup(metaCache);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitStbCache(struct SCatalog* pCatalog, SCtgDBCache *dbCache) {
if (NULL == dbCache->tbCache.stbCache) {
if (dbCache->deleted) {
ctgInfo("db is dropping, dbId:%"PRIx64, dbCache->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
CTG_CACHE_DEBUG("## cluster %"PRIx64" %p cache Info ##", pCtg->clusterId, pCtg);
CTG_CACHE_DEBUG("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM),
ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM));
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == cache) {
ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.stbCache, NULL, cache)) {
taosHashCleanup(cache);
}
}
return TSDB_CODE_SUCCESS;
ctgDbgShowDBCache(pCtg->dbCache);
}
void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
if (NULL == mgmt->slots) {
return;
......@@ -214,89 +155,129 @@ void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) {
CTG_UNLOCK(CTG_WRITE, &cache->metaLock);
}
void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
if (NULL == vgInfo) {
return;
}
if (vgInfo->vgHash) {
taosHashCleanup(vgInfo->vgHash);
vgInfo->vgHash = NULL;
}
tfree(vgInfo);
}
void ctgFreeDbCache(SCtgDBCache *dbCache) {
if (NULL == dbCache) {
return;
}
atomic_store_8(&dbCache->deleted, 1);
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
if (dbCache->vgInfo) {
if (dbCache->vgInfo->vgHash) {
taosHashCleanup(dbCache->vgInfo->vgHash);
dbCache->vgInfo->vgHash = NULL;
}
tfree(dbCache->vgInfo);
}
ctgFreeVgInfo (dbCache->vgInfo);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
ctgFreeTableMetaCache(&dbCache->tbCache);
}
void ctgFreeHandle(struct SCatalog* pCatalog) {
ctgFreeMetaRent(&pCatalog->dbRent);
ctgFreeMetaRent(&pCatalog->stbRent);
if (pCatalog->dbCache) {
void *pIter = taosHashIterate(pCatalog->dbCache, NULL);
void ctgFreeHandle(SCatalog* pCtg) {
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
if (pCtg->dbCache) {
void *pIter = taosHashIterate(pCtg->dbCache, NULL);
while (pIter) {
SCtgDBCache *dbCache = pIter;
atomic_store_8(&dbCache->deleted, 1);
ctgFreeDbCache(dbCache);
pIter = taosHashIterate(pCatalog->dbCache, pIter);
pIter = taosHashIterate(pCtg->dbCache, pIter);
}
taosHashCleanup(pCatalog->dbCache);
taosHashCleanup(pCtg->dbCache);
}
free(pCatalog);
free(pCtg);
}
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbFName, SCtgDBCache **dbCache, bool *inCache) {
if (NULL == pCatalog->dbCache) {
*inCache = false;
ctgWarn("empty db cache, dbFName:%s", dbFName);
int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
CTG_LOCK(CTG_READ, &dbCache->vgLock);
if (NULL == dbCache->vgInfo) {
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
ctgDebug("db vgInfo is empty, dbId:%"PRIx64, dbCache->dbId);
return TSDB_CODE_SUCCESS;
}
SCtgDBCache *cache = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t ctgWAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
if (dbCache->deleted) {
ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
return TSDB_CODE_SUCCESS;
}
while (true) {
cache = taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName));
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
taosHashRelease(pCtg->dbCache, dbCache);
}
if (NULL == cache) {
*inCache = false;
ctgWarn("not in db vgroup cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
void ctgReleaseVgInfo(SCtgDBCache *dbCache) {
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
}
CTG_LOCK(CTG_READ, &cache->vgLock);
if (NULL == cache->vgInfo) {
CTG_UNLOCK(CTG_READ, &cache->vgLock);
taosHashRelease(pCatalog->dbCache, cache);
ctgWarn("db cache vgInfo is NULL, dbFName:%s", dbFName);
continue;
}
void ctgWReleaseVgInfo(SCtgDBCache *dbCache) {
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
}
break;
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool *inCache) {
if (NULL == pCtg->dbCache) {
*pCache = NULL;
*inCache = false;
ctgWarn("empty db cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
*pCache = NULL;
*inCache = false;
return TSDB_CODE_SUCCESS;
}
ctgAcquireVgInfo(pCtg, dbCache);
if (NULL == dbCache->vgInfo) {
ctgReleaseDBCache(pCtg, dbCache);
*pCache = NULL;
*inCache = false;
return TSDB_CODE_SUCCESS;
}
*dbCache = cache;
*pCache = dbCache;
*inCache = true;
ctgDebug("Got db vgroup from cache, dbFName:%s", dbFName);
ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
char *msg = NULL;
int32_t msgLen = 0;
......@@ -318,6 +299,11 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_DB_NOT_EXIST(rpcRsp.code)) {
ctgDebug("db not exist in mnode, dbFName:%s", input->db);
return rpcRsp.code;
}
ctgError("error rsp for use db, code:%s, db:%s", tstrerror(rpcRsp.code), input->db);
CTG_ERR_RET(rpcRsp.code);
}
......@@ -331,28 +317,27 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
return TSDB_CODE_SUCCESS;
}
int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, char *dbFName, char* tbName, int32_t *exist) {
if (NULL == pCatalog->dbCache) {
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
if (NULL == pCtg->dbCache) {
*exist = 0;
ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tbName);
return TSDB_CODE_SUCCESS;
}
SCtgDBCache *dbCache = taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName));
SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
*exist = 0;
ctgWarn("db not exist in cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
size_t sz = 0;
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
STableMeta *tbMeta = taosHashGet(dbCache->tbCache.metaCache, tbName, strlen(tbName));
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
if (NULL == tbMeta) {
taosHashRelease(pCatalog->dbCache, dbCache);
taosHashRelease(pCtg->dbCache, dbCache);
*exist = 0;
ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", dbFName, tbName);
......@@ -361,7 +346,7 @@ int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, char *dbFName, cha
*exist = 1;
taosHashRelease(pCatalog->dbCache, dbCache);
taosHashRelease(pCtg->dbCache, dbCache);
ctgDebug("tbmeta is in cache, dbFName:%s, tbName:%s", dbFName, tbName);
......@@ -369,29 +354,23 @@ int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, char *dbFName, cha
}
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) {
if (NULL == pCatalog->dbCache) {
int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) {
if (NULL == pCtg->dbCache) {
*exist = 0;
ctgWarn("empty tbmeta cache, tbName:%s", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, dbFName);
*pTableMeta = NULL;
SCtgDBCache *dbCache = taosHashAcquire(pCatalog->dbCache, db, strlen(db));
SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
*exist = 0;
ctgWarn("no db cache, dbFName:%s, tbName:%s", db, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
if (NULL == dbCache->tbCache.metaCache) {
*exist = 0;
taosHashRelease(pCatalog->dbCache, dbCache);
ctgWarn("empty tbmeta cache, dbFName:%s, tbName:%s", db, pTableName->tname);
ctgWarn("no db cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
......@@ -402,8 +381,8 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
if (NULL == *pTableMeta) {
*exist = 0;
taosHashRelease(pCatalog->dbCache, dbCache);
ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", db, pTableName->tname);
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("tbl not in cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
......@@ -412,8 +391,8 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
tbMeta = *pTableMeta;
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
taosHashRelease(pCatalog->dbCache, dbCache);
ctgDebug("Got tbmeta from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, db, pTableName->tname);
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got tbl from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
......@@ -422,8 +401,8 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &tbMeta->suid, sizeof(tbMeta->suid));
if (NULL == stbMeta || NULL == *stbMeta) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgError("stable not in stbCache, suid:%"PRIx64, tbMeta->suid);
ctgReleaseDBCache(pCtg, dbCache);
ctgError("stb not in stbCache, suid:%"PRIx64, tbMeta->suid);
tfree(*pTableMeta);
*exist = 0;
return TSDB_CODE_SUCCESS;
......@@ -431,17 +410,17 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
if ((*stbMeta)->suid != tbMeta->suid) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgReleaseDBCache(pCtg, dbCache);
tfree(*pTableMeta);
ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);
int32_t metaSize = CTG_META_SIZE(*stbMeta);
*pTableMeta = realloc(*pTableMeta, metaSize);
if (NULL == *pTableMeta) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgError("realloc size[%d] failed", metaSize);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -450,15 +429,15 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", db, pTableName->tname);
ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableName, int32_t *tbType) {
if (NULL == pCatalog->dbCache) {
int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_t *tbType) {
if (NULL == pCtg->dbCache) {
ctgWarn("empty db cache, tbName:%s", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
......@@ -466,7 +445,7 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN
char dbName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, dbName);
SCtgDBCache *dbCache = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName));
SCtgDBCache *dbCache = taosHashAcquire(pCtg->dbCache, dbName, strlen(dbName));
if (NULL == dbCache) {
ctgInfo("db not in cache, dbFName:%s", dbName);
return TSDB_CODE_SUCCESS;
......@@ -474,26 +453,29 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname));
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
if (NULL == pTableMeta) {
ctgWarn("tbmeta not in cache, dbFName:%s, tbName:%s", dbName, pTableName->tname);
taosHashRelease(pCatalog->dbCache, dbCache);
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
ctgWarn("tbl not in cache, dbFName:%s, tbName:%s", dbName, pTableName->tname);
taosHashRelease(pCtg->dbCache, dbCache);
return TSDB_CODE_SUCCESS;
}
*tbType = atomic_load_8(&pTableMeta->tableType);
taosHashRelease(dbCache->tbCache.metaCache, dbCache);
taosHashRelease(pCatalog->dbCache, dbCache);
taosHashRelease(dbCache->tbCache.metaCache, pTableMeta);
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
taosHashRelease(pCtg->dbCache, dbCache);
ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbName, pTableName->tname, *tbType);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, char *dbFName, char* tbName, STableMetaOutput* output) {
int32_t ctgGetTableMetaFromMnodeImpl(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, char *dbFName, char* tbName, STableMetaOutput* output) {
SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
char *msg = NULL;
SEpSet *pVnodeEpSet = NULL;
......@@ -539,15 +521,15 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransport
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
int32_t ctgGetTableMetaFromMnode(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
return ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, dbFName, (char *)pTableName->tname, output);
return ctgGetTableMetaFromMnodeImpl(pCtg, pTransporter, pMgmtEps, dbFName, (char *)pTableName->tname, output);
}
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
......@@ -607,12 +589,12 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) {
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList) {
SHashObj *vgroupHash = NULL;
SVgroupInfo *vgInfo = NULL;
SArray *vgList = NULL;
int32_t code = 0;
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
int32_t vgNum = taosHashGetSize(vgHash);
vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
if (NULL == vgList) {
......@@ -620,23 +602,23 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
void *pIter = taosHashIterate(vgHash, NULL);
while (pIter) {
vgInfo = pIter;
if (NULL == taosArrayPush(vgList, vgInfo)) {
ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
taosHashCancelIterate(vgHash, pIter);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
pIter = taosHashIterate(dbInfo->vgHash, pIter);
pIter = taosHashIterate(vgHash, pIter);
vgInfo = NULL;
}
*vgroupList = vgList;
vgList = NULL;
*pList = vgList;
ctgDebug("Got vg list from DB, vgNum:%d", vgNum);
ctgDebug("Got vgList from cache, vgNum:%d", vgNum);
return TSDB_CODE_SUCCESS;
......@@ -649,7 +631,7 @@ _return:
CTG_RET(code);
}
int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
int32_t code = 0;
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
......@@ -693,7 +675,6 @@ int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbIn
CTG_RET(code);
}
#if 1
int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) {
return -1;
......@@ -713,29 +694,6 @@ int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
return 0;
}
}
#else
int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) {
return -1;
} else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) {
return 1;
} else {
return 0;
}
}
int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) {
return -1;
} else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) {
return 1;
} else {
return 0;
}
}
#endif
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
mgmt->slotRIdx = 0;
......@@ -932,38 +890,55 @@ int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t si
return TSDB_CODE_SUCCESS;
}
int32_t ctgAddDBCache(struct SCatalog *pCatalog, const char *dbFName, SCtgDBCache *dbCache) {
int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
int32_t code = 0;
code = taosHashPut(pCatalog->dbCache, dbFName, strlen(dbFName), dbCache, sizeof(SCtgDBCache));
SCtgDBCache newDBCache = {0};
newDBCache.dbId = dbId;
newDBCache.tbCache.metaCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.tbCache.metaCache) {
ctgError("taosHashInit %d metaCache failed", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
newDBCache.tbCache.stbCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.tbCache.stbCache) {
ctgError("taosHashInit %d stbCache failed", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache));
if (code) {
if (HASH_NODE_EXIST(code)) {
ctgDebug("db already in cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
goto _return;
}
ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
SDbVgVersion vgVersion = {.dbId = dbCache->dbId, .vgVersion = dbCache->vgInfo ? dbCache->vgInfo->vgVersion : -1};
SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
ctgDebug("dbCache added, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbCache->dbId);
ctgDebug("db added to cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion)));
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));
ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbId);
return TSDB_CODE_SUCCESS;
_return:
ctgFreeDbCache(dbCache);
ctgFreeDbCache(&newDBCache);
CTG_RET(code);
}
void ctgRemoveAndFreeTableMeta(struct SCatalog* pCatalog, SCtgTbMetaCache *cache) {
void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
CTG_LOCK(CTG_WRITE, &cache->stbLock);
if (cache->stbCache) {
void *pIter = taosHashIterate(cache->stbCache, NULL);
......@@ -971,7 +946,7 @@ void ctgRemoveAndFreeTableMeta(struct SCatalog* pCatalog, SCtgTbMetaCache *cache
uint64_t *suid = NULL;
taosHashGetKey(pIter, (void **)&suid, NULL);
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCatalog->stbRent, *suid, ctgSTableVersionCompare)) {
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgSTableVersionCompare)) {
ctgDebug("stb removed from rent, suid:%"PRIx64, *suid);
}
......@@ -979,232 +954,246 @@ void ctgRemoveAndFreeTableMeta(struct SCatalog* pCatalog, SCtgTbMetaCache *cache
}
}
CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
ctgFreeTableMetaCache(cache);
}
int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, SCtgDBCache *dbCache, const char* dbFName) {
if (taosHashRemove(pCatalog->dbCache, dbFName, strlen(dbFName))) {
int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
atomic_store_8(&dbCache->deleted, 1);
ctgRemoveStbRent(pCtg, &dbCache->tbCache);
ctgFreeDbCache(dbCache);
ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionCompare));
ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
return TSDB_CODE_SUCCESS;
}
atomic_store_8(&dbCache->deleted, 1);
int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
SCtgDBCache *dbCache = NULL;
if (acquire) {
dbCache = (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName));
} else {
dbCache = (SCtgDBCache *)taosHashGet(pCtg->dbCache, dbFName, strlen(dbFName));
}
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
if (dbCache->vgInfo) {
ctgInfo("cleanup db vgInfo, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
if (dbCache->vgInfo->vgHash) {
taosHashCleanup(dbCache->vgInfo->vgHash);
}
if (NULL == dbCache) {
*pCache = NULL;
ctgDebug("db not in cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
if (dbCache->deleted) {
if (acquire) {
ctgReleaseDBCache(pCtg, dbCache);
}
tfree(dbCache->vgInfo);
*pCache = NULL;
ctgDebug("db is removing from cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
ctgRemoveAndFreeTableMeta(pCatalog, &dbCache->tbCache);
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
ctgInfo("db removed from cache, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId);
int32_t ctgAcquireDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true));
}
CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbCache->dbId, ctgDbVgVersionCompare));
ctgDebug("db removed from rent, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId);
return TSDB_CODE_SUCCESS;
int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
}
int32_t ctgAcquireDBCache(struct SCatalog* pCatalog, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
CTG_LOCK(CTG_WRITE, &pCatalog->dbLock);
ctgGetDBCache(pCtg, dbFName, &dbCache);
while (true) {
dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName));
if (dbCache) {
// TODO OPEN IT
if (dbCache) {
// TODO OPEN IT
#if 0
if (dbCache->dbId == dbId) {
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
if (dbCache->dbId == dbId) {
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
#else
if (0 == dbId) {
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
if (dbId && (dbCache->dbId == 0)) {
dbCache->dbId = dbId;
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
if (dbCache->dbId == dbId) {
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
#endif
code = ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName);
taosHashRelease(pCatalog->dbCache, dbCache);
dbCache = NULL;
if (code) {
if (TSDB_CODE_CTG_DB_DROPPED == code) {
continue;
}
CTG_ERR_JRET(code);
}
if (0 == dbId) {
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
SCtgDBCache newDBCache = {0};
newDBCache.dbId = dbId;
if (dbId && (dbCache->dbId == 0)) {
dbCache->dbId = dbId;
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
CTG_ERR_JRET(ctgAddDBCache(pCatalog, dbFName, &newDBCache));
if (dbCache->dbId == dbId) {
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
#endif
CTG_ERR_RET(ctgRemoveDB(pCtg, dbCache, dbFName));
}
CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId));
_return:
ctgGetDBCache(pCtg, dbFName, &dbCache);
if (dbCache) {
taosHashRelease(pCatalog->dbCache, dbCache);
}
*pCache = dbCache;
CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock);
CTG_RET(code);
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateTbMetaImpl(struct SCatalog *pCatalog, SCtgTbMetaCache *tbCache, char *dbFName, char *tbName, STableMeta *meta, int32_t metaSize) {
CTG_LOCK(CTG_READ, &tbCache->metaLock);
if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
ctgError("taosHashPut tbmeta to cache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo** pDbInfo) {
int32_t code = 0;
SDBVgInfo* dbInfo = *pDbInfo;
if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateStbMetaImpl(struct SCatalog *pCatalog, SCtgTbMetaCache *tbCache, char *dbFName, char *tbName, STableMeta *meta, int32_t metaSize) {
bool newAdded = false;
int32_t code = 0;
SSTableMetaVersion metaRent = {.suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion};
strcpy(metaRent.dbFName, dbFName);
strcpy(metaRent.stbName, tbName);
CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion};
SCtgDBCache *dbCache = NULL;
CTG_ERR_RET(ctgGetAddDBCache(pCtg, dbFName, dbId, &dbCache));
if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
SDBVgInfo *vgInfo = NULL;
CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
CTG_LOCK(CTG_READ, &tbCache->metaLock);
STableMeta *orig = taosHashAcquire(tbCache->metaCache, tbName, strlen(tbName));
if (orig) {
if (orig->suid != meta->suid) {
if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
ctgError("stb not exist in stbCache, db:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
}
if (dbCache->vgInfo) {
if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) {
ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
ctgWReleaseVgInfo(dbCache);
ctgMetaRentRemove(&pCatalog->stbRent, orig->suid, ctgSTableVersionCompare);
return TSDB_CODE_SUCCESS;
}
taosHashRelease(tbCache->metaCache, orig);
ctgFreeVgInfo(dbCache->vgInfo);
}
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, tbCache, dbFName, tbName, meta, metaSize));
dbCache->vgInfo = dbInfo;
CTG_LOCK(CTG_READ, &tbCache->metaLock);
STableMeta *tbMeta = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
if (taosHashPutExt(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES, &newAdded) != 0) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, meta->suid);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
ctgDebug("update stable to cache, suid:%"PRIx64, meta->suid);
if (newAdded) {
CTG_ERR_RET(ctgMetaRentAdd(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
} else {
CTG_ERR_RET(ctgMetaRentUpdate(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion), ctgSTableVersionCompare));
}
*pDbInfo = NULL;
return TSDB_CODE_SUCCESS;
ctgDebug("db vgInfo updated, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);
_return:
ctgWReleaseVgInfo(dbCache);
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
dbCache = NULL;
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
CTG_RET(code);
}
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, STableMeta *meta, int32_t metaSize) {
SCtgTbMetaCache *tbCache = &dbCache->tbCache;
if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) {
ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", output->dbFName, output->tbName);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
CTG_LOCK(CTG_READ, &tbCache->metaLock);
if (dbCache->deleted || NULL == tbCache->metaCache || NULL == tbCache->stbCache) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
ctgError("db is dropping, dbId:%"PRIx64, dbCache->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
CTG_ERR_RET(ctgInitDBCache(pCatalog));
CTG_ERR_JRET(ctgAcquireDBCache(pCatalog, output->dbFName, output->dbId, &dbCache));
CTG_ERR_JRET(ctgInitTbMetaCache(pCatalog, dbCache));
CTG_ERR_JRET(ctgInitStbCache(pCatalog, dbCache));
int8_t origType = 0;
uint64_t origSuid = 0;
bool isStb = meta->tableType == TSDB_SUPER_TABLE;
STableMeta *orig = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
if (orig) {
origType = orig->tableType;
origSuid = orig->suid;
if (origType == TSDB_SUPER_TABLE && ((!isStb) || origSuid != meta->suid)) {
CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
}
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta)));
ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgSTableVersionCompare);
}
}
if (CTG_IS_META_CTABLE(output->metaType)) {
goto _return;
if (isStb) {
CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
}
if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) {
ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) {
if (isStb) {
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
}
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
ctgError("taosHashPut tbmeta to cache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
CTG_ERR_JRET(ctgUpdateStbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->tbName, output->tbMeta, tbSize));
} else {
CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->tbName, output->tbMeta, tbSize));
if (!isStb) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
return TSDB_CODE_SUCCESS;
}
_return:
if (isStb && origSuid == meta->suid) {
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
return TSDB_CODE_SUCCESS;
}
if (dbCache) {
taosHashRelease(pCatalog->dbCache, dbCache);
CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock);
STableMeta *tbMeta = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
if (taosHashPut(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES) != 0) {
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, meta->suid);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
CTG_RET(code);
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
ctgDebug("meta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
SSTableMetaVersion metaRent = {.dbId = dbId, .suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion};
strcpy(metaRent.dbFName, dbFName);
strcpy(metaRent.stbName, tbName);
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache) {
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
bool inCache = false;
int32_t code = 0;
if (!forceUpdate) {
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbFName, dbCache, &inCache));
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache));
if (inCache) {
return TSDB_CODE_SUCCESS;
}
ctgDebug("failed to get DB vgroupInfo from cache, dbName:%s, load it from mnode, update:%d", dbFName, forceUpdate);
}
SUseDbOutput DbOut = {0};
......@@ -1214,117 +1203,136 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
while (true) {
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbFName, DbOut.dbId, DbOut.dbVgroup));
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbFName, dbCache, &inCache));
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut));
if (!inCache) {
ctgWarn("can't get db vgroup from cache, will retry, db:%s", dbFName);
continue;
code = ctgUpdateDBVgInfo(pCtg, dbFName, DbOut.dbId, &DbOut.dbVgroup);
if (code && DbOut.dbVgroup) {
*pInfo = DbOut.dbVgroup;
return TSDB_CODE_SUCCESS;
}
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache));
break;
if (inCache) {
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgValidateAndRemoveStbMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid, bool *removed) {
*removed = false;
SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName));
if (NULL == dbCache) {
ctgInfo("db not exist in dbCache, may be removed, db:%s", dbName);
return TSDB_CODE_SUCCESS;
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) {
*pOutput = malloc(sizeof(STableMetaOutput));
if (NULL == *pOutput) {
qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
if (taosHashRemove(dbCache->tbCache.stbCache, &suid, sizeof(suid))) {
CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgInfo("stb not exist in stbCache, may be removed, db:%s, stb:%s, suid:%"PRIx64, dbName, stbName, suid);
return TSDB_CODE_SUCCESS;
memcpy(*pOutput, output, sizeof(STableMetaOutput));
if (output->tbMeta) {
int32_t metaSize = CTG_META_SIZE(output->tbMeta);
(*pOutput)->tbMeta = malloc(metaSize);
if (NULL == (*pOutput)->tbMeta) {
qError("malloc %d failed", sizeof(STableMetaOutput));
tfree(*pOutput);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
memcpy((*pOutput)->tbMeta, output->tbMeta, metaSize);
}
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
if (taosHashRemove(dbCache->tbCache.metaCache, stbName, strlen(stbName))) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgError("stb not exist in cache, db:%s, stb:%s, suid:%"PRIx64, dbName, stbName, suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
return TSDB_CODE_SUCCESS;
}
void ctgPopAction(SCtgMetaAction **action) {
SCtgQNode *orig = ctgMgmt.head;
CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
SCtgQNode *node = ctgMgmt.head->next;
ctgMgmt.head = ctgMgmt.head->next;
taosHashRelease(pCatalog->dbCache, dbCache);
tfree(orig);
*action = &node->action;
}
*removed = true;
int32_t ctgPushAction(SCtgMetaAction *action) {
SCtgQNode *node = calloc(1, sizeof(SCtgQNode));
if (NULL == node) {
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
CTG_RET(TSDB_CODE_CTG_MEM_ERROR);
}
node->action = *action;
CTG_LOCK(CTG_WRITE, &ctgMgmt.qlock);
ctgMgmt.tail->next = node;
ctgMgmt.tail = node;
CTG_UNLOCK(CTG_WRITE, &ctgMgmt.qlock);
tsem_post(&ctgMgmt.sem);
return TSDB_CODE_SUCCESS;
}
int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable, STableMetaOutput **pOutput) {
if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
SVgroupInfo vgroupInfo = {0};
int32_t code = 0;
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
STableMetaOutput voutput = {0};
STableMetaOutput moutput = {0};
STableMetaOutput *output = &voutput;
CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
STableMetaOutput moutput = {0};
STableMetaOutput *output = malloc(sizeof(STableMetaOutput));
if (NULL == output) {
ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if (CTG_IS_STABLE(isSTable)) {
ctgDebug("will renew tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName));
// if get from mnode failed, will not try vnode
CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput));
CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCtg, pTransporter, pMgmtEps, pTableName, output));
if (CTG_IS_META_NULL(moutput.metaType)) {
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
} else {
output = &moutput;
if (CTG_IS_META_NULL(output->metaType)) {
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo, output));
}
} else {
ctgDebug("will renew tbmeta, not supposed to be stb, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable);
// if get from vnode failed or no table meta, will not try mnode
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo, output));
if (CTG_IS_META_TABLE(voutput.metaType) && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) {
ctgDebug("will continue to renew tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), voutput.metaType);
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.dbFName, voutput.tbName, &moutput));
if (CTG_IS_META_TABLE(output->metaType) && TSDB_SUPER_TABLE == output->tbMeta->tableType) {
ctgDebug("will continue to renew tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), output->metaType);
voutput.metaType = moutput.metaType;
tfree(output->tbMeta);
tfree(voutput.tbMeta);
voutput.tbMeta = moutput.tbMeta;
moutput.tbMeta = NULL;
} else if (CTG_IS_META_BOTH(voutput.metaType)) {
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTransporter, pMgmtEps, output->dbFName, output->tbName, output));
} else if (CTG_IS_META_BOTH(output->metaType)) {
int32_t exist = 0;
CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCatalog, voutput.dbFName, voutput.tbName, &exist));
CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCtg, output->dbFName, output->tbName, &exist));
if (0 == exist) {
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.dbFName, voutput.tbName, &moutput));
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTransporter, pMgmtEps, output->dbFName, output->tbName, &moutput));
if (CTG_IS_META_NULL(moutput.metaType)) {
SET_META_TYPE_NULL(voutput.metaType);
SET_META_TYPE_NULL(output->metaType);
}
tfree(voutput.tbMeta);
voutput.tbMeta = moutput.tbMeta;
tfree(output->tbMeta);
output->tbMeta = moutput.tbMeta;
moutput.tbMeta = NULL;
} else {
tfree(voutput.tbMeta);
tfree(output->tbMeta);
SET_META_TYPE_CTABLE(voutput.metaType);
SET_META_TYPE_CTABLE(output->metaType);
}
}
}
......@@ -1334,46 +1342,282 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
}
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, output));
if (pOutput) {
CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput));
}
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
msg->pCtg = pCtg;
msg->output = output;
action.data = msg;
CTG_ERR_JRET(ctgPushAction(&action));
return TSDB_CODE_SUCCESS;
_return:
tfree(voutput.tbMeta);
tfree(moutput.tbMeta);
tfree(output->tbMeta);
tfree(msg);
CTG_RET(code);
}
int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) {
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t exist = 0;
int32_t code = 0;
if (!forceUpdate) {
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));
CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &exist));
if (exist && CTG_TBTYPE_MATCH(isSTable, (*pTableMeta)->tableType)) {
return TSDB_CODE_SUCCESS;
}
tfree(*pTableMeta);
} else if (CTG_IS_UNKNOWN_STABLE(isSTable)) {
int32_t tbType = 0;
CTG_ERR_RET(ctgGetTableTypeFromCache(pCatalog, pTableName, &tbType));
CTG_ERR_RET(ctgGetTableTypeFromCache(pCtg, pTableName, &tbType));
CTG_SET_STABLE(isSTable, tbType);
}
CTG_ERR_RET(ctgRenewTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, isSTable));
STableMetaOutput *output = NULL;
CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, isSTable, &output));
if (CTG_IS_META_TABLE(output->metaType)) {
*pTableMeta = output->tbMeta;
goto _return;
}
if (CTG_IS_META_BOTH(output->metaType)) {
memcpy(output->tbMeta, &output->ctbMeta, sizeof(output->ctbMeta));
*pTableMeta = output->tbMeta;
goto _return;
}
if ((!CTG_IS_META_CTABLE(output->metaType)) || output->tbMeta) {
ctgError("invalid metaType:%d", output->metaType);
tfree(output->tbMeta);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));
// HANDLE ONLY CHILD TABLE META
SName stbName = *pTableName;
strcpy(stbName.tname, output->tbName);
CTG_ERR_JRET(ctgGetTableMetaFromCache(pCtg, &stbName, pTableMeta, &exist));
if (0 == exist) {
ctgError("renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s", tNameGetTableName(pTableName));
ctgDebug("stb no longer exist, dbFName:%s, tbName:%s", output->dbFName, pTableName->tname);
CTG_ERR_JRET(TSDB_CODE_VND_TB_NOT_EXIST);
}
memcpy(*pTableMeta, &output->ctbMeta, sizeof(output->ctbMeta));
_return:
tfree(output);
CTG_RET(code);
}
int32_t ctgActUpdateVg(SCtgMetaAction *action) {
int32_t code = 0;
SCtgUpdateVgMsg *msg = action->data;
CTG_ERR_JRET(ctgUpdateDBVgInfo(msg->pCtg, msg->dbFName, msg->dbId, &msg->dbInfo));
_return:
tfree(msg->dbInfo);
tfree(msg);
CTG_RET(code);
}
int32_t ctgActRemoveDB(SCtgMetaAction *action) {
int32_t code = 0;
SCtgRemoveDBMsg *msg = action->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(msg->pCtg, msg->dbFName, &dbCache);
if (NULL == dbCache) {
ctgInfo("db not exist in cache, may be removed, dbFName:%s", msg->dbFName);
goto _return;
}
if (dbCache->dbId != msg->dbId) {
ctgInfo("dbId already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId);
ctgReleaseDBCache(msg->pCtg, dbCache);
goto _return;
}
CTG_ERR_JRET(ctgRemoveDB(pCtg, dbCache, msg->dbFName));
_return:
tfree(msg);
CTG_RET(code);
}
int32_t ctgActUpdateTbl(SCtgMetaAction *action) {
int32_t code = 0;
SCtgUpdateTblMsg *msg = action->data;
SCatalog* pCtg = msg->pCtg;
STableMetaOutput* output = msg->output;
SCtgDBCache *dbCache = NULL;
if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) {
ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", output->dbFName, output->tbName);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) {
ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, output->dbFName, output->dbId, &dbCache));
if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, output->dbFName, output->dbId);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (CTG_IS_META_TABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
int32_t metaSize = CTG_META_SIZE(output->tbMeta);
CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->tbName, output->tbMeta, metaSize));
}
if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta)));
}
_return:
if (dbCache) {
taosHashRelease(pCtg->dbCache, dbCache);
}
tfree(msg);
CTG_RET(code);
}
int32_t ctgActRemoveStb(SCtgMetaAction *action) {
int32_t code = 0;
SCtgRemoveStbMsg *msg = action->data;
bool removed = false;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
if (NULL == dbCache) {
return TSDB_CODE_SUCCESS;
}
if (dbCache->dbId != msg->dbId) {
ctgDebug("dbId already modified, dbFName:%s, current:%"PRIx64", dbId:%"PRIx64", stb:%s, suid:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
if (taosHashRemove(dbCache->tbCache.stbCache, &msg->suid, sizeof(msg->suid))) {
CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
if (taosHashRemove(dbCache->tbCache.metaCache, msg->stbName, strlen(msg->stbName))) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgSTableVersionCompare));
ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
_return:
tfree(msg);
CTG_RET(code);
}
int32_t ctgActRemoveTbl(SCtgMetaAction *action) {
}
void* ctgUpdateThreadFunc(void* param) {
setThreadName("catalog");
qInfo("catalog update thread started");
CTG_LOCK(CTG_READ, &ctgMgmt.lock);
while (true) {
tsem_wait(&ctgMgmt.sem);
if (atomic_load_8(&ctgMgmt.exit)) {
break;
}
SCtgMetaAction *action = NULL;
ctgPopAction(&action);
(*ctgActFuncs[action->act])(action);
}
CTG_UNLOCK(CTG_READ, &ctgMgmt.lock);
qInfo("catalog update thread stopped");
return NULL;
}
int32_t ctgStartUpdateThread() {
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&ctgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
CTG_ERR_RET(terrno);
}
pthread_attr_destroy(&thAttr);
return TSDB_CODE_SUCCESS;
}
......@@ -1394,7 +1638,7 @@ int32_t catalogInit(SCatalogCfg *cfg) {
}
if (ctgMgmt.cfg.maxTblCacheNum == 0) {
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
}
if (ctgMgmt.cfg.dbRentSec == 0) {
......@@ -1406,7 +1650,7 @@ int32_t catalogInit(SCatalogCfg *cfg) {
}
} else {
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
ctgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
}
......@@ -1417,12 +1661,23 @@ int32_t catalogInit(SCatalogCfg *cfg) {
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
CTG_ERR_RET(ctgStartUpdateThread());
tsem_init(&ctgMgmt.sem, 0, 0);
ctgMgmt.head = calloc(1, sizeof(SCtgQNode));
if (NULL == ctgMgmt.head) {
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
ctgMgmt.tail = ctgMgmt.head;
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", ctgMgmt.cfg.maxDBCacheNum, ctgMgmt.cfg.maxTblCacheNum, ctgMgmt.cfg.dbRentSec, ctgMgmt.cfg.stbRentSec);
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) {
int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
if (NULL == catalogHandle) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
......@@ -1455,6 +1710,18 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) {
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, ctgMgmt.cfg.dbRentSec, CTG_RENT_DB));
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, ctgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
clusterCtg->dbCache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == clusterCtg->dbCache) {
qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
SHashObj *metaCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == metaCache) {
qError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
code = taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
if (code) {
if (HASH_NODE_EXIST(code)) {
......@@ -1482,97 +1749,81 @@ _return:
CTG_RET(code);
}
void catalogFreeHandle(struct SCatalog* pCatalog) {
if (NULL == pCatalog) {
void catalogFreeHandle(SCatalog* pCtg) {
if (NULL == pCtg) {
return;
}
if (taosHashRemove(ctgMgmt.pCluster, &pCatalog->clusterId, sizeof(pCatalog->clusterId))) {
ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCatalog->clusterId);
if (taosHashRemove(ctgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) {
ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCtg->clusterId);
return;
}
uint64_t clusterId = pCatalog->clusterId;
uint64_t clusterId = pCtg->clusterId;
ctgFreeHandle(pCatalog);
ctgFreeHandle(pCtg);
ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
}
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
if (NULL == pCatalog || NULL == dbName || NULL == version) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version) {
CTG_API_ENTER();
if (NULL == pCatalog->dbCache) {
if (NULL == pCtg || NULL == dbFName || NULL == version) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
if (NULL == pCtg->dbCache) {
*version = CTG_DEFAULT_INVALID_VERSION;
ctgInfo("empty db cache, dbName:%s", dbName);
ctgInfo("empty db cache, dbFName:%s", dbFName);
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
SCtgDBCache *db = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName));
if (NULL == db) {
SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
*version = CTG_DEFAULT_INVALID_VERSION;
ctgInfo("db not in cache, dbName:%s", dbName);
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_LOCK(CTG_READ, &db->vgLock);
if (NULL == db->vgInfo) {
CTG_UNLOCK(CTG_READ, &db->vgLock);
ctgAcquireVgInfo(pCtg, dbCache);
if (NULL == dbCache->vgInfo) {
ctgReleaseDBCache(pCtg, dbCache);
*version = CTG_DEFAULT_INVALID_VERSION;
ctgInfo("db not in cache, dbName:%s", dbName);
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
*version = db->vgInfo->vgVersion;
CTG_UNLOCK(CTG_READ, &db->vgLock);
taosHashRelease(pCatalog->dbCache, db);
*version = dbCache->vgInfo->vgVersion;
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got db vgVersion from cache, dbName:%s, vgVersion:%d", dbName, *version);
ctgDebug("Got db vgVersion from cache, dbFName:%s, vgVersion:%d", dbFName, *version);
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) {
if (NULL == pCatalog || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) {
CTG_API_ENTER();
SCtgDBCache* dbCache = NULL;
SVgroupInfo *vgInfo = NULL;
if (NULL == pCtg || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
SCtgDBCache* dbCache = NULL;
int32_t code = 0;
SArray *vgList = NULL;
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbFName, forceUpdate, &dbCache));
int32_t vgNum = (int32_t)taosHashGetSize(dbCache->vgInfo->vgHash);
vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
if (NULL == vgList) {
ctgError("taosArrayInit %d failed", vgNum);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
SHashObj *vgHash = NULL;
SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName, forceUpdate, &dbCache, &vgInfo));
if (dbCache) {
vgHash = dbCache->vgInfo->vgHash;
} else {
vgHash = vgInfo->vgHash;
}
void *pIter = taosHashIterate(dbCache->vgInfo->vgHash, NULL);
while (pIter) {
vgInfo = pIter;
if (NULL == taosArrayPush(vgList, vgInfo)) {
ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
taosHashCancelIterate(dbCache->vgInfo->vgHash, pIter);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
pIter = taosHashIterate(dbCache->vgInfo->vgHash, pIter);
vgInfo = NULL;
}
CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, &vgList));
*vgroupList = vgList;
vgList = NULL;
......@@ -1580,218 +1831,221 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
_return:
if (dbCache) {
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
if (vgList) {
taosArrayDestroy(vgList);
vgList = NULL;
if (vgInfo) {
taosHashCleanup(vgInfo->vgHash);
tfree(vgInfo);
}
CTG_API_LEAVE(code);
}
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbFName, uint64_t dbId, SDBVgroupInfo* dbInfo) {
int32_t code = 0;
int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo* dbInfo) {
CTG_API_ENTER();
int32_t code = 0;
if (NULL == pCatalog || NULL == dbFName || NULL == dbInfo) {
if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) {
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
}
if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion);
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG};
SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
CTG_ERR_JRET(ctgInitDBCache(pCatalog));
bool newAdded = false;
SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion};
SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgAcquireDBCache(pCatalog, dbFName, dbId, &dbCache));
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
if (dbCache->deleted) {
ctgInfo("db is dropping, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
CTG_ERR_JRET(TSDB_CODE_CTG_DB_DROPPED);
}
if (NULL == dbCache->vgInfo) {
dbCache->vgInfo = dbInfo;
} else {
if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) {
ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, current:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
goto _return;
}
if (dbCache->vgInfo->vgHash) {
ctgDebug("cleanup db vgHash, dbFName:%s", dbFName);
taosHashCleanup(dbCache->vgInfo->vgHash);
dbCache->vgInfo->vgHash = NULL;
}
tfree(dbCache->vgInfo);
dbCache->vgInfo = dbInfo;
}
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->dbId = dbId;
msg->dbInfo = dbInfo;
dbInfo = NULL;
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
action.data = msg;
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
ctgDebug("dbCache updated, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);
CTG_ERR_JRET(ctgPushAction(&action));
CTG_API_LEAVE(code);
_return:
if (dbCache) {
CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock);
}
if (dbInfo) {
taosHashCleanup(dbInfo->vgHash);
dbInfo->vgHash = NULL;
tfree(dbInfo);
}
tfree(msg);
CTG_API_LEAVE(code);
}
int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbFName, uint64_t dbId) {
int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
CTG_API_ENTER();
int32_t code = 0;
if (NULL == pCatalog || NULL == dbFName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
if (NULL == pCtg || NULL == dbFName) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_ENTER();
if (NULL == pCatalog->dbCache) {
if (NULL == pCtg->dbCache) {
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName));
if (NULL == dbCache) {
ctgInfo("db not exist in dbCache, may be removed, dbFName:%s", dbFName);
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB};
SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
}
if (dbCache->dbId != dbId) {
ctgInfo("db id already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, dbFName, dbCache->dbId, dbId);
taosHashRelease(pCatalog->dbCache, dbCache);
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName));
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->dbId = dbId;
_return:
action.data = msg;
CTG_ERR_JRET(ctgPushAction(&action));
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
taosHashRelease(pCatalog->dbCache, dbCache);
_return:
tfree(action.data);
CTG_API_LEAVE(code);
}
int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid) {
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) {
CTG_API_ENTER();
int32_t code = 0;
bool removed = false;
if (NULL == pCatalog || NULL == dbName || NULL == stbName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
if (NULL == pCtg || NULL == dbFName || NULL == stbName) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_ENTER();
if (NULL == pCatalog->dbCache) {
if (NULL == pCtg->dbCache) {
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_ERR_RET(ctgValidateAndRemoveStbMeta(pCatalog, dbName, stbName, suid, &removed));
if (!removed) {
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB};
SCtgRemoveStbMsg *msg = malloc(sizeof(SCtgRemoveStbMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg));
CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
}
ctgInfo("stb removed from cache, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid);
CTG_ERR_JRET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare));
ctgDebug("stb removed from rent, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid);
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
strncpy(msg->stbName, stbName, sizeof(msg->stbName));
msg->dbId = dbId;
msg->suid = suid;
_return:
action.data = msg;
CTG_ERR_JRET(ctgPushAction(&action));
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
_return:
tfree(action.data);
CTG_API_LEAVE(code);
}
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
CTG_API_ENTER();
CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1));
CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1));
}
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
int32_t catalogGetSTableMeta(SCatalog* pCtg, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
CTG_API_ENTER();
CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1));
CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1));
}
int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg) {
STableMetaOutput output = {0};
int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == rspMsg) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
STableMetaOutput *output = calloc(1, sizeof(STableMetaOutput));
if (NULL == output) {
ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
}
int32_t code = 0;
CTG_API_ENTER();
strcpy(output->dbFName, rspMsg->dbFName);
strcpy(output->tbName, rspMsg->tbName);
strcpy(output.dbFName, rspMsg->dbFName);
strcpy(output.tbName, rspMsg->tbName);
output->dbId = rspMsg->dbId;
SET_META_TYPE_TABLE(output.metaType);
SET_META_TYPE_TABLE(output->metaType);
CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output.tbMeta));
CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output->tbMeta));
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
msg->pCtg = pCtg;
msg->output = output;
action.data = msg;
CTG_ERR_JRET(ctgPushAction(&action));
CTG_API_LEAVE(code);
_return:
tfree(output.tbMeta);
tfree(output->tbMeta);
tfree(output);
tfree(msg);
CTG_API_LEAVE(code);
}
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
CTG_API_ENTER();
CTG_API_LEAVE(ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable));
if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTransporter, pMgmtEps, pTableName, isSTable, NULL));
}
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
CTG_API_ENTER();
CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable));
CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable));
}
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgroupList) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgList) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
STableMeta *tbMeta = NULL;
int32_t code = 0;
......@@ -1799,29 +2053,38 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
SCtgDBCache* dbCache = NULL;
SArray *vgList = NULL;
*pVgroupList = NULL;
*pVgList = NULL;
CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, false, &tbMeta, -1));
CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, false, &tbMeta, -1));
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbCache));
// TODO REMOEV THIS ....
SHashObj *vgHash = NULL;
SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo));
if (dbCache) {
vgHash = dbCache->vgInfo->vgHash;
} else {
vgHash = vgInfo->vgHash;
}
/* TODO REMOEV THIS ....
if (0 == tbMeta->vgId) {
SVgroupInfo vgroup = {0};
catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroup);
catalogGetTableHashVgroup(pCtg, pRpc, pMgmtEps, pTableName, &vgroup);
tbMeta->vgId = vgroup.vgId;
}
// TODO REMOVE THIS ....
// TODO REMOVE THIS ....*/
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbCache->vgInfo, pVgroupList));
CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList));
} else {
int32_t vgId = tbMeta->vgId;
if (NULL == taosHashGetClone(dbCache->vgInfo->vgHash, &vgId, sizeof(vgId), &vgroupInfo)) {
if (NULL == taosHashGetClone(vgHash, &vgId, sizeof(vgId), &vgroupInfo)) {
ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
......@@ -1837,16 +2100,22 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
*pVgroupList = vgList;
*pVgList = vgList;
vgList = NULL;
}
_return:
tfree(tbMeta);
if (dbCache) {
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
tfree(tbMeta);
if (vgInfo) {
taosHashCleanup(vgInfo->vgHash);
tfree(vgInfo);
}
if (vgList) {
......@@ -1858,37 +2127,42 @@ _return:
}
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
SCtgDBCache* dbCache = NULL;
int32_t code = 0;
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
CTG_API_ENTER();
SCtgDBCache* dbCache = NULL;
int32_t code = 0;
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbCache));
SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTransporter, pMgmtEps, db, false, &dbCache, &vgInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbCache->vgInfo, pTableName, pVgroup));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup));
_return:
if (dbCache) {
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
if (vgInfo) {
taosHashCleanup(vgInfo->vgHash);
tfree(vgInfo);
}
CTG_API_LEAVE(code);
}
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0;
pRsp->pTableMeta = NULL;
......@@ -1909,7 +2183,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const S
SName *name = taosArrayGet(pReq->pTableName, i);
STableMeta *pTableMeta = NULL;
CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, name, false, &pTableMeta, -1));
CTG_ERR_JRET(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, name, false, &pTableMeta, -1));
if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
ctgError("taosArrayPush failed, idx:%d", i);
......@@ -1937,40 +2211,42 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeList) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t catalogGetQnodeList(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeList) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
//TODO
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num) {
if (NULL == pCatalog || NULL == stables || NULL == num) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableMetaVersion **stables, uint32_t *num) {
CTG_API_ENTER();
CTG_API_LEAVE(ctgMetaRentGet(&pCatalog->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion)));
}
int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num) {
if (NULL == pCatalog || NULL == dbs || NULL == num) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
if (NULL == pCtg || NULL == stables || NULL == num) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion)));
}
int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == dbs || NULL == num) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgMetaRentGet(&pCatalog->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
}
void catalogDestroy(void) {
qInfo("start to destroy catalog");
if (NULL == ctgMgmt.pCluster || atomic_load_8(&ctgMgmt.exit)) {
return;
}
......@@ -1979,13 +2255,13 @@ void catalogDestroy(void) {
CTG_LOCK(CTG_WRITE, &ctgMgmt.lock);
SCatalog *pCatalog = NULL;
SCatalog *pCtg = NULL;
void *pIter = taosHashIterate(ctgMgmt.pCluster, NULL);
while (pIter) {
pCatalog = *(SCatalog **)pIter;
pCtg = *(SCatalog **)pIter;
if (pCatalog) {
catalogFreeHandle(pCatalog);
if (pCtg) {
catalogFreeHandle(pCtg);
}
pIter = taosHashIterate(ctgMgmt.pCluster, pIter);
......
......@@ -38,7 +38,6 @@ namespace {
extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta,
int32_t *exist);
extern "C" int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output);
extern "C" int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type);
void ctgTestSetPrepareTableMeta();
......@@ -176,11 +175,11 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
strcpy(s->name, "tag1s");
}
void ctgTestBuildDBVgroup(SDBVgroupInfo **pdbVgroup) {
void ctgTestBuildDBVgroup(SDBVgInfo **pdbVgroup) {
static int32_t vgVersion = ctgTestVgVersion + 1;
int32_t vgNum = 0;
SVgroupInfo vgInfo = {0};
SDBVgroupInfo *dbVgroup = (SDBVgroupInfo *)calloc(1, sizeof(SDBVgroupInfo));
SDBVgInfo *dbVgroup = (SDBVgInfo *)calloc(1, sizeof(SDBVgInfo));
dbVgroup->vgVersion = vgVersion++;
......@@ -612,7 +611,7 @@ void *ctgTestGetDbVgroupThread(void *param) {
int32_t n = 0;
while (!ctgTestStop) {
code = catalogGetDBVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList);
code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList);
if (code) {
assert(0);
}
......@@ -635,12 +634,12 @@ void *ctgTestGetDbVgroupThread(void *param) {
void *ctgTestSetSameDbVgroupThread(void *param) {
struct SCatalog *pCtg = (struct SCatalog *)param;
int32_t code = 0;
SDBVgroupInfo *dbVgroup = NULL;
SDBVgInfo *dbVgroup = NULL;
int32_t n = 0;
while (!ctgTestStop) {
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup);
code = catalogUpdateDBVgInfo(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup);
if (code) {
assert(0);
}
......@@ -660,12 +659,12 @@ void *ctgTestSetSameDbVgroupThread(void *param) {
void *ctgTestSetDiffDbVgroupThread(void *param) {
struct SCatalog *pCtg = (struct SCatalog *)param;
int32_t code = 0;
SDBVgroupInfo *dbVgroup = NULL;
SDBVgInfo *dbVgroup = NULL;
int32_t n = 0;
while (!ctgTestStop) {
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId++, dbVgroup);
code = catalogUpdateDBVgInfo(pCtg, ctgTestDbname, ctgTestDbId++, dbVgroup);
if (code) {
assert(0);
}
......@@ -716,14 +715,22 @@ void *ctgTestGetCtableMetaThread(void *param) {
void *ctgTestSetCtableMetaThread(void *param) {
struct SCatalog *pCtg = (struct SCatalog *)param;
int32_t code = 0;
SDBVgroupInfo dbVgroup = {0};
SDBVgInfo dbVgroup = {0};
int32_t n = 0;
STableMetaOutput output = {0};
ctgTestBuildCTableMetaOutput(&output);
SCtgMetaAction action = {0};
action.act = CTG_ACT_UPDATE_TBL;
while (!ctgTestStop) {
code = ctgUpdateTableMetaCache(pCtg, &output);
SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg));
msg->pCtg = pCtg;
msg->output = output;
action.data = msg;
code = ctgActUpdateTbl(&action);
if (code) {
assert(0);
}
......@@ -984,7 +991,7 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
tableMeta = NULL;
code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0);
code = catalogRefreshGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(tableMeta->vgId, 9);
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
......@@ -1174,7 +1181,7 @@ TEST(tableDistVgroup, normalTable) {
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
......@@ -1206,7 +1213,7 @@ TEST(tableDistVgroup, childTableCase) {
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestCTablename);
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
......@@ -1237,7 +1244,7 @@ TEST(tableDistVgroup, superTableCase) {
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestSTablename);
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 10);
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
......@@ -1258,7 +1265,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo *dbVgroup = NULL;
SDBVgInfo *dbVgroup = NULL;
SArray *vgList = NULL;
ctgTestInitLogFile();
......@@ -1279,7 +1286,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
code = catalogGetDBVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList);
code = catalogGetDBVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum);
......@@ -1288,7 +1295,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.epset.numOfEps, 3);
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
......@@ -1297,7 +1304,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
taosArrayDestroy(vgList);
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup);
code = catalogUpdateDBVgInfo(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup);
ASSERT_EQ(code, 0);
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
......@@ -1305,7 +1312,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
ASSERT_EQ(vgInfo.vgId, 7);
ASSERT_EQ(vgInfo.epset.numOfEps, 2);
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
......@@ -1321,7 +1328,7 @@ TEST(multiThread, getSetRmSameDbVgroup) {
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SDBVgInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestStop = false;
......@@ -1372,7 +1379,7 @@ TEST(multiThread, getSetRmDiffDbVgroup) {
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SDBVgInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestStop = false;
......@@ -1425,7 +1432,7 @@ TEST(multiThread, ctableMeta) {
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SDBVgInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestStop = false;
......@@ -1477,7 +1484,7 @@ TEST(rentTest, allRent) {
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SDBVgInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestStop = false;
SDbVgVersion *dbs = NULL;
......
......@@ -3647,7 +3647,7 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf)
//TODO remove it
int32_t setTableVgroupList(SParseContext *pCtx, SName* name, SVgroupsInfo **pVgList) {
SArray* vgroupList = NULL;
int32_t code = catalogGetTableDistVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList);
int32_t code = catalogGetTableDistVgInfo(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -50,7 +50,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
int32_t code = catalogGetDBVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbFname, false, &array);
int32_t code = catalogGetDBVgInfo(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbFname, false, &array);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
......
......@@ -113,9 +113,9 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pOut->dbVgroup = calloc(1, sizeof(SDBVgroupInfo));
pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo));
if (NULL == pOut->dbVgroup) {
qError("calloc %d failed", (int32_t)sizeof(SDBVgroupInfo));
qError("calloc %d failed", (int32_t)sizeof(SDBVgInfo));
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册