未验证 提交 6f3f8a08 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #9662 from taosdata/feature/qnode

Feature/qnode
......@@ -871,6 +871,7 @@ typedef struct {
typedef struct {
char db[TSDB_DB_FNAME_LEN];
int64_t uid;
int32_t vgVersion;
int32_t vgNum;
int8_t hashMethod;
......
......@@ -48,8 +48,22 @@ typedef struct SMetaData {
typedef struct SCatalogCfg {
uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum;
uint32_t dbRentSec;
uint32_t stableRentSec;
} SCatalogCfg;
typedef struct SSTableMetaVersion {
uint64_t suid;
int16_t sversion;
int16_t tversion;
} SSTableMetaVersion;
typedef struct SDbVgVersion {
int64_t dbId;
int32_t vgVersion;
} SDbVgVersion;
int32_t catalogInit(SCatalogCfg *cfg);
/**
......@@ -60,6 +74,14 @@ int32_t catalogInit(SCatalogCfg *cfg);
*/
int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle);
/**
* Free a cluster's all catalog info, usually it's not necessary, until the application is closing.
* no current or future usage should be guaranteed by application
* @param pCatalog (input, NO more usage)
* @return error code
*/
void catalogFreeHandle(struct SCatalog* pCatalog);
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
/**
......@@ -87,15 +109,28 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
*/
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
/**
* Get a super table's meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
/**
* Force renew a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name)
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName);
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable);
/**
* Force renew a table's local cached meta data and get the new one.
......@@ -104,9 +139,11 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, co
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable);
/**
......@@ -146,6 +183,9 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList);
int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num);
int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num);
/**
......
......@@ -76,6 +76,7 @@ typedef struct STableMeta {
typedef struct SDBVgroupInfo {
SRWLatch lock;
int64_t dbId;
int32_t vgVersion;
int8_t hashMethod;
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
......@@ -86,8 +87,15 @@ typedef struct SUseDbOutput {
SDBVgroupInfo dbVgroup;
} SUseDbOutput;
typedef enum {
META_TYPE_NON_TABLE = 1,
META_TYPE_CTABLE,
META_TYPE_TABLE,
META_TYPE_BOTH_TABLE,
};
typedef struct STableMetaOutput {
int32_t metaNum;
int32_t metaType;
char ctbFname[TSDB_TABLE_FNAME_LEN];
char tbFname[TSDB_TABLE_FNAME_LEN];
SCTableMeta ctbMeta;
......@@ -149,6 +157,11 @@ void initQueryModuleMsgHandle();
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize);
#define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE
#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0)
......
......@@ -124,6 +124,9 @@ int32_t taosHashGetSize(const SHashObj *pHashObj);
*/
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size);
int32_t taosHashPutExt(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded);
/**
* return the payload data with the specified key
*
......
......@@ -44,7 +44,6 @@ extern int32_t tsdbDebugFlag;
extern int32_t tqDebugFlag;
extern int32_t cqDebugFlag;
extern int32_t debugFlag;
extern int32_t ctgDebugFlag;
#define DEBUG_FATAL 1U
#define DEBUG_ERROR DEBUG_FATAL
......
......@@ -46,6 +46,7 @@ void taos_cleanup(void) {
taosCloseRef(id);
rpcCleanup();
catalogDestroy();
taosCloseLog();
tscInfo("all local resources released");
......
......@@ -927,16 +927,6 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
cfg.option = "ctgDebugFlag";
cfg.ptr = &ctgDebugFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 255;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
cfg.option = "enableRecordSql";
cfg.ptr = &tsTscEnableRecordSql;
cfg.valType = TAOS_CFG_VTYPE_INT8;
......
......@@ -137,9 +137,9 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
pRsp->numOfColumns = htonl(pRsp->numOfColumns);
pRsp->sversion = htonl(pRsp->sversion);
pRsp->tversion = htonl(pRsp->tversion);
pRsp->suid = htobe64(pRsp->suid);
pRsp->tuid = htobe64(pRsp->tuid);
pRsp->vgId = htobe64(pRsp->vgId);
pRsp->suid = be64toh(pRsp->suid);
pRsp->tuid = be64toh(pRsp->tuid);
pRsp->vgId = be64toh(pRsp->vgId);
for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) {
SSchema* pSchema = &pRsp->pSchema[i];
pSchema->colId = htonl(pSchema->colId);
......@@ -156,7 +156,7 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
EXPECT_EQ(pRsp->sversion, 1);
EXPECT_EQ(pRsp->tversion, 0);
EXPECT_GT(pRsp->suid, 0);
EXPECT_EQ(pRsp->tuid, 0);
EXPECT_GT(pRsp->tuid, 0);
EXPECT_EQ(pRsp->vgId, 0);
{
......
......@@ -917,6 +917,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
}
memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
pRsp->uid = htobe64(pDb->uid);
pRsp->vgVersion = htonl(pDb->vgVersion);
pRsp->vgNum = htonl(vindex);
pRsp->hashMethod = pDb->hashMethod;
......
......@@ -769,7 +769,8 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pStb->version);
pMeta->suid = htonl(pStb->uid);
pMeta->suid = htobe64(pStb->uid);
pMeta->tuid = htobe64(pStb->uid);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
......
......@@ -105,6 +105,9 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if (pTbCfg->type == META_CHILD_TABLE) {
strcpy(pTbMetaMsg->stbFname, pStbCfg->name);
pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid);
} else if (pTbCfg->type == META_SUPER_TABLE) {
strcpy(pTbMetaMsg->stbFname, pTbCfg->name);
pTbMetaMsg->suid = htobe64(uid);
}
pTbMetaMsg->numOfTags = htonl(nTagCols);
pTbMetaMsg->numOfColumns = htonl(nCols);
......
......@@ -22,20 +22,31 @@ extern "C" {
#include "catalog.h"
#include "common.h"
#include "tlog.h"
#include "query.h"
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_CACHE_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
#define CTG_RENT_SLOT_SECOND 2
#define CTG_DEFAULT_INVALID_VERSION (-1)
#define CTG_ERR_CODE_TABLE_NOT_EXIST TSDB_CODE_TDB_INVALID_TABLE_ID
enum {
CTG_READ = 1,
CTG_WRITE,
};
enum {
CTG_RENT_DB = 1,
CTG_RENT_STABLE,
};
typedef struct SVgroupListCache {
int32_t vgroupVersion;
SHashObj *cache; // key:vgId, value:SVgroupInfo
......@@ -51,30 +62,76 @@ typedef struct STableMetaCache {
SHashObj *stableCache; //key:suid, value:STableMeta*
} STableMetaCache;
typedef struct SRentSlotInfo {
SRWLatch lock;
bool needSort;
SArray *meta; // element is SDbVgVersion or SSTableMetaVersion
} SRentSlotInfo;
typedef struct SMetaRentMgmt {
int8_t type;
uint16_t slotNum;
uint16_t slotRIdx;
int64_t lastReadMsec;
SRentSlotInfo *slots;
} SMetaRentMgmt;
typedef struct SCatalog {
uint64_t clusterId;
SDBVgroupCache dbCache;
STableMetaCache tableCache;
SMetaRentMgmt dbRent;
SMetaRentMgmt stableRent;
} SCatalog;
typedef struct SCtgApiStat {
} SCtgApiStat;
typedef struct SCtgResourceStat {
} SCtgResourceStat;
typedef struct SCtgCacheStat {
} SCtgCacheStat;
typedef struct SCatalogStat {
SCtgApiStat api;
SCtgResourceStat resource;
SCtgCacheStat cache;
} SCatalogStat;
typedef struct SCatalogMgmt {
void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata
SHashObj *pCluster; // items cached for each cluster, the hash key is the cluster-id got from mgmt node
SCatalogCfg cfg;
SHashObj *pCluster; //key: clusterId, value: SCatalog*
SCatalogStat stat;
SCatalogCfg cfg;
} SCatalogMgmt;
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgInfo(...) do { if (ctgDebugFlag & DEBUG_INFO) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgDebug(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define CTG_IS_META_NONE(type) ((type) == META_TYPE_NON_TABLE)
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
#define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE)
#define CTG_IS_META_BOTH(type) ((type) == META_TYPE_BOTH_TABLE)
#define CTG_IS_STABLE(isSTable) (1 == (isSTable))
#define CTG_IS_NOT_STABLE(isSTable) (0 == (isSTable))
#define CTG_IS_UNKNOWN_STABLE(isSTable) ((isSTable) < 0)
#define CTG_SET_STABLE(isSTable, tbType) do { (isSTable) = ((tbType) == TSDB_SUPER_TABLE) ? 1 : ((tbType) > TSDB_SUPER_TABLE ? 0 : -1); } while (0)
#define CTG_TBTYPE_MATCH(isSTable, tbType) (CTG_IS_UNKNOWN_STABLE(isSTable) || (CTG_IS_STABLE(isSTable) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STABLE(isSTable) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST)
#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgError(param, ...) qError("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgWarn(param, ...) qWarn("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgInfo(param, ...) qInfo("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCatalog, __VA_ARGS__)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
......@@ -82,15 +139,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_LOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
ctgDebug("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
ctgDebug("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
ctgDebug("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
ctgDebug("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
......@@ -98,15 +155,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_UNLOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
ctgDebug("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
ctgDebug("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
ctgDebug("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
ctgDebug("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
......
......@@ -23,7 +23,7 @@ SCatalogMgmt ctgMgmt = {0};
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) {
if (NULL == pCatalog->dbCache.cache) {
*inCache = false;
ctgWarn("no db cache");
ctgWarn("empty db cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
}
......@@ -34,7 +34,7 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S
if (NULL == info) {
*inCache = false;
ctgWarn("no db cache, dbName:%s", dbName);
ctgWarn("not in db vgroup cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
}
......@@ -52,6 +52,8 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S
*dbInfo = info;
*inCache = true;
ctgDebug("Got db vgroup from cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
}
......@@ -63,7 +65,13 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen));
ctgDebug("try to get db vgroup from mnode, db:%s", input->db);
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen);
if (code) {
ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
CTG_ERR_RET(code);
}
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_USE_DB,
......@@ -75,12 +83,39 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
ctgError("error rsp for use db, code:%x", rpcRsp.code);
ctgError("error rsp for use db, code:%x, db:%s", rpcRsp.code, input->db);
CTG_ERR_RET(rpcRsp.code);
}
CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen));
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen);
if (code) {
ctgError("Process use db rsp failed, code:%x, db:%s", code, input->db);
CTG_ERR_RET(code);
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, const char* tbFullName, int32_t *exist) {
if (NULL == pCatalog->tableCache.cache) {
*exist = 0;
ctgWarn("empty tablemeta cache, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
size_t sz = 0;
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName));
if (NULL == tbMeta) {
*exist = 0;
ctgDebug("tablemeta not in cache, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
*exist = 1;
ctgDebug("tablemeta is in cache, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
......@@ -88,6 +123,7 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) {
if (NULL == pCatalog->tableCache.cache) {
*exist = 0;
ctgWarn("empty tablemeta cache, tbName:%s", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
......@@ -101,12 +137,17 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
if (NULL == *pTableMeta) {
*exist = 0;
ctgDebug("tablemeta not in cache, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
*exist = 1;
tbMeta = *pTableMeta;
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
ctgDebug("Got tablemeta from cache, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
......@@ -115,7 +156,7 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
if (NULL == stbMeta || NULL == *stbMeta) {
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
qError("no stable:%"PRIx64 " meta in cache", tbMeta->suid);
ctgError("stable not in stableCache, suid:%"PRIx64, tbMeta->suid);
tfree(*pTableMeta);
*exist = 0;
return TSDB_CODE_SUCCESS;
......@@ -124,7 +165,7 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
if ((*stbMeta)->suid != tbMeta->suid) {
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
tfree(*pTableMeta);
ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid);
ctgError("stable suid in stableCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
......@@ -132,17 +173,47 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
*pTableMeta = realloc(*pTableMeta, metaSize);
if (NULL == *pTableMeta) {
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
ctgError("calloc size[%d] failed", metaSize);
ctgError("realloc size[%d] failed", metaSize);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
ctgDebug("Got tablemeta from cache, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableName, int32_t *tbType) {
if (NULL == pCatalog->tableCache.cache) {
ctgWarn("empty tablemeta cache, tbName:%s", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName);
size_t sz = 0;
STableMeta *pTableMeta = NULL;
taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)&pTableMeta, &sz);
if (NULL == pTableMeta) {
ctgWarn("tablemeta not in cache, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
*tbType = pTableMeta->tableType;
ctgDebug("Got tabletype from cache, tbName:%s, type:%d", tbFullName, *tbType);
return TSDB_CODE_SUCCESS;
}
void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
epSet->inUse = 0;
epSet->numOfEps = vgroupInfo->numOfEps;
......@@ -153,20 +224,19 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
}
}
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == output) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName);
int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) {
SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName};
char *msg = NULL;
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen));
ctgDebug("try to get table meta from mnode, tbName:%s", tbFullName);
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen);
if (code) {
ctgError("Build mnode stablemeta msg failed, code:%x", code);
CTG_ERR_RET(code);
}
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_STB_META,
......@@ -176,33 +246,57 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
SRpcMsg rpcRsp = {0};
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
rpcSendRecv(pTransporter, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
ctgError("error rsp for table meta, code:%x", rpcRsp.code);
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
SET_META_TYPE_NONE(output->metaType);
ctgDebug("stablemeta not exist in mnode, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
ctgError("error rsp for stablemeta from mnode, code:%x, tbName:%s", rpcRsp.code, tbFullName);
CTG_ERR_RET(rpcRsp.code);
}
CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen));
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen);
if (code) {
ctgError("Process mnode stablemeta rsp failed, code:%x, tbName:%s", code, tbFullName);
CTG_ERR_RET(code);
}
ctgDebug("Got table meta from mnode, tbName:%s", tbFullName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName);
return ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, tbFullName, output);
}
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
char dbFullName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFullName);
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = pTableName->tname};
ctgDebug("try to get table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname);
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)pTableName->tname};
char *msg = NULL;
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen));
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen);
if (code) {
ctgError("Build vnode tablemeta msg failed, code:%x, tbName:%s", code, pTableName->tname);
CTG_ERR_RET(code);
}
SRpcMsg rpcMsg = {
.msgType = TDMT_VND_TABLE_META,
......@@ -214,14 +308,26 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE
SEpSet epSet;
ctgGenEpSet(&epSet, vgroupInfo);
rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);
rpcSendRecv(pTransporter, &epSet, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
ctgError("error rsp for table meta, code:%x", rpcRsp.code);
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
SET_META_TYPE_NONE(output->metaType);
ctgDebug("tablemeta not exist in vnode, tbName:%s", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
ctgError("error rsp for table meta from vnode, code:%x, tbName:%s", rpcRsp.code, pTableName->tname);
CTG_ERR_RET(rpcRsp.code);
}
CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen));
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
if (code) {
ctgError("Process vnode tablemeta rsp failed, code:%x, tbName:%s", code, pTableName->tname);
CTG_ERR_RET(code);
}
ctgDebug("Got table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
......@@ -242,10 +348,11 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *
SVgroupInfo *vgInfo = NULL;
SArray *vgList = NULL;
int32_t code = 0;
int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
vgList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo));
vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
if (NULL == vgList) {
ctgError("taosArrayInit failed");
ctgError("taosArrayInit failed, num:%d", vgNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -254,7 +361,7 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *
vgInfo = pIter;
if (NULL == taosArrayPush(vgList, vgInfo)) {
ctgError("taosArrayPush failed");
ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -265,6 +372,8 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *
*vgroupList = vgList;
vgList = NULL;
ctgDebug("Got vg list from DB, vgNum:%d", vgNum);
return TSDB_CODE_SUCCESS;
_return:
......@@ -276,7 +385,7 @@ _return:
CTG_RET(code);
}
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
int32_t code = 0;
int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
......@@ -284,7 +393,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName
tNameGetFullDbName(pTableName, db);
if (vgNum <= 0) {
ctgError("db[%s] vgroup cache invalid, vgroup number:%d", db, vgNum);
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum);
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
}
......@@ -311,7 +420,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName
}
if (NULL == vgInfo) {
ctgError("no hash range found for hash value [%u], numOfVgId:%d", hashValue, taosHashGetSize(dbInfo->vgInfo));
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db, taosHashGetSize(dbInfo->vgInfo));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
......@@ -321,99 +430,274 @@ _return:
CTG_RET(code);
}
int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) {
return -1;
} else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) {
return 1;
} else {
return 0;
}
}
int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) {
return -1;
} else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) {
return 1;
} else {
return 0;
}
}
int32_t ctgMetaRentInit(SMetaRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
mgmt->slotRIdx = 0;
mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
mgmt->type = type;
size_t msgSize = sizeof(SRentSlotInfo) * mgmt->slotNum;
int32_t exist = 0;
mgmt->slots = calloc(1, msgSize);
if (NULL == mgmt->slots) {
qError("calloc %d failed", (int32_t)msgSize);
return TSDB_CODE_CTG_MEM_ERROR;
}
if (!forceUpdate) {
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));
qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
return TSDB_CODE_SUCCESS;
}
if (exist) {
return TSDB_CODE_SUCCESS;
int32_t ctgMetaRentAdd(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
int16_t widx = abs(id % mgmt->slotNum);
SRentSlotInfo *slot = &mgmt->slots[widx];
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
if (NULL == slot->meta) {
qError("taosArrayInit %d failed, id:%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
}
CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pTableName));
if (NULL == taosArrayPush(slot->meta, meta)) {
qError("taosArrayPush meta to rent failed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
slot->needSort = true;
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));
qDebug("add meta to rent, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
if (0 == exist) {
ctgError("get table meta from cache failed, but fetch succeed");
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
_return:
CTG_UNLOCK(CTG_WRITE, &slot->lock);
CTG_RET(code);
}
int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) {
int16_t widx = abs(id % mgmt->slotNum);
SRentSlotInfo *slot = &mgmt->slots[widx];
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
qError("meta in slot is empty, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
if (slot->needSort) {
taosArraySort(slot->meta, compare);
slot->needSort = false;
qDebug("slot meta sorted, slot idx:%d, type:%d", widx, mgmt->type);
}
void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ);
if (NULL == orig) {
qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
memcpy(orig, meta, size);
qDebug("meta in rent updated, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
_return:
CTG_UNLOCK(CTG_WRITE, &slot->lock);
if (code) {
qWarn("meta in rent update failed, will try to add it, code:%x, id:%"PRIx64", slot idx:%d, type:%d", code, id, widx, mgmt->type);
CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
}
CTG_RET(code);
}
int32_t ctgMetaRentGetImpl(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
if (ridx >= mgmt->slotNum) {
ridx %= mgmt->slotNum;
atomic_store_16(&mgmt->slotRIdx, ridx);
}
SRentSlotInfo *slot = &mgmt->slots[ridx];
int32_t code = 0;
CTG_LOCK(CTG_READ, &slot->lock);
if (NULL == slot->meta) {
qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
*num = 0;
goto _return;
}
size_t metaNum = taosArrayGetSize(slot->meta);
if (metaNum <= 0) {
qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
*num = 0;
goto _return;
}
size_t msize = metaNum * size;
*res = malloc(msize);
if (NULL == *res) {
qError("malloc %d failed", (int32_t)msize);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
void *meta = taosArrayGet(slot->meta, 0);
memcpy(*res, meta, msize);
*num = (uint32_t)metaNum;
qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);
_return:
CTG_UNLOCK(CTG_READ, &slot->lock);
CTG_RET(code);
}
int32_t ctgMetaRentGet(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
while (true) {
int64_t msec = taosGetTimestampMs();
int64_t lsec = atomic_load_64(&mgmt->lastReadMsec);
if ((msec - lsec) < CTG_RENT_SLOT_SECOND * 1000) {
*res = NULL;
*num = 0;
qDebug("too short time period to get expired meta, type:%d", mgmt->type);
return TSDB_CODE_SUCCESS;
}
if (lsec != atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lsec, msec)) {
continue;
}
break;
}
CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
int32_t code = 0;
if (output->metaNum != 1 && output->metaNum != 2) {
ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (NULL == output->tbMeta) {
ctgError("no valid table meta got from meta rsp");
ctgError("no valid table meta got from meta rsp, tbName:%s", output->tbFname);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (NULL == pCatalog->tableCache.cache) {
pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.cache) {
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == cache) {
ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.cache, NULL, cache)) {
taosHashCleanup(cache);
}
}
if (NULL == pCatalog->tableCache.stableCache) {
pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.stableCache) {
ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == cache) {
ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.stableCache, NULL, cache)) {
taosHashCleanup(cache);
}
}
if (output->metaNum == 2) {
if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
ctgError("push ctable[%s] to table cache failed", output->ctbFname);
ctgError("taosHashPut ctablemeta to cache failed, ctbName:%s", output->ctbFname);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
ctgDebug("update child tablemeta to cache, tbName:%s", output->ctbFname);
}
if (CTG_IS_META_CTABLE(output->metaType)) {
return TSDB_CODE_SUCCESS;
}
if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) {
ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
bool newAdded = false;
SSTableMetaVersion metaRent = {.suid = output->tbMeta->suid, .sversion = output->tbMeta->sversion, .tversion = output->tbMeta->tversion};
CTG_LOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
ctgError("push table[%s] to table cache failed", output->tbFname);
ctgError("taosHashPut tablemeta to cache failed, tbName:%s", output->tbFname);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname));
if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) {
if (taosHashPutExt(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES, &newAdded) != 0) {
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, output->tbMeta->suid);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
ctgDebug("update stable to cache, suid:%"PRIx64, output->tbMeta->suid);
if (newAdded) {
CTG_ERR_RET(ctgMetaRentAdd(&pCatalog->stableRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
} else {
CTG_ERR_RET(ctgMetaRentUpdate(&pCatalog->stableRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion), ctgSTableVersionCompare));
}
} else {
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
ctgError("push table[%s] to table cache failed", output->tbFname);
ctgError("taosHashPut tablemeta to cache failed, tbName:%s", output->tbFname);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
}
ctgDebug("update tablemeta to cache, tbName:%s", output->tbFname);
CTG_RET(code);
}
......@@ -440,7 +724,7 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
if (!inCache) {
ctgWarn("get DB vgroupInfo from cache failed, dbName:%s", dbName);
ctgWarn("can't get db vgroup from cache, will retry, db:%s", dbName);
continue;
}
......@@ -456,7 +740,7 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD
if (oldInfo) {
CTG_LOCK(CTG_WRITE, &oldInfo->lock);
if (dbInfo->vgVersion <= oldInfo->vgVersion) {
ctgInfo("dbName:%s vg will not update, vgVersion:%d , current:%d", dbName, dbInfo->vgVersion, oldInfo->vgVersion);
ctgInfo("db vgVersion is not new, db:%s, vgVersion:%d, current:%d", dbName, dbInfo->vgVersion, oldInfo->vgVersion);
CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, oldInfo);
......@@ -464,7 +748,7 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD
}
if (oldInfo->vgInfo) {
ctgInfo("dbName:%s vg will be cleanup", dbName);
ctgInfo("cleanup db vgInfo, db:%s", dbName);
taosHashCleanup(oldInfo->vgInfo);
oldInfo->vgInfo = NULL;
}
......@@ -477,10 +761,174 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD
return TSDB_CODE_SUCCESS;
}
int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
SVgroupInfo vgroupInfo = {0};
int32_t code = 0;
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
STableMetaOutput voutput = {0};
STableMetaOutput moutput = {0};
STableMetaOutput *output = &voutput;
if (CTG_IS_STABLE(isSTable)) {
ctgDebug("will renew table meta, supposed to be stable, tbName:%s", pTableName->tname);
// if get from mnode failed, will not try vnode
CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput));
if (CTG_IS_META_NONE(moutput.metaType)) {
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
} else {
output = &moutput;
}
} else {
ctgDebug("will renew table meta, not supposed to be stable, tbName:%s, isStable:%d", pTableName->tname, isSTable);
// if get from vnode failed or no table meta, will not try mnode
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
if (CTG_IS_META_TABLE(voutput.metaType) && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) {
ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaType:%d", pTableName->tname, voutput.metaType);
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));
tfree(voutput.tbMeta);
voutput.tbMeta = moutput.tbMeta;
moutput.tbMeta = NULL;
} else if (CTG_IS_META_BOTH(voutput.metaType)) {
int32_t exist = 0;
CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCatalog, voutput.tbFname, &exist));
if (0 == exist) {
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));
if (CTG_IS_META_NONE(moutput.metaType)) {
SET_META_TYPE_NONE(voutput.metaType);
}
tfree(voutput.tbMeta);
voutput.tbMeta = moutput.tbMeta;
moutput.tbMeta = NULL;
} else {
SET_META_TYPE_CTABLE(voutput.metaType);
}
}
}
if (CTG_IS_META_NONE(output->metaType)) {
ctgError("no tablemeta got, tbNmae:%s", pTableName->tname);
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
}
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, output));
_return:
tfree(voutput.tbMeta);
tfree(moutput.tbMeta);
CTG_RET(code);
}
int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t exist = 0;
if (!forceUpdate) {
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));
if (exist && CTG_TBTYPE_MATCH(isSTable, (*pTableMeta)->tableType)) {
return TSDB_CODE_SUCCESS;
}
} else if (CTG_IS_UNKNOWN_STABLE(isSTable)) {
int32_t tbType = 0;
CTG_ERR_RET(ctgGetTableTypeFromCache(pCatalog, pTableName, &tbType));
CTG_SET_STABLE(isSTable, tbType);
}
CTG_ERR_RET(ctgRenewTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, isSTable));
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));
if (0 == exist) {
ctgError("renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s", pTableName->tname);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
return TSDB_CODE_SUCCESS;
}
void ctgFreeMetaRent(SMetaRentMgmt *mgmt) {
if (NULL == mgmt->slots) {
return;
}
for (int32_t i = 0; i < mgmt->slotNum; ++i) {
SRentSlotInfo *slot = &mgmt->slots[i];
if (slot->meta) {
taosArrayDestroy(slot->meta);
slot->meta = NULL;
}
}
tfree(mgmt->slots);
}
void ctgFreeDbCache(SDBVgroupCache *db) {
if (NULL == db->cache) {
return;
}
SDBVgroupInfo *dbInfo = NULL;
void *pIter = taosHashIterate(db->cache, NULL);
while (pIter) {
dbInfo = pIter;
if (dbInfo->vgInfo) {
taosHashCleanup(dbInfo->vgInfo);
dbInfo->vgInfo = NULL;
}
pIter = taosHashIterate(db->cache, pIter);
}
taosHashCleanup(db->cache);
db->cache = NULL;
}
void ctgFreeTableMetaCache(STableMetaCache *table) {
if (table->stableCache) {
taosHashCleanup(table->stableCache);
table->stableCache = NULL;
}
if (table->cache) {
taosHashCleanup(table->cache);
table->cache = NULL;
}
}
void ctgFreeHandle(struct SCatalog* pCatalog) {
ctgFreeMetaRent(&pCatalog->dbRent);
ctgFreeMetaRent(&pCatalog->stableRent);
ctgFreeDbCache(&pCatalog->dbCache);
ctgFreeTableMetaCache(&pCatalog->tableCache);
free(pCatalog);
}
int32_t catalogInit(SCatalogCfg *cfg) {
if (ctgMgmt.pCluster) {
ctgError("catalog already init");
qError("catalog already init");
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
......@@ -494,16 +942,29 @@ int32_t catalogInit(SCatalogCfg *cfg) {
if (ctgMgmt.cfg.maxTblCacheNum == 0) {
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
}
if (ctgMgmt.cfg.dbRentSec == 0) {
ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
}
if (ctgMgmt.cfg.stableRentSec == 0) {
ctgMgmt.cfg.stableRentSec = CTG_DEFAULT_RENT_SECOND;
}
} else {
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
ctgMgmt.cfg.stableRentSec = CTG_DEFAULT_RENT_SECOND;
}
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == ctgMgmt.pCluster) {
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stableRentSec:%u", ctgMgmt.cfg.maxDBCacheNum, ctgMgmt.cfg.maxTblCacheNum, ctgMgmt.cfg.dbRentSec, ctgMgmt.cfg.stableRentSec);
return TSDB_CODE_SUCCESS;
}
......@@ -513,32 +974,75 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) {
}
if (NULL == ctgMgmt.pCluster) {
ctgError("cluster cache are not ready");
qError("cluster cache are not ready, clusterId:%"PRIx64, clusterId);
CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
}
SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
int32_t code = 0;
SCatalog *clusterCtg = NULL;
if (ctg && (*ctg)) {
*catalogHandle = *ctg;
return TSDB_CODE_SUCCESS;
}
while (true) {
SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
SCatalog *clusterCtg = calloc(1, sizeof(SCatalog));
if (NULL == clusterCtg) {
ctgError("calloc %d failed", (int32_t)sizeof(SCatalog));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if (ctg && (*ctg)) {
*catalogHandle = *ctg;
qDebug("got catalog handle from cache, clusterId:%"PRIx64", CTG:%p", clusterId, *ctg);
return TSDB_CODE_SUCCESS;
}
if (taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES)) {
ctgError("put cluster %"PRIx64" cache to hash failed", clusterId);
tfree(clusterCtg);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
clusterCtg = calloc(1, sizeof(SCatalog));
if (NULL == clusterCtg) {
qError("calloc %d failed", (int32_t)sizeof(SCatalog));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
clusterCtg->clusterId = clusterId;
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, ctgMgmt.cfg.dbRentSec, CTG_RENT_DB));
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stableRent, ctgMgmt.cfg.stableRentSec, CTG_RENT_STABLE));
code = taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
if (code) {
if (HASH_NODE_EXIST(code)) {
ctgFreeHandle(clusterCtg);
continue;
}
qError("taosHashPut CTG to cache failed, clusterId:%"PRIx64, clusterId);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
qDebug("add CTG to cache, clusterId:%"PRIx64", CTG:%p", clusterId, clusterCtg);
break;
}
*catalogHandle = clusterCtg;
return TSDB_CODE_SUCCESS;
_return:
ctgFreeHandle(clusterCtg);
CTG_RET(code);
}
void catalogFreeHandle(struct SCatalog* pCatalog) {
if (NULL == pCatalog) {
return;
}
if (taosHashRemove(ctgMgmt.pCluster, &pCatalog->clusterId, sizeof(pCatalog->clusterId))) {
ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCatalog->clusterId);
return;
}
uint64_t clusterId = pCatalog->clusterId;
ctgFreeHandle(pCatalog);
ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
}
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
......@@ -548,18 +1052,22 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
if (NULL == pCatalog->dbCache.cache) {
*version = CTG_DEFAULT_INVALID_VERSION;
ctgInfo("empty db cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
}
SDBVgroupInfo * dbInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (NULL == dbInfo) {
*version = CTG_DEFAULT_INVALID_VERSION;
ctgInfo("db not in cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
}
*version = dbInfo->vgVersion;
taosHashRelease(pCatalog->dbCache.cache, dbInfo);
ctgDebug("Got db vgVersion from cache, dbName:%s, vgVersion:%d", dbName, *version);
return TSDB_CODE_SUCCESS;
}
......@@ -577,8 +1085,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
vgList = taosArrayInit(taosHashGetSize(db->vgInfo), sizeof(SVgroupInfo));
if (NULL == vgList) {
ctgError("taosArrayInit failed");
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
ctgError("taosArrayInit %d failed", taosHashGetSize(db->vgInfo));
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
void *pIter = taosHashIterate(db->vgInfo, NULL);
......@@ -586,7 +1094,7 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
vgInfo = pIter;
if (NULL == taosArrayPush(vgList, vgInfo)) {
ctgError("taosArrayPush failed");
ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -620,12 +1128,12 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
}
if (NULL == dbInfo->vgInfo || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgInfo) <= 0) {
ctgError("invalid db vg, dbName:%s", dbName);
ctgError("invalid db vgInfo, dbName:%s, vgInfo:%p, vgVersion:%d", dbName, dbInfo->vgInfo, dbInfo->vgVersion);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
if (dbInfo->vgVersion < 0) {
ctgWarn("invalid db vgVersion:%d, dbName:%s", dbInfo->vgVersion, dbName);
ctgWarn("db vgVersion less than 0, dbName:%s, vgVersion:%d", dbName, dbInfo->vgVersion);
if (pCatalog->dbCache.cache) {
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
......@@ -633,28 +1141,41 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)));
}
ctgWarn("remove db [%s] from cache", dbName);
ctgWarn("db removed from cache, db:%s", dbName);
goto _return;
}
if (NULL == pCatalog->dbCache.cache) {
pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->dbCache.cache) {
ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == cache) {
ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache.cache, NULL, cache)) {
taosHashCleanup(cache);
}
} else {
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
}
if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) {
ctgError("push to vgroup hash cache failed");
bool newAdded = false;
if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) {
ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
dbInfo->vgInfo = NULL;
SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion};
if (newAdded) {
CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion)));
} else {
CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
}
ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion);
dbInfo->vgInfo = NULL;
_return:
......@@ -667,34 +1188,23 @@ _return:
}
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta);
return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1);
}
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName) {
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1);
}
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
SVgroupInfo vgroupInfo = {0};
int32_t code = 0;
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
STableMetaOutput output = {0};
CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output));
//CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
_return:
tfree(output.tbMeta);
CTG_RET(code);
return ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable);
}
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta);
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable);
}
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) {
......@@ -710,7 +1220,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
*pVgroupList = NULL;
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, &tbMeta));
CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, false, &tbMeta, -1));
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
......@@ -721,18 +1231,18 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
} else {
int32_t vgId = tbMeta->vgId;
if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
ctgError("vgId[%d] not found in vgroup list", vgId);
ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, pTableName->tname);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
vgList = taosArrayInit(1, sizeof(SVgroupInfo));
if (NULL == vgList) {
ctgError("taosArrayInit failed");
ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo));
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
ctgError("push vgroupInfo to array failed");
ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, pTableName->tname);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
......@@ -765,7 +1275,8 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter,
tNameGetFullDbName(pTableName, db);
CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pTableName, pVgroup));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbInfo, pTableName, pVgroup));
_return:
if (dbInfo) {
......@@ -787,13 +1298,13 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p
if (pReq->pTableName) {
int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
if (tbNum <= 0) {
ctgError("empty table name list");
ctgError("empty table name list, tbNum:%d", tbNum);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
if (NULL == pRsp->pTableMeta) {
ctgError("taosArrayInit num[%d] failed", tbNum);
ctgError("taosArrayInit %d failed", tbNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -801,7 +1312,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p
SName *name = taosArrayGet(pReq->pTableName, i);
STableMeta *pTableMeta = NULL;
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, name, &pTableMeta));
CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pRpc, pMgmtEps, name, false, &pTableMeta, -1));
if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
ctgError("taosArrayPush failed, idx:%d", i);
......@@ -834,16 +1345,49 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
//TODO
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num) {
if (NULL == pCatalog || NULL == stables || NULL == num) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_RET(ctgMetaRentGet(&pCatalog->stableRent, (void **)stables, num, sizeof(SSTableMetaVersion)));
}
int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num) {
if (NULL == pCatalog || NULL == dbs || NULL == num) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_RET(ctgMetaRentGet(&pCatalog->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
}
void catalogDestroy(void) {
if (ctgMgmt.pCluster) {
taosHashCleanup(ctgMgmt.pCluster); //TBD
ctgMgmt.pCluster = NULL;
if (NULL == ctgMgmt.pCluster) {
return;
}
SCatalog *pCatalog = NULL;
void *pIter = taosHashIterate(ctgMgmt.pCluster, NULL);
while (pIter) {
pCatalog = *(SCatalog **)pIter;
if (pCatalog) {
catalogFreeHandle(pCatalog);
}
pIter = taosHashIterate(ctgMgmt.pCluster, pIter);
}
taosHashCleanup(ctgMgmt.pCluster);
ctgMgmt.pCluster = NULL;
qInfo("catalog destroyed");
}
......
......@@ -16,3 +16,8 @@ TARGET_INCLUDE_DIRECTORIES(
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/catalog/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/catalog/inc"
)
add_test(
NAME catalogTest
COMMAND catalogTest
)
......@@ -42,10 +42,13 @@ extern "C" int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMeta
void ctgTestSetPrepareTableMeta();
void ctgTestSetPrepareCTableMeta();
void ctgTestSetPrepareSTableMeta();
void ctgTestSetPrepareMultiSTableMeta();
bool ctgTestStop = false;
bool ctgTestEnableSleep = false;
bool ctgTestDeadLoop = true;
bool ctgTestDeadLoop = false;
int32_t ctgTestPrintNum = 200000;
int32_t ctgTestMTRunSec = 30;
int32_t ctgTestCurrentVgVersion = 0;
int32_t ctgTestVgVersion = 1;
......@@ -54,6 +57,8 @@ int32_t ctgTestColNum = 2;
int32_t ctgTestTagNum = 1;
int32_t ctgTestSVersion = 1;
int32_t ctgTestTVersion = 1;
int32_t ctgTestSuid = 2;
int64_t ctgTestDbId = 33;
uint64_t ctgTestClusterId = 0x1;
char *ctgTestDbname = "1.db1";
......@@ -101,7 +106,6 @@ void ctgTestInitLogFile() {
const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
ctgDebugFlag = 159;
tsAsyncLog = 0;
char temp[128] = {0};
......@@ -128,7 +132,7 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&cn, tbFullName);
output->metaNum = 2;
SET_META_TYPE_BOTH_TABLE(output->metaType);
strcpy(output->ctbFname, tbFullName);
......@@ -183,6 +187,7 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
ctgTestCurrentVgVersion = dbVgroup->vgVersion;
dbVgroup->hashMethod = 0;
dbVgroup->dbId = ctgTestDbId;
dbVgroup->vgInfo = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion);
......@@ -216,6 +221,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
ctgTestCurrentVgVersion = ctgTestVgVersion;
rspMsg->vgNum = htonl(ctgTestVgNum);
rspMsg->hashMethod = 0;
rspMsg->uid = htobe64(ctgTestDbId);
SVgroupInfo *vg = NULL;
uint32_t hashUnit = UINT32_MAX / ctgTestVgNum;
......@@ -338,8 +344,52 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
rspMsg->update = 1;
rspMsg->sversion = htonl(ctgTestSVersion);
rspMsg->tversion = htonl(ctgTestTVersion);
rspMsg->suid = htobe64(0x0000000000000002);
rspMsg->tuid = htobe64(0x0000000000000003);
rspMsg->suid = htobe64(ctgTestSuid);
rspMsg->tuid = htobe64(ctgTestSuid);
rspMsg->vgId = 0;
SSchema *s = NULL;
s = &rspMsg->pSchema[0];
s->type = TSDB_DATA_TYPE_TIMESTAMP;
s->colId = htonl(1);
s->bytes = htonl(8);
strcpy(s->name, "ts");
s = &rspMsg->pSchema[1];
s->type = TSDB_DATA_TYPE_INT;
s->colId = htonl(2);
s->bytes = htonl(4);
strcpy(s->name, "col1s");
s = &rspMsg->pSchema[2];
s->type = TSDB_DATA_TYPE_BINARY;
s->colId = htonl(3);
s->bytes = htonl(12);
strcpy(s->name, "tag1s");
return;
}
void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
STableMetaMsg *rspMsg = NULL; //todo
static int32_t idx = 1;
pRsp->code =0;
pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaMsg *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
sprintf(rspMsg->stbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
rspMsg->numOfTags = htonl(ctgTestTagNum);
rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1;
rspMsg->tableType = TSDB_SUPER_TABLE;
rspMsg->update = 1;
rspMsg->sversion = htonl(ctgTestSVersion);
rspMsg->tversion = htonl(ctgTestTVersion);
rspMsg->suid = htobe64(ctgTestSuid + idx);
rspMsg->tuid = htobe64(ctgTestSuid + idx);
rspMsg->vgId = 0;
SSchema *s = NULL;
......@@ -361,10 +411,13 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
s->bytes = htonl(12);
strcpy(s->name, "tag1s");
++idx;
return;
}
void ctgTestPrepareDbVgroupsAndNormalMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp);
......@@ -390,6 +443,14 @@ void ctgTestPrepareDbVgroupsAndSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg
return;
}
void ctgTestPrepareDbVgroupsAndMultiSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp);
ctgTestSetPrepareMultiSTableMeta();
return;
}
void ctgTestSetPrepareDbVgroups() {
......@@ -444,6 +505,20 @@ void ctgTestSetPrepareSTableMeta() {
}
}
void ctgTestSetPrepareMultiSTableMeta() {
static Stub stub;
stub.set(rpcSendRecv, ctgTestPrepareMultiSTableMeta);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
for (const auto& f : result) {
stub.set(f.second, ctgTestPrepareMultiSTableMeta);
}
}
}
void ctgTestSetPrepareDbVgroupsAndNormalMeta() {
static Stub stub;
stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndNormalMeta);
......@@ -484,6 +559,19 @@ void ctgTestSetPrepareDbVgroupsAndSuperMeta() {
}
}
void ctgTestSetPrepareDbVgroupsAndMultiSuperMeta() {
static Stub stub;
stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndMultiSuperMeta);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
for (const auto& f : result) {
stub.set(f.second, ctgTestPrepareDbVgroupsAndMultiSuperMeta);
}
}
}
}
......@@ -507,7 +595,7 @@ void *ctgTestGetDbVgroupThread(void *param) {
if (ctgTestEnableSleep) {
usleep(rand()%5);
}
if (++n % 50000 == 0) {
if (++n % ctgTestPrintNum == 0) {
printf("Get:%d\n", n);
}
}
......@@ -531,7 +619,7 @@ void *ctgTestSetDbVgroupThread(void *param) {
if (ctgTestEnableSleep) {
usleep(rand()%5);
}
if (++n % 50000 == 0) {
if (++n % ctgTestPrintNum == 0) {
printf("Set:%d\n", n);
}
}
......@@ -563,7 +651,7 @@ void *ctgTestGetCtableMetaThread(void *param) {
usleep(rand()%5);
}
if (++n % 50000 == 0) {
if (++n % ctgTestPrintNum == 0) {
printf("Get:%d\n", n);
}
}
......@@ -589,7 +677,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
if (ctgTestEnableSleep) {
usleep(rand()%5);
}
if (++n % 50000 == 0) {
if (++n % ctgTestPrintNum == 0) {
printf("Set:%d\n", n);
}
}
......@@ -600,7 +688,6 @@ void *ctgTestSetCtableMetaThread(void *param) {
}
#if 0
TEST(tableMeta, normalTable) {
struct SCatalog* pCtg = NULL;
......@@ -628,6 +715,7 @@ TEST(tableMeta, normalTable) {
ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.numOfEps, 3);
ctgTestSetPrepareTableMeta();
STableMeta *tableMeta = NULL;
......@@ -654,6 +742,41 @@ TEST(tableMeta, normalTable) {
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0;
while (i < 5) {
++i;
code = catalogGetExpiredDBs(pCtg, &dbs, &dbNum);
ASSERT_EQ(code, 0);
code = catalogGetExpiredSTables(pCtg, &stb, &stbNum);
ASSERT_EQ(code, 0);
if (dbNum) {
printf("got expired db,dbId:%"PRId64"\n", dbs->dbId);
free(dbs);
dbs = NULL;
} else {
printf("no expired db\n");
}
if (stbNum) {
printf("got expired stb,suid:%"PRId64"\n", stb->suid);
free(stb);
stb = NULL;
} else {
printf("no expired stb\n");
}
allDbNum += dbNum;
allStbNum += stbNum;
sleep(2);
}
ASSERT_EQ(allDbNum, 1);
ASSERT_EQ(allStbNum, 0);
catalogDestroy();
}
......@@ -715,6 +838,42 @@ TEST(tableMeta, childTableCase) {
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0;
while (i < 5) {
++i;
code = catalogGetExpiredDBs(pCtg, &dbs, &dbNum);
ASSERT_EQ(code, 0);
code = catalogGetExpiredSTables(pCtg, &stb, &stbNum);
ASSERT_EQ(code, 0);
if (dbNum) {
printf("got expired db,dbId:%"PRId64"\n", dbs->dbId);
free(dbs);
dbs = NULL;
} else {
printf("no expired db\n");
}
if (stbNum) {
printf("got expired stb,suid:%"PRId64"\n", stb->suid);
free(stb);
stb = NULL;
} else {
printf("no expired stb\n");
}
allDbNum += dbNum;
allStbNum += stbNum;
sleep(2);
}
ASSERT_EQ(allDbNum, 1);
ASSERT_EQ(allStbNum, 1);
catalogDestroy();
}
......@@ -745,6 +904,8 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
ASSERT_EQ(tableMeta->sversion, ctgTestSVersion);
ASSERT_EQ(tableMeta->tversion, ctgTestTVersion);
ASSERT_EQ(tableMeta->uid, ctgTestSuid);
ASSERT_EQ(tableMeta->suid, ctgTestSuid);
ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum);
ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum);
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
......@@ -768,7 +929,7 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
tableMeta = NULL;
code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(tableMeta->vgId, 9);
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
......@@ -779,6 +940,40 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0;
while (i < 5) {
++i;
code = catalogGetExpiredDBs(pCtg, &dbs, &dbNum);
ASSERT_EQ(code, 0);
code = catalogGetExpiredSTables(pCtg, &stb, &stbNum);
ASSERT_EQ(code, 0);
if (dbNum) {
printf("got expired db,dbId:%"PRId64"\n", dbs->dbId);
free(dbs);
dbs = NULL;
} else {
printf("no expired db\n");
}
if (stbNum) {
printf("got expired stb,suid:%"PRId64"\n", stb->suid);
free(stb);
stb = NULL;
} else {
printf("no expired stb\n");
}
allDbNum += dbNum;
allStbNum += stbNum;
sleep(2);
}
ASSERT_EQ(allDbNum, 1);
ASSERT_EQ(allStbNum, 1);
catalogDestroy();
......@@ -948,7 +1143,6 @@ TEST(dbVgroup, getSetDbVgroupCase) {
catalogDestroy();
}
TEST(multiThread, getSetDbVgroupCase) {
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -956,6 +1150,7 @@ TEST(multiThread, getSetDbVgroupCase) {
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestStop = false;
ctgTestInitLogFile();
......@@ -988,7 +1183,7 @@ TEST(multiThread, getSetDbVgroupCase) {
if (ctgTestDeadLoop) {
sleep(1);
} else {
sleep(600);
sleep(ctgTestMTRunSec);
break;
}
}
......@@ -999,9 +1194,6 @@ TEST(multiThread, getSetDbVgroupCase) {
catalogDestroy();
}
#endif
TEST(multiThread, ctableMeta) {
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -1009,6 +1201,7 @@ TEST(multiThread, ctableMeta) {
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestStop = false;
ctgTestSetPrepareDbVgroupsAndChildMeta();
......@@ -1038,7 +1231,7 @@ TEST(multiThread, ctableMeta) {
if (ctgTestDeadLoop) {
sleep(1);
} else {
sleep(600);
sleep(ctgTestMTRunSec);
break;
}
}
......@@ -1050,6 +1243,78 @@ TEST(multiThread, ctableMeta) {
}
TEST(rentTest, allRent) {
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestStop = false;
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stable = NULL;
uint32_t num = 0;
ctgTestSetPrepareDbVgroupsAndMultiSuperMeta();
initQueryModuleMsgHandle();
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
for (int32_t i = 1; i <= 10; ++i) {
sprintf(n.tname, "%s_%d", ctgTestSTablename, i);
STableMeta *tableMeta = NULL;
code = catalogGetSTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
ASSERT_EQ(code, 0);
ASSERT_EQ(tableMeta->vgId, 0);
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
ASSERT_EQ(tableMeta->sversion, ctgTestSVersion);
ASSERT_EQ(tableMeta->tversion, ctgTestTVersion);
ASSERT_EQ(tableMeta->uid, ctgTestSuid + i);
ASSERT_EQ(tableMeta->suid, ctgTestSuid + i);
ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum);
ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum);
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
code = catalogGetExpiredDBs(pCtg, &dbs, &num);
ASSERT_EQ(code, 0);
printf("%d - expired dbNum:%d\n", i, num);
if (dbs) {
printf("%d - expired dbId:%"PRId64", vgVersion:%d\n", i, dbs->dbId, dbs->vgVersion);
free(dbs);
dbs = NULL;
}
code = catalogGetExpiredSTables(pCtg, &stable, &num);
ASSERT_EQ(code, 0);
printf("%d - expired stableNum:%d\n", i, num);
if (stable) {
for (int32_t n = 0; n < num; ++n) {
printf("suid:%"PRId64", sversion:%d, tversion:%d\n", stable[n].suid, stable[n].sversion, stable[n].tversion);
}
free(stable);
stable = NULL;
}
printf("*************************************************\n");
sleep(2);
}
catalogDestroy();
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
......
......@@ -97,6 +97,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
pRsp->vgVersion = ntohl(pRsp->vgVersion);
pRsp->vgNum = ntohl(pRsp->vgNum);
pRsp->uid = be64toh(pRsp->uid);
if (pRsp->vgNum < 0) {
qError("invalid db[%s] vgroup number[%d]", pRsp->db, pRsp->vgNum);
......@@ -111,6 +112,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
pOut->dbVgroup.vgVersion = pRsp->vgVersion;
pOut->dbVgroup.hashMethod = pRsp->hashMethod;
pOut->dbVgroup.dbId = pRsp->uid;
pOut->dbVgroup.vgInfo = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == pOut->dbVgroup.vgInfo) {
qError("hash init[%d] failed", pRsp->vgNum);
......@@ -149,8 +151,8 @@ static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) {
pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns);
pMetaMsg->sversion = ntohl(pMetaMsg->sversion);
pMetaMsg->tversion = ntohl(pMetaMsg->tversion);
pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
pMetaMsg->suid = htobe64(pMetaMsg->suid);
pMetaMsg->tuid = be64toh(pMetaMsg->tuid);
pMetaMsg->suid = be64toh(pMetaMsg->suid);
pMetaMsg->vgId = ntohl(pMetaMsg->vgId);
if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
......@@ -208,7 +210,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
pTableMeta->vgId = isSuperTable ? 0 : msg->vgId;
pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType;
pTableMeta->uid = msg->tuid;
pTableMeta->uid = isSuperTable ? msg->suid : msg->tuid;
pTableMeta->suid = msg->suid;
pTableMeta->sversion = msg->sversion;
pTableMeta->tversion = msg->tversion;
......@@ -244,7 +246,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
}
if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
pOut->metaNum = 2;
SET_META_TYPE_BOTH_TABLE(pOut->metaType);
if (pMetaMsg->dbFname[0]) {
snprintf(pOut->ctbFname, sizeof(pOut->ctbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
......@@ -261,7 +263,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
code = queryCreateTableMetaFromMsg(pMetaMsg, true, &pOut->tbMeta);
} else {
pOut->metaNum = 1;
SET_META_TYPE_TABLE(pOut->metaType);
if (pMetaMsg->dbFname[0]) {
snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
......
......@@ -505,7 +505,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
break;
}
case TDMT_VND_SUBMIT_RSP: {
if (rspCode != TSDB_CODE_SUCCESS) {
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
......@@ -521,7 +521,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
case TDMT_VND_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) {
if (rsp->code != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
} else {
code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY);
......@@ -534,7 +534,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
case TDMT_VND_RES_READY_RSP: {
SResReadyRsp *rsp = (SResReadyRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) {
if (rsp->code != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
} else {
code = schProcessOnTaskSuccess(job, task);
......@@ -549,7 +549,9 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
job->res = rsp;
job->resNumOfRows = rsp->numOfRows;
if (rsp) {
job->resNumOfRows = rsp->numOfRows;
}
SCH_ERR_JRET(schProcessOnDataFetched(job));
break;
......@@ -1100,6 +1102,7 @@ void scheduleFreeJob(void *pJob) {
taosHashCleanup(job->failTasks);
taosHashCleanup(job->succTasks);
taosArrayDestroy(job->levels);
tfree(job);
}
......
......@@ -215,7 +215,7 @@ static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) {
return taosHashGetSize(pHashObj) == 0;
}
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
int32_t taosHashPutImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded) {
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
......@@ -274,6 +274,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
atomic_add_fetch_32(&pHashObj->size, 1);
if (newAdded) {
*newAdded = true;
}
return 0;
} else {
// not support the update operation, return error
......@@ -290,10 +294,23 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
// enable resize
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
if (newAdded) {
*newAdded = false;
}
return pHashObj->enableUpdate ? 0 : -2;
}
}
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
return taosHashPutImpl(pHashObj, key, keyLen, data, size, NULL);
}
int32_t taosHashPutExt(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded) {
return taosHashPutImpl(pHashObj, key, keyLen, data, size, newAdded);
}
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
return taosHashGetClone(pHashObj, key, keyLen, NULL);
}
......
......@@ -95,7 +95,6 @@ int32_t tsdbDebugFlag = 131;
int32_t tqDebugFlag = 131;
int32_t cqDebugFlag = 131;
int32_t fsDebugFlag = 135;
int32_t ctgDebugFlag = 131;
int64_t dbgEmptyW = 0;
int64_t dbgWN = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册