From 0adbbe080ce644469a4c2a5c3c20b27793469cd9 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 6 Jan 2022 15:30:32 +0800 Subject: [PATCH] feature/qnode --- include/common/tmsg.h | 1 + include/libs/catalog/catalog.h | 18 + include/libs/qcom/query.h | 1 + include/util/thash.h | 3 + include/util/tlog.h | 1 - source/dnode/mnode/impl/src/mndDb.c | 1 + source/libs/catalog/inc/catalogInt.h | 60 ++- source/libs/catalog/src/catalog.c | 515 ++++++++++++++++++---- source/libs/catalog/test/catalogTests.cpp | 122 ++++- source/libs/qcom/src/querymsg.c | 6 +- source/util/src/thash.c | 19 +- source/util/src/tlog.c | 1 - 12 files changed, 626 insertions(+), 122 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index affab8903c..6f6497fe0a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -796,6 +796,7 @@ typedef struct { typedef struct { char db[TSDB_DB_FNAME_LEN]; + int64_t uid; int32_t vgVersion; int32_t vgNum; int8_t hashMethod; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index e39236ea76..a7842f9ef6 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -48,8 +48,21 @@ typedef struct SMetaData { typedef struct SCatalogCfg { uint32_t maxTblCacheNum; uint32_t maxDBCacheNum; + uint32_t dbRentSec; + uint32_t stableRentSec; } SCatalogCfg; +typedef struct SSTableMetaVersion { + uint64_t suid; + int16_t sversion; + int16_t tversion; +} SSTableMetaVersion; + +typedef struct SDbVgVersion { + int64_t dbId; + int32_t vgVersion; +} SDbVgVersion; + int32_t catalogInit(SCatalogCfg *cfg); @@ -61,6 +74,8 @@ int32_t catalogInit(SCatalogCfg *cfg); */ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle); +void catalogFreeHandle(struct SCatalog* pCatalog); + int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); /** @@ -161,6 +176,9 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList); +int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num); + +int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num); /** diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 4bfb774435..637035698b 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -76,6 +76,7 @@ typedef struct STableMeta { typedef struct SDBVgroupInfo { SRWLatch lock; + int64_t dbId; int32_t vgVersion; int8_t hashMethod; SHashObj *vgInfo; //key:vgId, value:SVgroupInfo diff --git a/include/util/thash.h b/include/util/thash.h index a736fc26af..e46e813a0e 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -124,6 +124,9 @@ int32_t taosHashGetSize(const SHashObj *pHashObj); */ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size); +int32_t taosHashPutExt(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded); + + /** * return the payload data with the specified key * diff --git a/include/util/tlog.h b/include/util/tlog.h index a367243a46..26a5417320 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -44,7 +44,6 @@ extern int32_t tsdbDebugFlag; extern int32_t tqDebugFlag; extern int32_t cqDebugFlag; extern int32_t debugFlag; -extern int32_t ctgDebugFlag; #define DEBUG_FATAL 1U #define DEBUG_ERROR DEBUG_FATAL diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 410368f130..251cbb9217 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -917,6 +917,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) { } memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN); + pRsp->uid = htobe64(pDb->uid); pRsp->vgVersion = htonl(pDb->vgVersion); pRsp->vgNum = htonl(vindex); pRsp->hashMethod = pDb->hashMethod; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index c4f2b54cf8..00a50e90eb 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -22,12 +22,16 @@ extern "C" { #include "catalog.h" #include "common.h" -#include "tlog.h" +#include "query.h" #define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6 #define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100 #define CTG_DEFAULT_CACHE_DB_NUMBER 20 #define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000 +#define CTG_DEFAULT_RENT_SECOND 10 +#define CTG_DEFAULT_RENT_SLOT_SIZE 10 + +#define CTG_RENT_SLOT_SECOND 2 #define CTG_DEFAULT_INVALID_VERSION (-1) @@ -36,6 +40,11 @@ enum { CTG_WRITE, }; +enum { + CTG_RENT_DB = 1, + CTG_RENT_STABLE, +}; + typedef struct SVgroupListCache { int32_t vgroupVersion; SHashObj *cache; // key:vgId, value:SVgroupInfo @@ -51,14 +60,29 @@ typedef struct STableMetaCache { SHashObj *stableCache; //key:suid, value:STableMeta* } STableMetaCache; +typedef struct SRentSlotInfo { + SRWLatch lock; + bool needSort; + SArray *meta; +} SRentSlotInfo; + +typedef struct SMetaRentMgmt { + int8_t type; + uint16_t slotNum; + uint16_t slotRIdx; + int64_t lastReadMsec; + SRentSlotInfo *slots; +} SMetaRentMgmt; + typedef struct SCatalog { SDBVgroupCache dbCache; STableMetaCache tableCache; + SMetaRentMgmt dbRent; + SMetaRentMgmt stableRent; } SCatalog; typedef struct SCatalogMgmt { - void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata - SHashObj *pCluster; // items cached for each cluster, the hash key is the cluster-id got from mgmt node + SHashObj *pCluster; //key: clusterId, value: SCatalog* SCatalogCfg cfg; } SCatalogMgmt; @@ -72,17 +96,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); #define CTG_TABLE_NOT_EXIST(code) (code == TSDB_CODE_TDB_INVALID_TABLE_ID) -#define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0) -#define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0) -#define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0) -#define ctgInfo(...) do { if (ctgDebugFlag & DEBUG_INFO) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) -#define ctgDebug(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) -#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) -#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCatalog, __VA_ARGS__) +#define ctgError(param, ...) qError("CTG:%p " param, pCatalog, __VA_ARGS__) +#define ctgWarn(param, ...) qWarn("CTG:%p " param, pCatalog, __VA_ARGS__) +#define ctgInfo(param, ...) qInfo("CTG:%p " param, pCatalog, __VA_ARGS__) +#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCatalog, __VA_ARGS__) +#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCatalog, __VA_ARGS__) #define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) -#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0) #define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 @@ -90,15 +112,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); #define CTG_LOCK(type, _lock) do { \ if (CTG_READ == (type)) { \ assert(atomic_load_32((_lock)) >= 0); \ - ctgDebug("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + qDebug("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRLockLatch(_lock); \ - ctgDebug("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + qDebug("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ assert(atomic_load_32((_lock)) > 0); \ } else { \ assert(atomic_load_32((_lock)) >= 0); \ - ctgDebug("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + qDebug("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWLockLatch(_lock); \ - ctgDebug("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + qDebug("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ } \ } while (0) @@ -106,15 +128,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); #define CTG_UNLOCK(type, _lock) do { \ if (CTG_READ == (type)) { \ assert(atomic_load_32((_lock)) > 0); \ - ctgDebug("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + qDebug("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRUnLockLatch(_lock); \ - ctgDebug("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + qDebug("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ assert(atomic_load_32((_lock)) >= 0); \ } else { \ assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ - ctgDebug("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + qDebug("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWUnLockLatch(_lock); \ - ctgDebug("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + qDebug("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ assert(atomic_load_32((_lock)) >= 0); \ } \ } while (0) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index f7139f21b8..165a091689 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -23,7 +23,7 @@ SCatalogMgmt ctgMgmt = {0}; int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) { if (NULL == pCatalog->dbCache.cache) { *inCache = false; - ctgWarn("no db cache"); + ctgWarn("empty db cache, dbName:%s", dbName); return TSDB_CODE_SUCCESS; } @@ -34,7 +34,7 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S if (NULL == info) { *inCache = false; - ctgWarn("no db cache, dbName:%s", dbName); + ctgWarn("not in db vgroup cache, dbName:%s", dbName); return TSDB_CODE_SUCCESS; } @@ -52,6 +52,8 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S *dbInfo = info; *inCache = true; + + ctgDebug("Got db vgroup from cache, dbName:%s", dbName); return TSDB_CODE_SUCCESS; } @@ -63,7 +65,13 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; - CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen)); + ctgDebug("try to get db vgroup from mnode, db:%s", input->db); + + int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen); + if (code) { + ctgError("Build use db msg failed, code:%x, db:%s", code, input->db); + CTG_ERR_RET(code); + } SRpcMsg rpcMsg = { .msgType = TDMT_MND_USE_DB, @@ -75,11 +83,15 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { - ctgError("error rsp for use db, code:%x", rpcRsp.code); + ctgError("error rsp for use db, code:%x, db:%s", rpcRsp.code, input->db); CTG_ERR_RET(rpcRsp.code); } - CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen)); + code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen); + if (code) { + ctgError("Process use db rsp failed, code:%x, db:%s", code, input->db); + CTG_ERR_RET(code); + } return TSDB_CODE_SUCCESS; } @@ -88,6 +100,7 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) { if (NULL == pCatalog->tableCache.cache) { *exist = 0; + ctgWarn("empty tablemeta cache, tbName:%s", pTableName->tname); return TSDB_CODE_SUCCESS; } @@ -101,6 +114,7 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN if (NULL == *pTableMeta) { *exist = 0; + ctgDebug("tablemeta not in cache, tbName:%s", tbFullName); return TSDB_CODE_SUCCESS; } @@ -109,6 +123,8 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN tbMeta = *pTableMeta; if (tbMeta->tableType != TSDB_CHILD_TABLE) { + ctgDebug("Got tablemeta from cache, tbName:%s", tbFullName); + return TSDB_CODE_SUCCESS; } @@ -117,7 +133,7 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid)); if (NULL == stbMeta || NULL == *stbMeta) { CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock); - qError("no stable:%"PRIx64 " meta in cache", tbMeta->suid); + ctgError("stable not in stableCache, suid:%"PRIx64, tbMeta->suid); tfree(*pTableMeta); *exist = 0; return TSDB_CODE_SUCCESS; @@ -126,7 +142,7 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN if ((*stbMeta)->suid != tbMeta->suid) { CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock); tfree(*pTableMeta); - ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid); + ctgError("stable suid in stableCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -134,19 +150,22 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN *pTableMeta = realloc(*pTableMeta, metaSize); if (NULL == *pTableMeta) { CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock); - ctgError("calloc size[%d] failed", metaSize); + ctgError("realloc size[%d] failed", metaSize); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta)); CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock); + + ctgDebug("Got tablemeta from cache, tbName:%s", tbFullName); return TSDB_CODE_SUCCESS; } int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableName, int32_t *tbType) { if (NULL == pCatalog->tableCache.cache) { + ctgWarn("empty tablemeta cache, tbName:%s", pTableName->tname); return TSDB_CODE_SUCCESS; } @@ -159,10 +178,14 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)&pTableMeta, &sz); if (NULL == pTableMeta) { + ctgWarn("tablemeta not in cache, tbName:%s", tbFullName); + return TSDB_CODE_SUCCESS; } *tbType = pTableMeta->tableType; + + ctgDebug("Got tabletype from cache, tbName:%s, type:%d", tbFullName, *tbType); return TSDB_CODE_SUCCESS; } @@ -184,7 +207,13 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, cons SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; - CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen)); + ctgDebug("try to get table meta from mnode, tbName:%s", tbFullName); + + int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen); + if (code) { + ctgError("Build mnode stablemeta msg failed, code:%x", code); + CTG_ERR_RET(code); + } SRpcMsg rpcMsg = { .msgType = TDMT_MND_STB_META, @@ -199,15 +228,21 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, cons if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { output->metaNum = 0; - ctgDebug("tbmeta:%s not exist in mnode", tbFullName); + ctgDebug("stablemeta not exist in mnode, tbName:%s", tbFullName); return TSDB_CODE_SUCCESS; } - ctgError("error rsp for table meta, code:%x", rpcRsp.code); + ctgError("error rsp for stablemeta from mnode, code:%x, tbName:%s", rpcRsp.code, tbFullName); CTG_ERR_RET(rpcRsp.code); } - CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen)); + code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen); + if (code) { + ctgError("Process mnode stablemeta rsp failed, code:%x, tbName:%s", code, tbFullName); + CTG_ERR_RET(code); + } + + ctgDebug("Got table meta from mnode, tbName:%s", tbFullName); return TSDB_CODE_SUCCESS; } @@ -228,12 +263,18 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE char dbFullName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFullName); + ctgDebug("try to get table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname); + SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)pTableName->tname}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; - CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen)); + int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen); + if (code) { + ctgError("Build vnode tablemeta msg failed, code:%x, tbName:%s", code, pTableName->tname); + CTG_ERR_RET(code); + } SRpcMsg rpcMsg = { .msgType = TDMT_VND_TABLE_META, @@ -251,15 +292,21 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { output->metaNum = 0; - ctgDebug("tbmeta:%s not exist in vnode", pTableName->tname); + ctgDebug("tablemeta not exist in vnode, tbName:%s", pTableName->tname); return TSDB_CODE_SUCCESS; } - ctgError("error rsp for table meta, code:%x", rpcRsp.code); + ctgError("error rsp for table meta from vnode, code:%x, tbName:%s", rpcRsp.code, pTableName->tname); CTG_ERR_RET(rpcRsp.code); } - CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen)); + code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen); + if (code) { + ctgError("Process vnode tablemeta rsp failed, code:%x, tbName:%s", code, pTableName->tname); + CTG_ERR_RET(code); + } + + ctgDebug("Got table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname); return TSDB_CODE_SUCCESS; } @@ -280,10 +327,11 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet * SVgroupInfo *vgInfo = NULL; SArray *vgList = NULL; int32_t code = 0; + int32_t vgNum = taosHashGetSize(dbInfo->vgInfo); - vgList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo)); + vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo)); if (NULL == vgList) { - ctgError("taosArrayInit failed"); + ctgError("taosArrayInit failed, num:%d", vgNum); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -292,7 +340,7 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet * vgInfo = pIter; if (NULL == taosArrayPush(vgList, vgInfo)) { - ctgError("taosArrayPush failed"); + ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } @@ -303,6 +351,8 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet * *vgroupList = vgList; vgList = NULL; + ctgDebug("Got vg list from DB, vgNum:%d", vgNum); + return TSDB_CODE_SUCCESS; _return: @@ -314,7 +364,7 @@ _return: CTG_RET(code); } -int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) { +int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) { int32_t code = 0; int32_t vgNum = taosHashGetSize(dbInfo->vgInfo); @@ -322,7 +372,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName tNameGetFullDbName(pTableName, db); if (vgNum <= 0) { - ctgError("db[%s] vgroup cache invalid, vgroup number:%d", db, vgNum); + ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum); CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED); } @@ -348,7 +398,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName } if (NULL == vgInfo) { - ctgError("no hash range found for hashvalue[%u]", hashValue); + ctgError("no hash range found for hashvalue[%u], db:%s", hashValue, db); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -359,57 +409,205 @@ _return: CTG_RET(TSDB_CODE_SUCCESS); } -int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + +int32_t ctgSTableVersionCompare(const void* key1, const void* key2) { + if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) { + return -1; + } else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) { + return 1; + } else { + return 0; + } +} + +int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { + if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) { + return -1; + } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) { + return 1; + } else { + return 0; } +} + + +int32_t ctgMetaRentInit(SMetaRentMgmt *mgmt, uint32_t rentSec, int8_t type) { + mgmt->slotRIdx = 0; + mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND; + mgmt->type = type; + + size_t msgSize = sizeof(SRentSlotInfo) * mgmt->slotNum; - int32_t exist = 0; + mgmt->slots = calloc(1, msgSize); + if (NULL == mgmt->slots) { + qError("calloc %d failed", (int32_t)msgSize); + return TSDB_CODE_CTG_MEM_ERROR; + } - if (!forceUpdate) { - CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); + qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum); + + return TSDB_CODE_SUCCESS; +} - if (exist && CTG_TBTYPE_MATCH(isSTable, (*pTableMeta)->tableType)) { - return TSDB_CODE_SUCCESS; + +int32_t ctgMetaRentAdd(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size) { + int16_t widx = id % mgmt->slotNum; + + SRentSlotInfo *slot = &mgmt->slots[widx]; + int32_t code = 0; + + CTG_LOCK(CTG_WRITE, &slot->lock); + if (NULL == slot->meta) { + slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size); + if (NULL == slot->meta) { + qError("taosArrayInit %d failed, id:%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - } else if (CTG_IS_UNKNOWN_STABLE(isSTable)) { - int32_t tbType = 0; - - CTG_ERR_RET(ctgGetTableTypeFromCache(pCatalog, pTableName, &tbType)); + } - CTG_SET_STABLE(isSTable, tbType); + if (NULL == taosArrayPush(slot->meta, meta)) { + qError("taosArrayPush meta to rent failed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - CTG_ERR_RET(ctgRenewTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, isSTable)); + slot->needSort = true; - CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); + qDebug("add meta to rent, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); - if (0 == exist) { - ctgError("get table meta from cache failed, but fetch succeed"); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); +_return: + + CTG_UNLOCK(CTG_WRITE, &slot->lock); + CTG_RET(code); +} + +int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) { + int16_t widx = id % mgmt->slotNum; + + SRentSlotInfo *slot = &mgmt->slots[widx]; + int32_t code = 0; + + CTG_LOCK(CTG_WRITE, &slot->lock); + if (NULL == slot->meta) { + qError("meta in slot is empty, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + if (slot->needSort) { + taosArraySort(slot->meta, compare); + slot->needSort = false; + qDebug("slot meta sorted, slot idx:%d, type:%d", widx, mgmt->type); + } + + void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ); + if (NULL == orig) { + qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + memcpy(orig, meta, size); + + qDebug("meta in rent updated, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + +_return: + + CTG_UNLOCK(CTG_WRITE, &slot->lock); + + if (code) { + qWarn("meta in rent update failed, will try to add it, code:%x, id:%"PRIx64", slot idx:%d, type:%d", code, id, widx, mgmt->type); + CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size)); + } + + CTG_RET(code); +} + +int32_t ctgMetaRentGetImpl(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) { + int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1); + if (ridx >= mgmt->slotNum) { + ridx %= mgmt->slotNum; + atomic_store_16(&mgmt->slotRIdx, ridx); } + + SRentSlotInfo *slot = &mgmt->slots[ridx]; + int32_t code = 0; + CTG_LOCK(CTG_READ, &slot->lock); + if (NULL == slot->meta) { + qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type); + *num = 0; + goto _return; + } + + size_t metaNum = taosArrayGetSize(slot->meta); + if (metaNum <= 0) { + qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type); + *num = 0; + goto _return; + } + + size_t msize = metaNum * size; + *res = malloc(msize); + if (NULL == *res) { + qError("malloc %d failed", (int32_t)msize); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + void *meta = taosArrayGet(slot->meta, 0); + + memcpy(*res, meta, msize); + + *num = (uint32_t)metaNum; + + qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type); + +_return: + + CTG_UNLOCK(CTG_READ, &slot->lock); + + CTG_RET(code); +} + +int32_t ctgMetaRentGet(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) { + while (true) { + int64_t msec = taosGetTimestampMs(); + int64_t lsec = atomic_load_64(&mgmt->lastReadMsec); + if ((msec - lsec) < CTG_RENT_SLOT_SECOND * 1000) { + *res = NULL; + *num = 0; + qDebug("too short time period to get expired meta, type:%d", mgmt->type); + return TSDB_CODE_SUCCESS; + } + + if (lsec != atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lsec, msec)) { + continue; + } + + break; + } + + CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size)); + return TSDB_CODE_SUCCESS; } + int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) { int32_t code = 0; if (output->metaNum != 1 && output->metaNum != 2) { - ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum); + ctgError("invalid table meta number in meta rsp, num:%d", output->metaNum); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == output->tbMeta) { - ctgError("no valid table meta got from meta rsp"); + ctgError("no valid table meta got from meta rsp, tbName:%s", output->tbFname); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == pCatalog->tableCache.cache) { SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == cache) { - ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); + ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -421,7 +619,7 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out if (NULL == pCatalog->tableCache.stableCache) { SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); if (NULL == cache) { - ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); + ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -432,12 +630,14 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out if (output->metaNum == 2) { if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { - ctgError("push ctable[%s] to table cache failed", output->ctbFname); + ctgError("taosHashPut ctablemeta to cache failed, ctbName:%s", output->ctbFname); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + ctgDebug("update tablemeta to cache, tbName:%s", output->ctbFname); + if (TSDB_SUPER_TABLE != output->tbMeta->tableType) { - ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE); + ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } } @@ -445,27 +645,40 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags); if (TSDB_SUPER_TABLE == output->tbMeta->tableType) { + bool newAdded = false; + SSTableMetaVersion metaRent = {.suid = output->tbMeta->suid, .sversion = output->tbMeta->sversion, .tversion = output->tbMeta->tversion}; + CTG_LOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); - ctgError("push table[%s] to table cache failed", output->tbFname); + ctgError("taosHashPut tablemeta to cache failed, tbName:%s", output->tbFname); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname)); - if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) { + if (taosHashPutExt(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES, &newAdded) != 0) { CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); - ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid); + ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, output->tbMeta->suid); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); + + ctgDebug("update stable to cache, suid:%"PRIx64, output->tbMeta->suid); + + if (newAdded) { + CTG_ERR_RET(ctgMetaRentAdd(&pCatalog->stableRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion))); + } else { + CTG_ERR_RET(ctgMetaRentUpdate(&pCatalog->stableRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion), ctgSTableVersionCompare)); + } } else { if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { - ctgError("push table[%s] to table cache failed", output->tbFname); + ctgError("taosHashPut tablemeta to cache failed, tbName:%s", output->tbFname); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } + ctgDebug("update tablemeta to cache, tbName:%s", output->tbFname); + CTG_RET(code); } @@ -495,7 +708,7 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache)); if (!inCache) { - ctgWarn("get db vgroup from cache failed, db:%s", dbName); + ctgWarn("can't get db vgroup from cache, will retry, db:%s", dbName); continue; } @@ -511,7 +724,7 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD if (oldInfo) { CTG_LOCK(CTG_WRITE, &oldInfo->lock); if (dbInfo->vgVersion <= oldInfo->vgVersion) { - ctgInfo("dbName:%s vg will not update, vgVersion:%d , current:%d", dbName, dbInfo->vgVersion, oldInfo->vgVersion); + ctgInfo("db vgVersion is not new, db:%s, vgVersion:%d, current:%d", dbName, dbInfo->vgVersion, oldInfo->vgVersion); CTG_UNLOCK(CTG_WRITE, &oldInfo->lock); taosHashRelease(pCatalog->dbCache.cache, oldInfo); @@ -519,7 +732,7 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD } if (oldInfo->vgInfo) { - ctgInfo("dbName:%s vg will be cleanup", dbName); + ctgInfo("cleanup db vgInfo, db:%s", dbName); taosHashCleanup(oldInfo->vgInfo); oldInfo->vgInfo = NULL; } @@ -547,6 +760,8 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con STableMetaOutput *output = &voutput; if (CTG_IS_STABLE(isSTable)) { + ctgDebug("will renew table meta, supposed to be stable, tbName:%s", pTableName->tname); + CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput)); if (0 == moutput.metaNum) { @@ -555,9 +770,13 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con output = &moutput; } } else { + ctgDebug("will renew table meta, not supposed to be stable, tbName:%s, isStable:%d", pTableName->tname, isSTable); + CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); if (voutput.metaNum > 0 && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) { + ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaNum:%d", pTableName->tname, voutput.metaNum); + CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); tfree(voutput.tbMeta); @@ -577,9 +796,46 @@ _return: } +int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + int32_t exist = 0; + + if (!forceUpdate) { + CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); + + if (exist && CTG_TBTYPE_MATCH(isSTable, (*pTableMeta)->tableType)) { + return TSDB_CODE_SUCCESS; + } + } else if (CTG_IS_UNKNOWN_STABLE(isSTable)) { + int32_t tbType = 0; + + CTG_ERR_RET(ctgGetTableTypeFromCache(pCatalog, pTableName, &tbType)); + + CTG_SET_STABLE(isSTable, tbType); + } + + CTG_ERR_RET(ctgRenewTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, isSTable)); + + CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); + + if (0 == exist) { + ctgError("renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s", pTableName->tname); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + return TSDB_CODE_SUCCESS; +} + +void ctgFreeHandle(struct SCatalog* pCatalog) { + //TODO +} + int32_t catalogInit(SCatalogCfg *cfg) { if (ctgMgmt.pCluster) { - ctgError("catalog already init"); + qError("catalog already init"); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -593,16 +849,29 @@ int32_t catalogInit(SCatalogCfg *cfg) { if (ctgMgmt.cfg.maxTblCacheNum == 0) { ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER; } + + if (ctgMgmt.cfg.dbRentSec == 0) { + ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND; + } + + if (ctgMgmt.cfg.stableRentSec == 0) { + ctgMgmt.cfg.stableRentSec = CTG_DEFAULT_RENT_SECOND; + } } else { ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER; + ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND; + ctgMgmt.cfg.stableRentSec = CTG_DEFAULT_RENT_SECOND; } - ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (NULL == ctgMgmt.pCluster) { - CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER); + qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } + qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stableRentSec:%u", ctgMgmt.cfg.maxDBCacheNum, ctgMgmt.cfg.maxTblCacheNum, ctgMgmt.cfg.dbRentSec, ctgMgmt.cfg.stableRentSec); + return TSDB_CODE_SUCCESS; } @@ -612,32 +881,64 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) { } if (NULL == ctgMgmt.pCluster) { - ctgError("cluster cache are not ready"); + qError("cluster cache are not ready, clusterId:%"PRIx64, clusterId); CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY); } - SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId)); + int32_t code = 0; + SCatalog *clusterCtg = NULL; - if (ctg && (*ctg)) { - *catalogHandle = *ctg; - return TSDB_CODE_SUCCESS; - } + while (true) { + SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId)); - SCatalog *clusterCtg = calloc(1, sizeof(SCatalog)); - if (NULL == clusterCtg) { - ctgError("calloc %d failed", (int32_t)sizeof(SCatalog)); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } + if (ctg && (*ctg)) { + *catalogHandle = *ctg; + qDebug("got catalog handle from cache, clusterId:%"PRIx64", CTG:%p", clusterId, *ctg); + return TSDB_CODE_SUCCESS; + } - if (taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES)) { - ctgError("put cluster %"PRIx64" cache to hash failed", clusterId); - tfree(clusterCtg); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + clusterCtg = calloc(1, sizeof(SCatalog)); + if (NULL == clusterCtg) { + qError("calloc %d failed", (int32_t)sizeof(SCatalog)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, ctgMgmt.cfg.dbRentSec, CTG_RENT_DB)); + CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stableRent, ctgMgmt.cfg.stableRentSec, CTG_RENT_STABLE)); + + code = taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES); + if (code) { + if (HASH_NODE_EXIST(code)) { + ctgFreeHandle(clusterCtg); + continue; + } + + qError("taosHashPut CTG to cache failed, clusterId:%"PRIx64, clusterId); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + qDebug("add CTG to cache, clusterId:%"PRIx64", CTG:%p", clusterId, clusterCtg); + + break; } *catalogHandle = clusterCtg; return TSDB_CODE_SUCCESS; + +_return: + + ctgFreeHandle(clusterCtg); + + CTG_RET(code); +} + +void catalogFreeHandle(struct SCatalog* pCatalog) { + if (NULL == pCatalog) { + return; + } + + ctgFreeHandle(pCatalog); } int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) { @@ -647,18 +948,22 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, if (NULL == pCatalog->dbCache.cache) { *version = CTG_DEFAULT_INVALID_VERSION; + ctgInfo("empty db cache, dbName:%s", dbName); return TSDB_CODE_SUCCESS; } SDBVgroupInfo * dbInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); if (NULL == dbInfo) { *version = CTG_DEFAULT_INVALID_VERSION; + ctgInfo("db not in cache, dbName:%s", dbName); return TSDB_CODE_SUCCESS; } *version = dbInfo->vgVersion; taosHashRelease(pCatalog->dbCache.cache, dbInfo); + ctgDebug("Got db vgVersion from cache, dbName:%s, vgVersion:%d", dbName, *version); + return TSDB_CODE_SUCCESS; } @@ -676,7 +981,7 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* vgList = taosArrayInit(taosHashGetSize(db->vgInfo), sizeof(SVgroupInfo)); if (NULL == vgList) { - ctgError("taosArrayInit failed"); + ctgError("taosArrayInit %d failed", taosHashGetSize(db->vgInfo)); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } @@ -685,7 +990,7 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* vgInfo = pIter; if (NULL == taosArrayPush(vgList, vgInfo)) { - ctgError("taosArrayPush failed"); + ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } @@ -720,12 +1025,12 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB } if (NULL == dbInfo->vgInfo || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgInfo) <= 0) { - ctgError("invalid db vg, dbName:%s", dbName); + ctgError("invalid db vgInfo, dbName:%s, vgInfo:%p, vgVersion:%d", dbName, dbInfo->vgInfo, dbInfo->vgVersion); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } if (dbInfo->vgVersion < 0) { - ctgWarn("invalid db vgVersion:%d, dbName:%s", dbInfo->vgVersion, dbName); + ctgWarn("db vgVersion less than 0, dbName:%s, vgVersion:%d", dbName, dbInfo->vgVersion); if (pCatalog->dbCache.cache) { CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); @@ -733,14 +1038,14 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName))); } - ctgWarn("remove db [%s] from cache", dbName); + ctgWarn("db removed from cache, db:%s", dbName); goto _return; } if (NULL == pCatalog->dbCache.cache) { SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == cache) { - ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER); + ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } @@ -751,14 +1056,23 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); } - if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) { - ctgError("push to vgroup hash cache failed"); + bool newAdded = false; + if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) { + ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } + dbInfo->vgInfo = NULL; + + SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion}; + if (newAdded) { + CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion))); + } else { + CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); + } + ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion); - dbInfo->vgInfo = NULL; _return: @@ -771,11 +1085,11 @@ _return: } int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { - return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1); + return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1); } int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { - return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1); + return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1); } int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { @@ -787,7 +1101,7 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, con } int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { - return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable); + return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable); } int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) { @@ -803,7 +1117,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S *pVgroupList = NULL; - CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, &tbMeta)); + CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, false, &tbMeta, -1)); char db[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, db); @@ -814,18 +1128,18 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S } else { int32_t vgId = tbMeta->vgId; if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { - ctgError("vgId[%d] not found in vgroup list", vgId); + ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, pTableName->tname); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } vgList = taosArrayInit(1, sizeof(SVgroupInfo)); if (NULL == vgList) { - ctgError("taosArrayInit failed"); + ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo)); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } if (NULL == taosArrayPush(vgList, &vgroupInfo)) { - ctgError("push vgroupInfo to array failed"); + ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, pTableName->tname); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -859,7 +1173,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbInfo)); - CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pTableName, pVgroup)); + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbInfo, pTableName, pVgroup)); _return: @@ -882,13 +1196,13 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p if (pReq->pTableName) { int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName); if (tbNum <= 0) { - ctgError("empty table name list"); + ctgError("empty table name list, tbNum:%d", tbNum); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES); if (NULL == pRsp->pTableMeta) { - ctgError("taosArrayInit num[%d] failed", tbNum); + ctgError("taosArrayInit %d failed", tbNum); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -896,7 +1210,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p SName *name = taosArrayGet(pReq->pTableName, i); STableMeta *pTableMeta = NULL; - CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, name, &pTableMeta)); + CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pRpc, pMgmtEps, name, false, &pTableMeta, -1)); if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) { ctgError("taosArrayPush failed, idx:%d", i); @@ -929,16 +1243,35 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + //TODO return TSDB_CODE_SUCCESS; } +int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num) { + if (NULL == pCatalog || NULL == stables || NULL == num) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + CTG_RET(ctgMetaRentGet(&pCatalog->stableRent, (void **)stables, num, sizeof(SSTableMetaVersion))); +} + +int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num) { + if (NULL == pCatalog || NULL == dbs || NULL == num) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + CTG_RET(ctgMetaRentGet(&pCatalog->dbRent, (void **)dbs, num, sizeof(SDbVgVersion))); +} + void catalogDestroy(void) { if (ctgMgmt.pCluster) { taosHashCleanup(ctgMgmt.pCluster); //TBD ctgMgmt.pCluster = NULL; } + + qInfo("catalog destroyed"); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 4fc53e5f18..4adc4e95f0 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -46,6 +46,8 @@ void ctgTestSetPrepareSTableMeta(); bool ctgTestStop = false; bool ctgTestEnableSleep = false; bool ctgTestDeadLoop = false; +int32_t ctgTestPrintNum = 200000; + int32_t ctgTestCurrentVgVersion = 0; int32_t ctgTestVgVersion = 1; @@ -101,7 +103,6 @@ void ctgTestInitLogFile() { const char *defaultLogFileNamePrefix = "taoslog"; const int32_t maxLogFileNum = 10; - ctgDebugFlag = 159; tsAsyncLog = 0; char temp[128] = {0}; @@ -216,6 +217,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM ctgTestCurrentVgVersion = ctgTestVgVersion; rspMsg->vgNum = htonl(ctgTestVgNum); rspMsg->hashMethod = 0; + rspMsg->uid = htobe64(3); SVgroupInfo *vg = NULL; uint32_t hashUnit = UINT32_MAX / ctgTestVgNum; @@ -507,7 +509,7 @@ void *ctgTestGetDbVgroupThread(void *param) { if (ctgTestEnableSleep) { usleep(rand()%5); } - if (++n % 50000 == 0) { + if (++n % ctgTestPrintNum == 0) { printf("Get:%d\n", n); } } @@ -531,7 +533,7 @@ void *ctgTestSetDbVgroupThread(void *param) { if (ctgTestEnableSleep) { usleep(rand()%5); } - if (++n % 50000 == 0) { + if (++n % ctgTestPrintNum == 0) { printf("Set:%d\n", n); } } @@ -563,7 +565,7 @@ void *ctgTestGetCtableMetaThread(void *param) { usleep(rand()%5); } - if (++n % 50000 == 0) { + if (++n % ctgTestPrintNum == 0) { printf("Get:%d\n", n); } } @@ -589,7 +591,7 @@ void *ctgTestSetCtableMetaThread(void *param) { if (ctgTestEnableSleep) { usleep(rand()%5); } - if (++n % 50000 == 0) { + if (++n % ctgTestPrintNum == 0) { printf("Set:%d\n", n); } } @@ -627,6 +629,7 @@ TEST(tableMeta, normalTable) { ASSERT_EQ(vgInfo.vgId, 8); ASSERT_EQ(vgInfo.numOfEps, 3); + ctgTestSetPrepareTableMeta(); STableMeta *tableMeta = NULL; @@ -653,6 +656,41 @@ TEST(tableMeta, normalTable) { ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + SDbVgVersion *dbs = NULL; + SSTableMetaVersion *stb = NULL; + uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0; + int32_t i = 0; + while (i < 5) { + ++i; + code = catalogGetExpiredDBs(pCtg, &dbs, &dbNum); + ASSERT_EQ(code, 0); + code = catalogGetExpiredSTables(pCtg, &stb, &stbNum); + ASSERT_EQ(code, 0); + + if (dbNum) { + printf("got expired db,dbId:%"PRId64"\n", dbs->dbId); + free(dbs); + dbs = NULL; + } else { + printf("no expired db\n"); + } + + if (stbNum) { + printf("got expired stb,suid:%"PRId64"\n", stb->suid); + free(stb); + stb = NULL; + } else { + printf("no expired stb\n"); + } + + allDbNum += dbNum; + allStbNum += stbNum; + sleep(2); + } + + ASSERT_EQ(allDbNum, 1); + ASSERT_EQ(allStbNum, 0); + catalogDestroy(); } @@ -714,6 +752,42 @@ TEST(tableMeta, childTableCase) { ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + SDbVgVersion *dbs = NULL; + SSTableMetaVersion *stb = NULL; + uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0; + int32_t i = 0; + while (i < 5) { + ++i; + code = catalogGetExpiredDBs(pCtg, &dbs, &dbNum); + ASSERT_EQ(code, 0); + code = catalogGetExpiredSTables(pCtg, &stb, &stbNum); + ASSERT_EQ(code, 0); + + if (dbNum) { + printf("got expired db,dbId:%"PRId64"\n", dbs->dbId); + free(dbs); + dbs = NULL; + } else { + printf("no expired db\n"); + } + + if (stbNum) { + printf("got expired stb,suid:%"PRId64"\n", stb->suid); + free(stb); + stb = NULL; + } else { + printf("no expired stb\n"); + } + + allDbNum += dbNum; + allStbNum += stbNum; + sleep(2); + } + + ASSERT_EQ(allDbNum, 1); + ASSERT_EQ(allStbNum, 1); + + catalogDestroy(); } @@ -778,6 +852,40 @@ TEST(tableMeta, superTableCase) { ASSERT_EQ(tableMeta->tableInfo.precision, 1); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + SDbVgVersion *dbs = NULL; + SSTableMetaVersion *stb = NULL; + uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0; + int32_t i = 0; + while (i < 5) { + ++i; + code = catalogGetExpiredDBs(pCtg, &dbs, &dbNum); + ASSERT_EQ(code, 0); + code = catalogGetExpiredSTables(pCtg, &stb, &stbNum); + ASSERT_EQ(code, 0); + + if (dbNum) { + printf("got expired db,dbId:%"PRId64"\n", dbs->dbId); + free(dbs); + dbs = NULL; + } else { + printf("no expired db\n"); + } + + if (stbNum) { + printf("got expired stb,suid:%"PRId64"\n", stb->suid); + free(stb); + stb = NULL; + } else { + printf("no expired stb\n"); + } + + allDbNum += dbNum; + allStbNum += stbNum; + sleep(2); + } + + ASSERT_EQ(allDbNum, 1); + ASSERT_EQ(allStbNum, 1); catalogDestroy(); @@ -947,7 +1055,6 @@ TEST(dbVgroup, getSetDbVgroupCase) { catalogDestroy(); } - TEST(multiThread, getSetDbVgroupCase) { struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; @@ -955,6 +1062,7 @@ TEST(multiThread, getSetDbVgroupCase) { SVgroupInfo *pvgInfo = NULL; SDBVgroupInfo dbVgroup = {0}; SArray *vgList = NULL; + ctgTestStop = false; ctgTestInitLogFile(); @@ -998,7 +1106,6 @@ TEST(multiThread, getSetDbVgroupCase) { catalogDestroy(); } - TEST(multiThread, ctableMeta) { struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; @@ -1006,6 +1113,7 @@ TEST(multiThread, ctableMeta) { SVgroupInfo *pvgInfo = NULL; SDBVgroupInfo dbVgroup = {0}; SArray *vgList = NULL; + ctgTestStop = false; ctgTestSetPrepareDbVgroupsAndChildMeta(); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 507650159f..914bf359f5 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -97,6 +97,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { pRsp->vgVersion = ntohl(pRsp->vgVersion); pRsp->vgNum = ntohl(pRsp->vgNum); + pRsp->uid = be64toh(pRsp->uid); if (pRsp->vgNum < 0) { qError("invalid db[%s] vgroup number[%d]", pRsp->db, pRsp->vgNum); @@ -111,6 +112,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { pOut->dbVgroup.vgVersion = pRsp->vgVersion; pOut->dbVgroup.hashMethod = pRsp->hashMethod; + pOut->dbVgroup.dbId = pRsp->uid; pOut->dbVgroup.vgInfo = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (NULL == pOut->dbVgroup.vgInfo) { qError("hash init[%d] failed", pRsp->vgNum); @@ -149,8 +151,8 @@ static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) { pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns); pMetaMsg->sversion = ntohl(pMetaMsg->sversion); pMetaMsg->tversion = ntohl(pMetaMsg->tversion); - pMetaMsg->tuid = htobe64(pMetaMsg->tuid); - pMetaMsg->suid = htobe64(pMetaMsg->suid); + pMetaMsg->tuid = be64toh(pMetaMsg->tuid); + pMetaMsg->suid = be64toh(pMetaMsg->suid); pMetaMsg->vgId = ntohl(pMetaMsg->vgId); if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) { diff --git a/source/util/src/thash.c b/source/util/src/thash.c index f90b157558..7c7548635a 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -214,7 +214,7 @@ static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) { return taosHashGetSize(pHashObj) == 0; } -int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) { +int32_t taosHashPutImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded) { uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal); if (pNewNode == NULL) { @@ -273,6 +273,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da __rd_unlock((void*) &pHashObj->lock, pHashObj->type); atomic_add_fetch_32(&pHashObj->size, 1); + if (newAdded) { + *newAdded = true; + } + return 0; } else { // not support the update operation, return error @@ -289,10 +293,23 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da // enable resize __rd_unlock((void*) &pHashObj->lock, pHashObj->type); + if (newAdded) { + *newAdded = false; + } + return pHashObj->enableUpdate ? 0 : -2; } } +int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) { + return taosHashPutImpl(pHashObj, key, keyLen, data, size, NULL); +} + +int32_t taosHashPutExt(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded) { + return taosHashPutImpl(pHashObj, key, keyLen, data, size, newAdded); +} + + void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { return taosHashGetClone(pHashObj, key, keyLen, NULL); } diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 95f2fe76e6..c284efbcd4 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -95,7 +95,6 @@ int32_t tsdbDebugFlag = 131; int32_t tqDebugFlag = 131; int32_t cqDebugFlag = 131; int32_t fsDebugFlag = 135; -int32_t ctgDebugFlag = 131; int64_t dbgEmptyW = 0; int64_t dbgWN = 0; -- GitLab