未验证 提交 32459c4c 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #10236 from taosdata/feature/qnode

Feature/qnode
...@@ -2006,4 +2006,4 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p ...@@ -2006,4 +2006,4 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
} }
#endif #endif
#endif /*_TD_COMMON_TAOS_MSG_H_*/ #endif /*_TD_COMMON_TAOS_MSG_H_*/
\ No newline at end of file
...@@ -30,7 +30,7 @@ extern "C" { ...@@ -30,7 +30,7 @@ extern "C" {
#include "tmsg.h" #include "tmsg.h"
#include "transport.h" #include "transport.h"
struct SCatalog; typedef struct SCatalog SCatalog;
enum { enum {
CTG_DBG_DB_NUM = 1, CTG_DBG_DB_NUM = 1,
...@@ -64,6 +64,7 @@ typedef struct SCatalogCfg { ...@@ -64,6 +64,7 @@ typedef struct SCatalogCfg {
typedef struct SSTableMetaVersion { typedef struct SSTableMetaVersion {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId;
uint64_t suid; uint64_t suid;
int16_t sversion; int16_t sversion;
int16_t tversion; int16_t tversion;
...@@ -84,7 +85,7 @@ int32_t catalogInit(SCatalogCfg *cfg); ...@@ -84,7 +85,7 @@ int32_t catalogInit(SCatalogCfg *cfg);
* @param catalogHandle (output, NO need to free it) * @param catalogHandle (output, NO need to free it)
* @return error code * @return error code
*/ */
int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle); int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle);
/** /**
* Free a cluster's all catalog info, usually it's not necessary, until the application is closing. * Free a cluster's all catalog info, usually it's not necessary, until the application is closing.
...@@ -92,9 +93,9 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle); ...@@ -92,9 +93,9 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle);
* @param pCatalog (input, NO more usage) * @param pCatalog (input, NO more usage)
* @return error code * @return error code
*/ */
void catalogFreeHandle(struct SCatalog* pCatalog); void catalogFreeHandle(SCatalog* pCatalog);
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); int32_t catalogGetDBVgVersion(SCatalog* pCatalog, const char* dbName, int32_t* version);
/** /**
* Get a DB's all vgroup info. * Get a DB's all vgroup info.
...@@ -106,13 +107,13 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, ...@@ -106,13 +107,13 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code * @return error code
*/ */
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList); int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList);
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgroupInfo* dbInfo); int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo);
int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId); int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId);
int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid); int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid);
/** /**
* Get a table's meta data. * Get a table's meta data.
...@@ -123,7 +124,7 @@ int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, c ...@@ -123,7 +124,7 @@ int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, c
* @param pTableMeta(output, table meta data, NEED to free it by calller) * @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code * @return error code
*/ */
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); int32_t catalogGetTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
/** /**
* Get a super table's meta data. * Get a super table's meta data.
...@@ -134,13 +135,13 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, cons ...@@ -134,13 +135,13 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, cons
* @param pTableMeta(output, table meta data, NEED to free it by calller) * @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code * @return error code
*/ */
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); int32_t catalogGetSTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg); int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg);
/** /**
* Force renew a table's local cached meta data. * Force refresh a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle) * @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object) * @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs) * @param pMgmtEps (input, mnode EPs)
...@@ -148,10 +149,10 @@ int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg ...@@ -148,10 +149,10 @@ int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code * @return error code
*/ */
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable); int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable);
/** /**
* Force renew a table's local cached meta data and get the new one. * Force refresh a table's local cached meta data and get the new one.
* @param pCatalog (input, got with catalogGetHandle) * @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object) * @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs) * @param pMgmtEps (input, mnode EPs)
...@@ -160,7 +161,7 @@ int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg ...@@ -160,7 +161,7 @@ int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code * @return error code
*/ */
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable); int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable);
...@@ -173,7 +174,7 @@ int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg ...@@ -173,7 +174,7 @@ int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code * @return error code
*/ */
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList); int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList);
/** /**
* Get a table's vgroup from its name's hash value. * Get a table's vgroup from its name's hash value.
...@@ -184,7 +185,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pTransporter, ...@@ -184,7 +185,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pTransporter,
* @param vgInfo (output, vgroup info) * @param vgInfo (output, vgroup info)
* @return error code * @return error code
*/ */
int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pName, SVgroupInfo* vgInfo); int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pName, SVgroupInfo* vgInfo);
/** /**
...@@ -196,14 +197,14 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter ...@@ -196,14 +197,14 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter
* @param pRsp (output, response data) * @param pRsp (output, response data)
* @return error code * @return error code
*/ */
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); int32_t catalogGetAllMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList); int32_t catalogGetQnodeList(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList);
int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num); int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num);
int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num); int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num);
/** /**
......
...@@ -80,16 +80,16 @@ typedef struct STableMeta { ...@@ -80,16 +80,16 @@ typedef struct STableMeta {
SSchema schema[]; SSchema schema[];
} STableMeta; } STableMeta;
typedef struct SDBVgroupInfo { typedef struct SDBVgInfo {
int32_t vgVersion; int32_t vgVersion;
int8_t hashMethod; int8_t hashMethod;
SHashObj *vgHash; //key:vgId, value:SVgroupInfo SHashObj *vgHash; //key:vgId, value:SVgroupInfo
} SDBVgroupInfo; } SDBVgInfo;
typedef struct SUseDbOutput { typedef struct SUseDbOutput {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
uint64_t dbId; uint64_t dbId;
SDBVgroupInfo *dbVgroup; SDBVgInfo *dbVgroup;
} SUseDbOutput; } SUseDbOutput;
enum { enum {
......
...@@ -43,7 +43,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog ...@@ -43,7 +43,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
if (rsp->vgVersion < 0) { if (rsp->vgVersion < 0) {
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
} else { } else {
SDBVgroupInfo vgInfo = {0}; SDBVgInfo vgInfo = {0};
vgInfo.vgVersion = rsp->vgVersion; vgInfo.vgVersion = rsp->vgVersion;
vgInfo.hashMethod = rsp->hashMethod; vgInfo.hashMethod = rsp->hashMethod;
vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
...@@ -68,10 +68,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog ...@@ -68,10 +68,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
} }
} }
code = catalogUpdateDBVgroup(pCatalog, rsp->db, rsp->uid, &vgInfo); catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo);
if (code) {
taosHashCleanup(vgInfo.vgHash);
}
} }
if (code) { if (code) {
...@@ -94,13 +91,14 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo ...@@ -94,13 +91,14 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
rsp->numOfColumns = ntohl(rsp->numOfColumns); rsp->numOfColumns = ntohl(rsp->numOfColumns);
rsp->suid = be64toh(rsp->suid); rsp->suid = be64toh(rsp->suid);
rsp->dbId = be64toh(rsp->dbId);
if (rsp->numOfColumns < 0) { if (rsp->numOfColumns < 0) {
schemaNum = 0; schemaNum = 0;
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
catalogRemoveSTableMeta(pCatalog, rsp->dbFName, rsp->stbName, rsp->suid); catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid);
} else { } else {
tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
......
...@@ -382,7 +382,7 @@ static void *dnodeOpenVnodeFunc(void *param) { ...@@ -382,7 +382,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
pMgmt->openVnodes, pMgmt->totalVnodes); pMgmt->openVnodes, pMgmt->totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc); dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId}; SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
SVnode * pImpl = vnodeOpen(pCfg->path, &cfg); SVnode * pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
...@@ -594,6 +594,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -594,6 +594,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
vnodeCfg.pDnode = pDnode; vnodeCfg.pDnode = pDnode;
vnodeCfg.pTfs = pDnode->pTfs; vnodeCfg.pTfs = pDnode->pTfs;
vnodeCfg.dbId = wrapperCfg.dbUid;
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
......
...@@ -1523,4 +1523,4 @@ static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3 ...@@ -1523,4 +1523,4 @@ static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) { static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
\ No newline at end of file
...@@ -42,6 +42,7 @@ typedef struct STqCfg { ...@@ -42,6 +42,7 @@ typedef struct STqCfg {
typedef struct SVnodeCfg { typedef struct SVnodeCfg {
int32_t vgId; int32_t vgId;
uint64_t dbId;
SDnode *pDnode; SDnode *pDnode;
STfs *pTfs; STfs *pTfs;
uint64_t wsize; uint64_t wsize;
......
...@@ -29,6 +29,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { ...@@ -29,6 +29,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
cfg.vgId = pVnodeCfg->vgId; cfg.vgId = pVnodeCfg->vgId;
cfg.pDnode = pVnodeCfg->pDnode; cfg.pDnode = pVnodeCfg->pDnode;
cfg.pTfs = pVnodeCfg->pTfs; cfg.pTfs = pVnodeCfg->pTfs;
cfg.dbId = pVnodeCfg->dbId;
} }
// Validate options // Validate options
......
...@@ -93,6 +93,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -93,6 +93,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
if (pTbCfg->type == META_CHILD_TABLE) { if (pTbCfg->type == META_CHILD_TABLE) {
pStbCfg = metaGetTbInfoByUid(pVnode->pMeta, pTbCfg->ctbCfg.suid); pStbCfg = metaGetTbInfoByUid(pVnode->pMeta, pTbCfg->ctbCfg.suid);
if (pStbCfg == NULL) { if (pStbCfg == NULL) {
code = TSDB_CODE_VND_TB_NOT_EXIST;
goto _exit; goto _exit;
} }
...@@ -116,9 +117,11 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -116,9 +117,11 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
msgLen = sizeof(STableMetaRsp) + sizeof(SSchema) * (nCols + nTagCols); msgLen = sizeof(STableMetaRsp) + sizeof(SSchema) * (nCols + nTagCols);
pTbMetaMsg = (STableMetaRsp *)rpcMallocCont(msgLen); pTbMetaMsg = (STableMetaRsp *)rpcMallocCont(msgLen);
if (pTbMetaMsg == NULL) { if (pTbMetaMsg == NULL) {
code = TSDB_CODE_VND_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
pTbMetaMsg->dbId = htobe64(pVnode->config.dbId);
memcpy(pTbMetaMsg->dbFName, pReq->dbFName, sizeof(pTbMetaMsg->dbFName)); memcpy(pTbMetaMsg->dbFName, pReq->dbFName, sizeof(pTbMetaMsg->dbFName));
strcpy(pTbMetaMsg->tbName, pReq->tbName); strcpy(pTbMetaMsg->tbName, pReq->tbName);
if (pTbCfg->type == META_CHILD_TABLE) { if (pTbCfg->type == META_CHILD_TABLE) {
......
...@@ -27,7 +27,7 @@ extern "C" { ...@@ -27,7 +27,7 @@ extern "C" {
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6 #define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100 #define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_CACHE_DB_NUMBER 20 #define CTG_DEFAULT_CACHE_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 10000 #define CTG_DEFAULT_CACHE_TBLMETA_NUMBER 1000
#define CTG_DEFAULT_RENT_SECOND 10 #define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10 #define CTG_DEFAULT_RENT_SLOT_SIZE 10
...@@ -47,9 +47,19 @@ enum { ...@@ -47,9 +47,19 @@ enum {
CTG_RENT_STABLE, CTG_RENT_STABLE,
}; };
enum {
CTG_ACT_UPDATE_VG = 0,
CTG_ACT_UPDATE_TBL,
CTG_ACT_REMOVE_DB,
CTG_ACT_REMOVE_STB,
CTG_ACT_REMOVE_TBL,
CTG_ACT_MAX
};
typedef struct SCtgDebug { typedef struct SCtgDebug {
bool lockDebug; bool lockDebug;
bool cacheDebug; bool cacheDebug;
bool apiDebug;
uint32_t showCachePeriodSec; uint32_t showCachePeriodSec;
} SCtgDebug; } SCtgDebug;
...@@ -65,7 +75,7 @@ typedef struct SCtgDBCache { ...@@ -65,7 +75,7 @@ typedef struct SCtgDBCache {
SRWLatch vgLock; SRWLatch vgLock;
uint64_t dbId; uint64_t dbId;
int8_t deleted; int8_t deleted;
SDBVgroupInfo *vgInfo; SDBVgInfo *vgInfo;
SCtgTbMetaCache tbCache; SCtgTbMetaCache tbCache;
} SCtgDBCache; } SCtgDBCache;
...@@ -85,7 +95,6 @@ typedef struct SCtgRentMgmt { ...@@ -85,7 +95,6 @@ typedef struct SCtgRentMgmt {
typedef struct SCatalog { typedef struct SCatalog {
uint64_t clusterId; uint64_t clusterId;
SRWLatch dbLock;
SHashObj *dbCache; //key:dbname, value:SCtgDBCache SHashObj *dbCache; //key:dbname, value:SCtgDBCache
SCtgRentMgmt dbRent; SCtgRentMgmt dbRent;
SCtgRentMgmt stbRent; SCtgRentMgmt stbRent;
...@@ -96,7 +105,8 @@ typedef struct SCtgApiStat { ...@@ -96,7 +105,8 @@ typedef struct SCtgApiStat {
} SCtgApiStat; } SCtgApiStat;
typedef struct SCtgRuntimeStat { typedef struct SCtgRuntimeStat {
uint64_t qNum;
uint64_t qDoneNum;
} SCtgRuntimeStat; } SCtgRuntimeStat;
typedef struct SCtgCacheStat { typedef struct SCtgCacheStat {
...@@ -109,15 +119,70 @@ typedef struct SCatalogStat { ...@@ -109,15 +119,70 @@ typedef struct SCatalogStat {
SCtgCacheStat cache; SCtgCacheStat cache;
} SCatalogStat; } SCatalogStat;
typedef struct SCtgUpdateVgMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t dbId;
SDBVgInfo* dbInfo;
} SCtgUpdateVgMsg;
typedef struct SCtgUpdateTblMsg {
SCatalog* pCtg;
STableMetaOutput* output;
} SCtgUpdateTblMsg;
typedef struct SCtgRemoveDBMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t dbId;
} SCtgRemoveDBMsg;
typedef struct SCtgRemoveStbMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId;
uint64_t suid;
} SCtgRemoveStbMsg;
typedef struct SCtgMetaAction {
int32_t act;
void *data;
} SCtgMetaAction;
typedef struct SCtgQNode {
SCtgMetaAction action;
struct SCtgQNode *next;
} SCtgQNode;
typedef struct SCatalogMgmt { typedef struct SCatalogMgmt {
bool exit; bool exit;
SRWLatch lock; SRWLatch lock;
SRWLatch qlock;
SCtgQNode *head;
SCtgQNode *tail;
tsem_t sem;
uint64_t qRemainNum;
pthread_t updateThread;
SHashObj *pCluster; //key: clusterId, value: SCatalog* SHashObj *pCluster; //key: clusterId, value: SCatalog*
SCatalogStat stat; SCatalogStat stat;
SCatalogCfg cfg; SCatalogCfg cfg;
} SCatalogMgmt; } SCatalogMgmt;
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
typedef int32_t (*ctgActFunc)(SCtgMetaAction *);
typedef struct SCtgAction {
int32_t actId;
char name[32];
ctgActFunc func;
} SCtgAction;
#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.qRemainNum, 1)
#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.qRemainNum, 1)
#define CTG_STAT_ADD(n) atomic_add_fetch_64(&(n), 1)
#define CTG_STAT_SUB(n) atomic_sub_fetch_64(&(n), 1)
#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE) #define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE)
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE) #define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
...@@ -130,20 +195,26 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); ...@@ -130,20 +195,26 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_SET_STABLE(isSTable, tbType) do { (isSTable) = ((tbType) == TSDB_SUPER_TABLE) ? 1 : ((tbType) > TSDB_SUPER_TABLE ? 0 : -1); } while (0) #define CTG_SET_STABLE(isSTable, tbType) do { (isSTable) = ((tbType) == TSDB_SUPER_TABLE) ? 1 : ((tbType) > TSDB_SUPER_TABLE ? 0 : -1); } while (0)
#define CTG_TBTYPE_MATCH(isSTable, tbType) (CTG_IS_UNKNOWN_STABLE(isSTable) || (CTG_IS_STABLE(isSTable) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STABLE(isSTable) && (tbType) != TSDB_SUPER_TABLE)) #define CTG_TBTYPE_MATCH(isSTable, tbType) (CTG_IS_UNKNOWN_STABLE(isSTable) || (CTG_IS_STABLE(isSTable) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STABLE(isSTable) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST) #define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST)
#define CTG_DB_NOT_EXIST(code) (code == TSDB_CODE_MND_DB_NOT_EXIST)
#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCatalog, __VA_ARGS__) #define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgError(param, ...) qError("CTG:%p " param, pCatalog, __VA_ARGS__) #define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgWarn(param, ...) qWarn("CTG:%p " param, pCatalog, __VA_ARGS__) #define ctgWarn(param, ...) qWarn("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgInfo(param, ...) qInfo("CTG:%p " param, pCatalog, __VA_ARGS__) #define ctgInfo(param, ...) qInfo("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCatalog, __VA_ARGS__) #define ctgDebug(param, ...) qDebug("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCatalog, __VA_ARGS__) #define ctgTrace(param, ...) qTrace("CTG:%p " param, pCtg, __VA_ARGS__)
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0) #define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
#define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { qDebug(__VA_ARGS__); } } while (0) #define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { qDebug(__VA_ARGS__); } } while (0)
#define CTG_API_DEBUG(...) do { if (gCTGDebug.apiDebug) { qDebug(__VA_ARGS__); } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define CTG_IS_LOCKED(_lock) atomic_load_32((_lock))
#define CTG_LOCK(type, _lock) do { \ #define CTG_LOCK(type, _lock) do { \
if (CTG_READ == (type)) { \ if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
...@@ -181,8 +252,8 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); ...@@ -181,8 +252,8 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define CTG_API_ENTER() do { CTG_LOCK(CTG_READ, &ctgMgmt.lock); if (atomic_load_8(&ctgMgmt.exit)) { CTG_RET(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0) #define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); CTG_RET(__code); } while (0)
#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_RET(__code); } while (0) #define CTG_API_ENTER() do { CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &gCtgMgmt.lock); if (atomic_load_8(&gCtgMgmt.exit)) { CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
......
此差异已折叠。
...@@ -3647,7 +3647,7 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf) ...@@ -3647,7 +3647,7 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf)
//TODO remove it //TODO remove it
int32_t setTableVgroupList(SParseContext *pCtx, SName* name, SVgroupsInfo **pVgList) { int32_t setTableVgroupList(SParseContext *pCtx, SName* name, SVgroupsInfo **pVgList) {
SArray* vgroupList = NULL; SArray* vgroupList = NULL;
int32_t code = catalogGetTableDistVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList); int32_t code = catalogGetTableDistVgInfo(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -50,7 +50,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out ...@@ -50,7 +50,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out
char dbFname[TSDB_DB_FNAME_LEN] = {0}; char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname); tNameGetFullDbName(&name, dbFname);
int32_t code = catalogGetDBVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbFname, false, &array); int32_t code = catalogGetDBVgInfo(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbFname, false, &array);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
return code; return code;
......
...@@ -113,9 +113,9 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { ...@@ -113,9 +113,9 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
} }
pOut->dbVgroup = calloc(1, sizeof(SDBVgroupInfo)); pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo));
if (NULL == pOut->dbVgroup) { if (NULL == pOut->dbVgroup) {
qError("calloc %d failed", (int32_t)sizeof(SDBVgroupInfo)); qError("calloc %d failed", (int32_t)sizeof(SDBVgInfo));
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册