提交 d61aa9b6 编写于 作者: D dapan1121

feature/qnode

上级 19660534
......@@ -61,6 +61,18 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle)
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
/**
* Get a DB's all vgroup info.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param forceUpdate (input, force update db vgroup info from mnode)
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, int32_t forceUpdate, SArray** pVgroupList);
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
/**
......
......@@ -75,6 +75,7 @@ typedef struct STableMeta {
typedef struct SDBVgroupInfo {
int32_t lock;
int32_t vgVersion;
int8_t hashMethod;
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
......
......@@ -144,6 +144,16 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
*/
void *taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* destBuf);
/**
* Clone the result to interval allocated buffer
* @param pHashObj
* @param key
* @param keyLen
* @param destBuf
* @return
*/
void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void** d, size_t *sz);
/**
* remove item with the specified key
* @param pHashObj
......@@ -200,6 +210,26 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p);
*/
int32_t taosHashGetKey(void *data, void** key, size_t* keyLen);
/**
* return the payload data with the specified key(reference number added)
*
* @param pHashObj
* @param key
* @param keyLen
* @return
*/
void* taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen);
/**
* release the prevous acquired obj
*
* @param pHashObj
* @param data
* @return
*/
void taosHashRelease(SHashObj *pHashObj, void *p);
#ifdef __cplusplus
}
#endif
......
......@@ -31,6 +31,11 @@ extern "C" {
#define CTG_DEFAULT_INVALID_VERSION (-1)
enum {
CTG_READ = 1,
CTG_WRITE,
};
typedef struct SVgroupListCache {
int32_t vgroupVersion;
SHashObj *cache; // key:vgId, value:SVgroupInfo
......@@ -41,6 +46,7 @@ typedef struct SDBVgroupCache {
} SDBVgroupCache;
typedef struct STableMetaCache {
SRWLatch stableLock;
SHashObj *cache; //key:fulltablename, value:STableMeta
SHashObj *stableCache; //key:suid, value:STableMeta*
} STableMetaCache;
......@@ -71,6 +77,31 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define CTG_LOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
if ((*(_lock)) < 0) assert(0); \
taosRLockLatch(_lock); \
ctgDebug("CTG RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} else { \
if ((*(_lock)) < 0) assert(0); \
taosWLockLatch(_lock); \
ctgDebug("CTG WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} \
} while (0)
#define CTG_UNLOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
if ((*(_lock)) <= 0) assert(0); \
taosRUnLockLatch(_lock); \
ctgDebug("CTG RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} else { \
if ((*(_lock)) <= 0) assert(0); \
taosWUnLockLatch(_lock); \
ctgDebug("CTG WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} \
} while (0)
#ifdef __cplusplus
}
#endif
......
......@@ -20,24 +20,28 @@
SCatalogMgmt ctgMgmt = {0};
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo *dbInfo, int32_t *exist) {
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) {
if (NULL == pCatalog->dbCache.cache) {
*exist = 0;
*inCache = false;
return TSDB_CODE_SUCCESS;
}
SDBVgroupInfo *info = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
SDBVgroupInfo *info = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (NULL == info) {
*exist = 0;
*inCache = false;
return TSDB_CODE_SUCCESS;
}
if (dbInfo) {
*dbInfo = *info;
CTG_LOCK(CTG_READ, &info->lock);
if (NULL == info->vgInfo) {
CTG_UNLOCK(CTG_READ, &info->lock);
*inCache = false;
return TSDB_CODE_SUCCESS;
}
*exist = 1;
*dbInfo = info;
*inCache = true;
return TSDB_CODE_SUCCESS;
}
......@@ -81,46 +85,51 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName,
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbName, pTableName);
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName));
*pTableMeta = NULL;
size_t sz = 0;
STableMeta *tbMeta = taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)pTableMeta, &sz);
if (NULL == tbMeta) {
if (NULL == *pTableMeta) {
*exist = 0;
return TSDB_CODE_SUCCESS;
}
if (tbMeta->tableType == TSDB_CHILD_TABLE) {
STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
if (NULL == stbMeta || NULL == *stbMeta) {
*exist = 0;
return TSDB_CODE_SUCCESS;
}
if ((*stbMeta)->suid != tbMeta->suid) {
ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
*exist = 1;
int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);
*pTableMeta = calloc(1, metaSize);
if (NULL == *pTableMeta) {
ctgError("calloc size[%d] failed", metaSize);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_READ, &pCatalog->tableCache.stableLock);
STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
if (NULL == stbMeta || NULL == *stbMeta) {
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
qError("no stable:%"PRIx64 " meta in cache", tbMeta->suid);
tfree(*pTableMeta);
*exist = 0;
return TSDB_CODE_SUCCESS;
}
memcpy(*pTableMeta, tbMeta, sizeof(SCTableMeta));
memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));
} else {
int32_t metaSize = sizeof(STableMeta) + (tbMeta->tableInfo.numOfTags + tbMeta->tableInfo.numOfColumns) * sizeof(SSchema);
*pTableMeta = calloc(1, metaSize);
if (NULL == *pTableMeta) {
ctgError("calloc size[%d] failed", metaSize);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if ((*stbMeta)->suid != tbMeta->suid) {
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
tfree(*pTableMeta);
ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
memcpy(*pTableMeta, tbMeta, metaSize);
int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);
*pTableMeta = realloc(*pTableMeta, metaSize);
if (NULL == *pTableMeta) {
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
ctgError("calloc size[%d] failed", metaSize);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
*exist = 1;
memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
return TSDB_CODE_SUCCESS;
}
......@@ -225,9 +234,11 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) {
SHashObj *vgroupHash = NULL;
SVgroupInfo *vgInfo = NULL;
SArray *vgList = NULL;
int32_t code = 0;
*vgroupList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo));
if (NULL == *vgroupList) {
vgList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo));
if (NULL == vgList) {
ctgError("taosArrayInit failed");
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -236,29 +247,44 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *
while (pIter) {
vgInfo = pIter;
if (NULL == taosArrayPush(*vgroupList, vgInfo)) {
if (NULL == taosArrayPush(vgList, vgInfo)) {
ctgError("taosArrayPush failed");
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
pIter = taosHashIterate(dbInfo->vgInfo, pIter);
vgInfo = NULL;
}
*vgroupList = vgList;
vgList = NULL;
return TSDB_CODE_SUCCESS;
_return:
if (vgList) {
taosArrayDestroy(vgList);
}
CTG_RET(code);
}
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
int32_t code = 0;
CTG_LOCK(CTG_READ, &dbInfo->lock);
int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
if (vgNum <= 0) {
ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum);
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED);
}
tableNameHashFp fp = NULL;
SVgroupInfo *vgInfo = NULL;
CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
char tbFullName[TSDB_TABLE_FNAME_LEN];
......@@ -279,19 +305,23 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co
if (NULL == vgInfo) {
ctgError("no hash range found for hashvalue[%u]", hashValue);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
*pVgroup = *vgInfo;
return TSDB_CODE_SUCCESS;
_return:
CTG_UNLOCK(CTG_READ, &dbInfo->lock);
CTG_RET(TSDB_CODE_SUCCESS);
}
int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t exist = 0;
if (!forceUpdate) {
......@@ -316,21 +346,23 @@ int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
int32_t code = 0;
if (output->metaNum != 1 && output->metaNum != 2) {
ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (NULL == output->tbMeta) {
ctgError("no valid table meta got from meta rsp");
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (NULL == pCatalog->tableCache.cache) {
pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.cache) {
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
}
......@@ -338,50 +370,59 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.stableCache) {
ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
}
if (output->metaNum == 2) {
if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
ctgError("push ctable[%s] to table cache failed", output->ctbFname);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
}
int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
ctgError("push table[%s] to table cache failed", output->tbFname);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &output->tbMeta, POINTER_BYTES) != 0) {
CTG_LOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
ctgError("push table[%s] to table cache failed", output->tbFname);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname));
if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) {
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
} else {
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
ctgError("push table[%s] to table cache failed", output->tbFname);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
}
_return:
tfree(output->tbMeta);
return TSDB_CODE_SUCCESS;
CTG_RET(code);
}
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t exist = 0;
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) {
bool inCache = false;
if (0 == forceUpdate) {
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
if (exist) {
if (inCache) {
return TSDB_CODE_SUCCESS;
}
}
......@@ -397,9 +438,7 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup));
if (dbInfo) {
*dbInfo = DbOut.dbVgroup;
}
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
return TSDB_CODE_SUCCESS;
}
......@@ -479,17 +518,68 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
return TSDB_CODE_SUCCESS;
}
SDBVgroupInfo * dbInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
SDBVgroupInfo * dbInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (NULL == dbInfo) {
*version = CTG_DEFAULT_INVALID_VERSION;
return TSDB_CODE_SUCCESS;
}
*version = dbInfo->vgVersion;
taosHashRelease(pCatalog->dbCache.cache, dbInfo);
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SArray** vgroupList) {
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
SDBVgroupInfo* db = NULL;
int32_t code = 0;
SVgroupInfo *vgInfo = NULL;
SArray *vgList = NULL;
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &db));
vgList = taosArrayInit(taosHashGetSize(db->vgInfo), sizeof(SVgroupInfo));
if (NULL == vgList) {
ctgError("taosArrayInit failed");
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
void *pIter = taosHashIterate(db->vgInfo, NULL);
while (pIter) {
vgInfo = pIter;
if (NULL == taosArrayPush(vgList, vgInfo)) {
ctgError("taosArrayPush failed");
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
pIter = taosHashIterate(db->vgInfo, pIter);
vgInfo = NULL;
}
*vgroupList = vgList;
vgList = NULL;
_return:
if (db) {
CTG_UNLOCK(CTG_READ, &db->lock);
taosHashRelease(pCatalog->dbCache.cache, db);
}
if (vgList) {
taosArrayDestroy(vgList);
vgList = NULL;
}
CTG_RET(code);
}
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
......@@ -497,13 +587,17 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
if (dbInfo->vgVersion < 0) {
if (pCatalog->dbCache.cache) {
SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (oldInfo && oldInfo->vgInfo) {
taosHashCleanup(oldInfo->vgInfo);
oldInfo->vgInfo = NULL;
SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (oldInfo) {
CTG_LOCK(CTG_WRITE, &oldInfo->lock);
if (oldInfo->vgInfo) {
taosHashCleanup(oldInfo->vgInfo);
oldInfo->vgInfo = NULL;
}
CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, oldInfo);
}
taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName));
}
ctgWarn("remove db [%s] from cache", dbName);
......@@ -517,10 +611,16 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
} else {
SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (oldInfo && oldInfo->vgInfo) {
taosHashCleanup(oldInfo->vgInfo);
oldInfo->vgInfo = NULL;
SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (oldInfo) {
CTG_LOCK(CTG_WRITE, &oldInfo->lock);
if (oldInfo->vgInfo) {
taosHashCleanup(oldInfo->vgInfo);
oldInfo->vgInfo = NULL;
}
CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, oldInfo);
}
}
......@@ -573,60 +673,71 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
STableMeta *tbMeta = NULL;
int32_t code = 0;
SVgroupInfo vgroupInfo = {0};
SDBVgroupInfo dbVgroup = {0};
SDBVgroupInfo* dbVgroup = NULL;
SArray *vgList = NULL;
*pVgroupList = NULL;
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta));
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList));
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList));
} else {
int32_t vgId = tbMeta->vgId;
if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
ctgError("vgId[%d] not found in vgroup list", vgId);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
*pVgroupList = taosArrayInit(1, sizeof(SVgroupInfo));
if (NULL == *pVgroupList) {
vgList = taosArrayInit(1, sizeof(SVgroupInfo));
if (NULL == vgList) {
ctgError("taosArrayInit failed");
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
if (NULL == taosArrayPush(*pVgroupList, &vgroupInfo)) {
if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
ctgError("push vgroupInfo to array failed");
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
}
tfree(tbMeta);
return TSDB_CODE_SUCCESS;
*pVgroupList = vgList;
vgList = NULL;
}
_return:
tfree(tbMeta);
taosArrayDestroy(*pVgroupList);
*pVgroupList = NULL;
if (dbVgroup) {
CTG_UNLOCK(CTG_READ, &dbVgroup->lock);
taosHashRelease(pCatalog->dbCache.cache, dbVgroup);
}
if (vgList) {
taosArrayDestroy(vgList);
vgList = NULL;
}
CTG_RET(code);
}
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
SDBVgroupInfo dbInfo = {0};
SDBVgroupInfo* dbInfo = NULL;
int32_t code = 0;
int32_t vgId = 0;
CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo));
if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
}
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pDBName, pTableName, pVgroup));
CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup));
_return:
if (dbInfo) {
CTG_UNLOCK(CTG_READ, &dbInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, dbInfo);
}
CTG_RET(code);
}
......
......@@ -106,11 +106,11 @@ typedef struct SQWorkerMgmt {
if (QW_READ == (type)) { \
if ((*(_lock)) < 0) assert(0); \
taosRLockLatch(_lock); \
qDebug("RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
qDebug("QW RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} else { \
if ((*(_lock)) < 0) assert(0); \
taosWLockLatch(_lock); \
qDebug("WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
qDebug("QW WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} \
} while (0)
......@@ -118,11 +118,11 @@ typedef struct SQWorkerMgmt {
if (QW_READ == (type)) { \
if ((*(_lock)) <= 0) assert(0); \
taosRUnLockLatch(_lock); \
qDebug("RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
qDebug("QW RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} else { \
if ((*(_lock)) <= 0) assert(0); \
taosWUnLockLatch(_lock); \
qDebug("WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
qDebug("QW WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} \
} while (0)
......
......@@ -12,4 +12,6 @@ target_link_libraries(
PUBLIC zlib
PUBLIC lz4_static
PUBLIC api
)
\ No newline at end of file
)
ADD_SUBDIRECTORY(test)
......@@ -362,7 +362,7 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo
return data;
}
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* d) {
void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void* d, bool acquire) {
if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
return NULL;
}
......@@ -404,6 +404,10 @@ void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void*
memcpy(d, GET_HASH_NODE_DATA(pNode), pNode->dataLen);
}
if (acquire) {
pNode->count++;
}
data = GET_HASH_NODE_DATA(pNode);
}
......@@ -415,6 +419,15 @@ void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void*
return data;
}
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* d) {
return taosHashGetCloneImpl(pHashObj, key, keyLen, d, false);
}
void* taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen) {
return taosHashGetCloneImpl(pHashObj, key, keyLen, NULL, true);
}
int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, void *data, size_t dsize*/) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) {
return -1;
......@@ -919,3 +932,9 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) {
__rd_unlock(&pHashObj->lock, pHashObj->type);
}
void taosHashRelease(SHashObj *pHashObj, void *p) {
taosHashCancelIterate(pHashObj, p);
}
......@@ -13,17 +13,22 @@ IF (HEADER_GTEST_INCLUDE_DIR AND (LIB_GTEST_STATIC_DIR OR LIB_GTEST_SHARED_DIR))
LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
ADD_EXECUTABLE(utilTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(utilTest tutil common os gtest pthread gcov)
TARGET_LINK_LIBRARIES(utilTest util common os gtest pthread gcov)
LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/cacheTest.cpp)
LIST(APPEND SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/hashTest.cpp)
ADD_EXECUTABLE(hashTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(hashTest util common os gtest pthread gcov)
LIST(APPEND BIN_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
ADD_EXECUTABLE(trefTest ${BIN_SRC})
TARGET_LINK_LIBRARIES(trefTest common tutil)
TARGET_LINK_LIBRARIES(trefTest common util)
ENDIF()
#IF (TD_LINUX)
# ADD_EXECUTABLE(trefTest ./trefTest.c)
# TARGET_LINK_LIBRARIES(trefTest tutil common)
# TARGET_LINK_LIBRARIES(trefTest util common)
#ENDIF ()
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
......
......@@ -4,10 +4,15 @@
#include <taosdef.h>
#include <iostream>
#include "hash.h"
#include "thash.h"
#include "taos.h"
namespace {
typedef struct TESTSTRUCT {
char *p;
}TESTSTRUCT;
// the simple test code for basic operations
void simpleTest() {
SHashObj* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
......@@ -141,6 +146,52 @@ void invalidOperationTest() {
}
void acquireRleaseTest() {
SHashObj* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
ASSERT_EQ(taosHashGetSize(hashTable), 0);
int32_t key = 2;
int32_t code = 0;
int32_t num = 0;
TESTSTRUCT data = {0};
char *str1 = "abcdefg";
char *str2 = "aaaaaaa";
char *str3 = "123456789";
data.p = (char *)malloc(10);
strcpy(data.p, str1);
code = taosHashPut(hashTable, &key, sizeof(key), &data, sizeof(data));
ASSERT_EQ(code, 0);
TESTSTRUCT* pdata = (TESTSTRUCT*)taosHashAcquire(hashTable, &key, sizeof(key));
ASSERT_TRUE(pdata != nullptr);
ASSERT_TRUE(strcmp(pdata->p, str1) == 0);
code = taosHashRemove(hashTable, &key, sizeof(key));
ASSERT_EQ(code, 0);
ASSERT_TRUE(strcmp(pdata->p, str1) == 0);
num = taosHashGetSize(hashTable);
ASSERT_EQ(num, 1);
strcpy(pdata->p, str3);
data.p = (char *)malloc(10);
strcpy(data.p, str2);
code = taosHashPut(hashTable, &key, sizeof(key), &data, sizeof(data));
ASSERT_EQ(code, 0);
num = taosHashGetSize(hashTable);
ASSERT_EQ(num, 2);
printf("%s,expect:%s", pdata->p, str3);
ASSERT_TRUE(strcmp(pdata->p, str3) == 0);
taosHashRelease(hashTable, pdata);
num = taosHashGetSize(hashTable);
ASSERT_EQ(num, 1);
}
}
int main(int argc, char** argv) {
......@@ -153,4 +204,5 @@ TEST(testCase, hashTest) {
stringKeyTest();
noLockPerformanceTest();
multithreadsTest();
acquireRleaseTest();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册