提交 0b452701 编写于 作者: D dapan1121

feature/qnode

上级 33b3a978
......@@ -725,6 +725,7 @@ typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t dbId;
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
......
......@@ -760,6 +760,7 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
strcpy(pMeta->dbFName, pStb->db);
strcpy(pMeta->tbName, pInfo->tbName);
strcpy(pMeta->stbName, pInfo->tbName);
pMeta->dbId = htobe64(pDb->uid);
pMeta->numOfTags = htonl(pStb->numOfTags);
pMeta->numOfColumns = htonl(pStb->numOfColumns);
pMeta->precision = pDb->cfg.precision;
......
......@@ -85,6 +85,7 @@ typedef struct SCtgRentMgmt {
typedef struct SCatalog {
uint64_t clusterId;
SRWLatch dbLock;
SHashObj *dbCache; //key:dbname, value:SCtgDBCache
SCtgRentMgmt dbRent;
SCtgRentMgmt stbRent;
......@@ -109,6 +110,8 @@ typedef struct SCatalogStat {
} SCatalogStat;
typedef struct SCatalogMgmt {
bool exit;
SRWLatch lock;
SHashObj *pCluster; //key: clusterId, value: SCatalog*
SCatalogStat stat;
SCatalogCfg cfg;
......@@ -136,10 +139,6 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCatalog, __VA_ARGS__)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
#define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { qDebug(__VA_ARGS__); } } while (0)
......@@ -177,6 +176,13 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
} \
} while (0)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define CTG_API_ENTER() do { CTG_LOCK(CTG_READ, &ctgMgmt.lock); if (atomic_load_8(&ctgMgmt.exit)) { CTG_RET(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_RET(__code); } while (0)
......
......@@ -122,7 +122,7 @@ void ctgDbgShowClusterCache(struct SCatalog* pCatalog) {
int32_t ctgInitDBCache(struct SCatalog* pCatalog) {
if (NULL == pCatalog->dbCache) {
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == cache) {
ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
......@@ -693,6 +693,7 @@ int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbIn
CTG_RET(code);
}
#if 1
int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) {
return -1;
......@@ -712,7 +713,29 @@ int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
return 0;
}
}
#else
int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) {
return -1;
} else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) {
return 1;
} else {
return 0;
}
}
int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) {
return -1;
} else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) {
return 1;
} else {
return 0;
}
}
#endif
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
mgmt->slotRIdx = 0;
......@@ -776,14 +799,15 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
}
if (slot->needSort) {
qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
taosArraySort(slot->meta, compare);
slot->needSort = false;
qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
}
void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ);
if (NULL == orig) {
qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
......@@ -910,8 +934,15 @@ int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t si
int32_t ctgAddDBCache(struct SCatalog *pCatalog, const char *dbFName, SCtgDBCache *dbCache) {
int32_t code = 0;
if (taosHashPut(pCatalog->dbCache, dbFName, strlen(dbFName), dbCache, sizeof(SCtgDBCache))) {
ctgError("taosHashPut db to cache failed, db:%s", dbFName);
code = taosHashPut(pCatalog->dbCache, dbFName, strlen(dbFName), dbCache, sizeof(SCtgDBCache));
if (code) {
if (HASH_NODE_EXIST(code)) {
ctgDebug("db already in cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -919,7 +950,7 @@ int32_t ctgAddDBCache(struct SCatalog *pCatalog, const char *dbFName, SCtgDBCach
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
ctgDebug("dbCache added, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbCache->dbId);
CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion)));
return TSDB_CODE_SUCCESS;
......@@ -955,8 +986,8 @@ void ctgRemoveAndFreeTableMeta(struct SCatalog* pCatalog, SCtgTbMetaCache *cache
int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, SCtgDBCache *dbCache, const char* dbFName) {
if (taosHashRemove(pCatalog->dbCache, dbFName, strlen(dbFName))) {
ctgError("taosHashRemove from dbCache failed, dbFName:%s", dbFName);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
atomic_store_8(&dbCache->deleted, 1);
......@@ -965,7 +996,7 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, SCtgDBCache *dbCache,
if (dbCache->vgInfo) {
ctgInfo("cleanup db vgInfo, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
if (dbCache->vgInfo->vgHash) {
if (dbCache->vgInfo->vgHash) {
taosHashCleanup(dbCache->vgInfo->vgHash);
}
......@@ -988,6 +1019,8 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, SCtgDBCache *dbCache,
int32_t ctgAcquireDBCache(struct SCatalog* pCatalog, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
CTG_LOCK(CTG_WRITE, &pCatalog->dbLock);
while (true) {
dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName));
......@@ -1015,9 +1048,16 @@ int32_t ctgAcquireDBCache(struct SCatalog* pCatalog, const char *dbFName, uint64
return TSDB_CODE_SUCCESS;
}
#endif
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName));
code = ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName);
taosHashRelease(pCatalog->dbCache, dbCache);
dbCache = NULL;
if (code) {
if (TSDB_CODE_CTG_DB_DROPPED == code) {
continue;
}
CTG_ERR_JRET(code);
}
}
SCtgDBCache newDBCache = {0};
......@@ -1031,6 +1071,8 @@ _return:
if (dbCache) {
taosHashRelease(pCatalog->dbCache, dbCache);
}
CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock);
CTG_RET(code);
}
......@@ -1147,7 +1189,8 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
_return:
if (dbCache) {
taosHashRelease(pCatalog->dbCache, dbCache);
taosHashRelease(pCatalog->dbCache, dbCache);
CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock);
}
CTG_RET(code);
......@@ -1459,17 +1502,19 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_ENTER();
if (NULL == pCatalog->dbCache) {
*version = CTG_DEFAULT_INVALID_VERSION;
ctgInfo("empty db cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
SCtgDBCache *db = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName));
if (NULL == db) {
*version = CTG_DEFAULT_INVALID_VERSION;
ctgInfo("db not in cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_LOCK(CTG_READ, &db->vgLock);
......@@ -1479,7 +1524,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
*version = CTG_DEFAULT_INVALID_VERSION;
ctgInfo("db not in cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
*version = db->vgInfo->vgVersion;
......@@ -1489,7 +1534,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
ctgDebug("Got db vgVersion from cache, dbName:%s, vgVersion:%d", dbName, *version);
return TSDB_CODE_SUCCESS;
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) {
......@@ -1497,6 +1542,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_ENTER();
SCtgDBCache* dbCache = NULL;
SVgroupInfo *vgInfo = NULL;
......@@ -1540,12 +1587,14 @@ _return:
vgList = NULL;
}
CTG_RET(code);
CTG_API_LEAVE(code);
}
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbFName, uint64_t dbId, SDBVgroupInfo* dbInfo) {
int32_t code = 0;
CTG_API_ENTER();
if (NULL == pCatalog || NULL == dbFName || NULL == dbInfo) {
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
......@@ -1584,7 +1633,7 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbFName, ui
}
if (dbCache->vgInfo->vgHash) {
ctgInfo("cleanup db vgHash, dbFName:%s", dbFName);
ctgDebug("cleanup db vgHash, dbFName:%s", dbFName);
taosHashCleanup(dbCache->vgInfo->vgHash);
dbCache->vgInfo->vgHash = NULL;
}
......@@ -1605,13 +1654,17 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbFName, ui
_return:
if (dbCache) {
CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock);
}
if (dbInfo) {
taosHashCleanup(dbInfo->vgHash);
dbInfo->vgHash = NULL;
tfree(dbInfo);
}
CTG_RET(code);
CTG_API_LEAVE(code);
}
......@@ -1622,19 +1675,22 @@ int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbFName, uint64_t
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_ENTER();
if (NULL == pCatalog->dbCache) {
return TSDB_CODE_SUCCESS;
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName));
if (NULL == dbCache) {
ctgInfo("db not exist in dbCache, may be removed, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
if (dbCache->dbId != dbId) {
ctgInfo("db id already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, dbFName, dbCache->dbId, dbId);
return TSDB_CODE_SUCCESS;
taosHashRelease(pCatalog->dbCache, dbCache);
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName));
......@@ -1643,7 +1699,7 @@ _return:
taosHashRelease(pCatalog->dbCache, dbCache);
CTG_RET(code);
CTG_API_LEAVE(code);
}
int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid) {
......@@ -1654,43 +1710,53 @@ int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, c
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_ENTER();
if (NULL == pCatalog->dbCache) {
return TSDB_CODE_SUCCESS;
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_ERR_RET(ctgValidateAndRemoveStbMeta(pCatalog, dbName, stbName, suid, &removed));
if (!removed) {
return TSDB_CODE_SUCCESS;
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
ctgInfo("stb removed from cache, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid);
CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare));
CTG_ERR_JRET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare));
ctgDebug("stb removed from rent, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid);
_return:
CTG_RET(code);
CTG_API_LEAVE(code);
}
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1);
CTG_API_ENTER();
CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1));
}
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1);
CTG_API_ENTER();
CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1));
}
int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg) {
STableMetaOutput output = {0};
int32_t code = 0;
CTG_API_ENTER();
strcpy(output.dbFName, rspMsg->dbFName);
strcpy(output.tbName, rspMsg->tbName);
SET_META_TYPE_TABLE(output.metaType);
CTG_ERR_RET(queryCreateTableMetaFromMsg(rspMsg, true, &output.tbMeta));
CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output.tbMeta));
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
......@@ -1698,7 +1764,7 @@ _return:
tfree(output.tbMeta);
CTG_RET(code);
CTG_API_LEAVE(code);
}
......@@ -1707,17 +1773,23 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, con
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
return ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable);
CTG_API_ENTER();
CTG_API_LEAVE(ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable));
}
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable);
CTG_API_ENTER();
CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable));
}
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgroupList) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_ENTER();
STableMeta *tbMeta = NULL;
int32_t code = 0;
......@@ -1733,7 +1805,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
tNameGetFullDbName(pTableName, db);
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbCache));
// REMOEV THIS ....
// TODO REMOEV THIS ....
if (0 == tbMeta->vgId) {
SVgroupInfo vgroup = {0};
......@@ -1741,7 +1813,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
tbMeta->vgId = vgroup.vgId;
}
// REMOVE THIS ....
// TODO REMOVE THIS ....
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbCache->vgInfo, pVgroupList));
......@@ -1780,7 +1852,7 @@ _return:
vgList = NULL;
}
CTG_RET(code);
CTG_API_LEAVE(code);
}
......@@ -1788,10 +1860,12 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter,
SCtgDBCache* dbCache = NULL;
int32_t code = 0;
CTG_API_ENTER();
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbCache));
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbCache));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbCache->vgInfo, pTableName, pVgroup));
......@@ -1802,7 +1876,7 @@ _return:
taosHashRelease(pCatalog->dbCache, dbCache);
}
CTG_RET(code);
CTG_API_LEAVE(code);
}
......@@ -1811,19 +1885,22 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const S
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_ENTER();
int32_t code = 0;
pRsp->pTableMeta = NULL;
if (pReq->pTableName) {
int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
if (tbNum <= 0) {
ctgError("empty table name list, tbNum:%d", tbNum);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
}
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
if (NULL == pRsp->pTableMeta) {
ctgError("taosArrayInit %d failed", tbNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
for (int32_t i = 0; i < tbNum; ++i) {
......@@ -1840,7 +1917,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const S
}
}
return TSDB_CODE_SUCCESS;
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
_return:
......@@ -1855,7 +1932,7 @@ _return:
pRsp->pTableMeta = NULL;
}
CTG_RET(code);
CTG_API_LEAVE(code);
}
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
......@@ -1863,9 +1940,11 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_ENTER();
//TODO
return TSDB_CODE_SUCCESS;
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num) {
......@@ -1873,7 +1952,9 @@ int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion *
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_RET(ctgMetaRentGet(&pCatalog->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion)));
CTG_API_ENTER();
CTG_API_LEAVE(ctgMetaRentGet(&pCatalog->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion)));
}
int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num) {
......@@ -1881,15 +1962,21 @@ int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_RET(ctgMetaRentGet(&pCatalog->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
CTG_API_ENTER();
CTG_API_LEAVE(ctgMetaRentGet(&pCatalog->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
}
void catalogDestroy(void) {
if (NULL == ctgMgmt.pCluster) {
if (NULL == ctgMgmt.pCluster || atomic_load_8(&ctgMgmt.exit)) {
return;
}
atomic_store_8(&ctgMgmt.exit, true);
CTG_LOCK(CTG_WRITE, &ctgMgmt.lock);
SCatalog *pCatalog = NULL;
void *pIter = taosHashIterate(ctgMgmt.pCluster, NULL);
while (pIter) {
......@@ -1905,6 +1992,8 @@ void catalogDestroy(void) {
taosHashCleanup(ctgMgmt.pCluster);
ctgMgmt.pCluster = NULL;
CTG_UNLOCK(CTG_WRITE, &ctgMgmt.lock);
qInfo("catalog destroyed");
}
......
......@@ -108,6 +108,7 @@ void ctgTestInitLogFile() {
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
qDebugFlag = 159;
char temp[128] = {0};
sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix);
......@@ -631,7 +632,7 @@ void *ctgTestGetDbVgroupThread(void *param) {
return NULL;
}
void *ctgTestSetDbVgroupThread(void *param) {
void *ctgTestSetSameDbVgroupThread(void *param) {
struct SCatalog *pCtg = (struct SCatalog *)param;
int32_t code = 0;
SDBVgroupInfo *dbVgroup = NULL;
......@@ -655,6 +656,32 @@ void *ctgTestSetDbVgroupThread(void *param) {
return NULL;
}
void *ctgTestSetDiffDbVgroupThread(void *param) {
struct SCatalog *pCtg = (struct SCatalog *)param;
int32_t code = 0;
SDBVgroupInfo *dbVgroup = NULL;
int32_t n = 0;
while (!ctgTestStop) {
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId++, dbVgroup);
if (code) {
assert(0);
}
if (ctgTestEnableSleep) {
usleep(rand() % 5);
}
if (++n % ctgTestPrintNum == 0) {
printf("Set:%d\n", n);
}
}
return NULL;
}
void *ctgTestGetCtableMetaThread(void *param) {
struct SCatalog *pCtg = (struct SCatalog *)param;
int32_t code = 0;
......@@ -720,6 +747,8 @@ TEST(tableMeta, normalTable) {
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroups();
initQueryModuleMsgHandle();
......@@ -1285,7 +1314,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
catalogDestroy();
}
TEST(multiThread, getSetDbVgroupCase) {
TEST(multiThread, getSetRmSameDbVgroup) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
......@@ -1316,10 +1345,10 @@ TEST(multiThread, getSetDbVgroupCase) {
pthread_attr_init(&thattr);
pthread_t thread1, thread2;
pthread_create(&(thread1), &thattr, ctgTestSetDbVgroupThread, pCtg);
pthread_create(&(thread1), &thattr, ctgTestSetSameDbVgroupThread, pCtg);
sleep(1);
pthread_create(&(thread1), &thattr, ctgTestGetDbVgroupThread, pCtg);
pthread_create(&(thread2), &thattr, ctgTestGetDbVgroupThread, pCtg);
while (true) {
if (ctgTestDeadLoop) {
......@@ -1336,6 +1365,58 @@ TEST(multiThread, getSetDbVgroupCase) {
catalogDestroy();
}
TEST(multiThread, getSetRmDiffDbVgroup) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestStop = false;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroups();
initQueryModuleMsgHandle();
// sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t thread1, thread2;
pthread_create(&(thread1), &thattr, ctgTestSetDiffDbVgroupThread, pCtg);
sleep(1);
pthread_create(&(thread2), &thattr, ctgTestGetDbVgroupThread, pCtg);
while (true) {
if (ctgTestDeadLoop) {
sleep(1);
} else {
sleep(ctgTestMTRunSec);
break;
}
}
ctgTestStop = true;
sleep(1);
catalogDestroy();
}
TEST(multiThread, ctableMeta) {
struct SCatalog *pCtg = NULL;
......
......@@ -159,6 +159,7 @@ _return:
}
static int32_t queryConvertTableMetaMsg(STableMetaRsp* pMetaMsg) {
pMetaMsg->dbId = be64toh(pMetaMsg->dbId);
pMetaMsg->numOfTags = ntohl(pMetaMsg->numOfTags);
pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns);
pMetaMsg->sversion = ntohl(pMetaMsg->sversion);
......@@ -258,6 +259,8 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
}
strcpy(pOut->dbFName, pMetaMsg->dbFName);
pOut->dbId = pMetaMsg->dbId;
if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
SET_META_TYPE_BOTH_TABLE(pOut->metaType);
......
......@@ -419,6 +419,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready"
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_DB_DROPPED, "Database is dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_OUT_OF_SERVICE, "catalog is out of service")
//scheduler
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册