From d61aa9b6d1f2bac20218f2a17384938b57207409 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 29 Dec 2021 17:45:14 +0800 Subject: [PATCH] feature/qnode --- include/libs/catalog/catalog.h | 12 ++ include/libs/qcom/query.h | 1 + include/util/thash.h | 30 +++ source/libs/catalog/inc/catalogInt.h | 31 +++ source/libs/catalog/src/catalog.c | 311 ++++++++++++++++++--------- source/libs/qworker/inc/qworkerInt.h | 8 +- source/util/CMakeLists.txt | 4 +- source/util/src/thash.c | 21 +- source/util/test/CMakeLists.txt | 11 +- source/util/test/hashTest.cpp | 54 ++++- 10 files changed, 373 insertions(+), 110 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index a9abf45c8d..d8f14f2a95 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -61,6 +61,18 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); +/** + * Get a DB's all vgroup info. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param forceUpdate (input, force update db vgroup info from mnode) + * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) + * @return error code + */ +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, int32_t forceUpdate, SArray** pVgroupList); + int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); /** diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index ea131bbbf2..c8e81a2e94 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -75,6 +75,7 @@ typedef struct STableMeta { typedef struct SDBVgroupInfo { + int32_t lock; int32_t vgVersion; int8_t hashMethod; SHashObj *vgInfo; //key:vgId, value:SVgroupInfo diff --git a/include/util/thash.h b/include/util/thash.h index d0247a0729..f38ab50893 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -144,6 +144,16 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); */ void *taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* destBuf); +/** + * Clone the result to interval allocated buffer + * @param pHashObj + * @param key + * @param keyLen + * @param destBuf + * @return + */ +void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void** d, size_t *sz); + /** * remove item with the specified key * @param pHashObj @@ -200,6 +210,26 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p); */ int32_t taosHashGetKey(void *data, void** key, size_t* keyLen); +/** + * return the payload data with the specified key(reference number added) + * + * @param pHashObj + * @param key + * @param keyLen + * @return + */ +void* taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen); + +/** + * release the prevous acquired obj + * + * @param pHashObj + * @param data + * @return + */ +void taosHashRelease(SHashObj *pHashObj, void *p); + + #ifdef __cplusplus } #endif diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 40943849f1..31b5939463 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -31,6 +31,11 @@ extern "C" { #define CTG_DEFAULT_INVALID_VERSION (-1) +enum { + CTG_READ = 1, + CTG_WRITE, +}; + typedef struct SVgroupListCache { int32_t vgroupVersion; SHashObj *cache; // key:vgId, value:SVgroupInfo @@ -41,6 +46,7 @@ typedef struct SDBVgroupCache { } SDBVgroupCache; typedef struct STableMetaCache { + SRWLatch stableLock; SHashObj *cache; //key:fulltablename, value:STableMeta SHashObj *stableCache; //key:suid, value:STableMeta* } STableMetaCache; @@ -71,6 +77,31 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); #define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0) #define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) +#define CTG_LOCK(type, _lock) do { \ + if (CTG_READ == (type)) { \ + if ((*(_lock)) < 0) assert(0); \ + taosRLockLatch(_lock); \ + ctgDebug("CTG RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } else { \ + if ((*(_lock)) < 0) assert(0); \ + taosWLockLatch(_lock); \ + ctgDebug("CTG WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } \ +} while (0) + +#define CTG_UNLOCK(type, _lock) do { \ + if (CTG_READ == (type)) { \ + if ((*(_lock)) <= 0) assert(0); \ + taosRUnLockLatch(_lock); \ + ctgDebug("CTG RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } else { \ + if ((*(_lock)) <= 0) assert(0); \ + taosWUnLockLatch(_lock); \ + ctgDebug("CTG WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } \ +} while (0) + + #ifdef __cplusplus } #endif diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index e067a7597a..57a8882f4c 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -20,24 +20,28 @@ SCatalogMgmt ctgMgmt = {0}; -int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo *dbInfo, int32_t *exist) { +int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) { if (NULL == pCatalog->dbCache.cache) { - *exist = 0; + *inCache = false; return TSDB_CODE_SUCCESS; } - SDBVgroupInfo *info = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); + SDBVgroupInfo *info = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); if (NULL == info) { - *exist = 0; + *inCache = false; return TSDB_CODE_SUCCESS; } - if (dbInfo) { - *dbInfo = *info; + CTG_LOCK(CTG_READ, &info->lock); + if (NULL == info->vgInfo) { + CTG_UNLOCK(CTG_READ, &info->lock); + *inCache = false; + return TSDB_CODE_SUCCESS; } - *exist = 1; + *dbInfo = info; + *inCache = true; return TSDB_CODE_SUCCESS; } @@ -81,46 +85,51 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName, snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbName, pTableName); - STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName)); + *pTableMeta = NULL; + + size_t sz = 0; + STableMeta *tbMeta = taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)pTableMeta, &sz); - if (NULL == tbMeta) { + if (NULL == *pTableMeta) { *exist = 0; return TSDB_CODE_SUCCESS; } - if (tbMeta->tableType == TSDB_CHILD_TABLE) { - STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid)); - if (NULL == stbMeta || NULL == *stbMeta) { - *exist = 0; - return TSDB_CODE_SUCCESS; - } - - if ((*stbMeta)->suid != tbMeta->suid) { - ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); - } + *exist = 1; - int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema); - *pTableMeta = calloc(1, metaSize); - if (NULL == *pTableMeta) { - ctgError("calloc size[%d] failed", metaSize); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } + if (tbMeta->tableType != TSDB_CHILD_TABLE) { + return TSDB_CODE_SUCCESS; + } + + CTG_LOCK(CTG_READ, &pCatalog->tableCache.stableLock); + + STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid)); + if (NULL == stbMeta || NULL == *stbMeta) { + CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock); + qError("no stable:%"PRIx64 " meta in cache", tbMeta->suid); + tfree(*pTableMeta); + *exist = 0; + return TSDB_CODE_SUCCESS; + } - memcpy(*pTableMeta, tbMeta, sizeof(SCTableMeta)); - memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta)); - } else { - int32_t metaSize = sizeof(STableMeta) + (tbMeta->tableInfo.numOfTags + tbMeta->tableInfo.numOfColumns) * sizeof(SSchema); - *pTableMeta = calloc(1, metaSize); - if (NULL == *pTableMeta) { - ctgError("calloc size[%d] failed", metaSize); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } + if ((*stbMeta)->suid != tbMeta->suid) { + CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock); + tfree(*pTableMeta); + ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } - memcpy(*pTableMeta, tbMeta, metaSize); + int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema); + *pTableMeta = realloc(*pTableMeta, metaSize); + if (NULL == *pTableMeta) { + CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock); + ctgError("calloc size[%d] failed", metaSize); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - *exist = 1; + memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta)); + + CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock); return TSDB_CODE_SUCCESS; } @@ -225,9 +234,11 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) { int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) { SHashObj *vgroupHash = NULL; SVgroupInfo *vgInfo = NULL; + SArray *vgList = NULL; + int32_t code = 0; - *vgroupList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo)); - if (NULL == *vgroupList) { + vgList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo)); + if (NULL == vgList) { ctgError("taosArrayInit failed"); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -236,29 +247,44 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet * while (pIter) { vgInfo = pIter; - if (NULL == taosArrayPush(*vgroupList, vgInfo)) { + if (NULL == taosArrayPush(vgList, vgInfo)) { ctgError("taosArrayPush failed"); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } pIter = taosHashIterate(dbInfo->vgInfo, pIter); vgInfo = NULL; } + *vgroupList = vgList; + vgList = NULL; + return TSDB_CODE_SUCCESS; + +_return: + + if (vgList) { + taosArrayDestroy(vgList); + } + + CTG_RET(code); } int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { + int32_t code = 0; + + CTG_LOCK(CTG_READ, &dbInfo->lock); + int32_t vgNum = taosHashGetSize(dbInfo->vgInfo); if (vgNum <= 0) { ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum); - CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED); + CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED); } tableNameHashFp fp = NULL; SVgroupInfo *vgInfo = NULL; - CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp)); + CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashMethod, &fp)); char tbFullName[TSDB_TABLE_FNAME_LEN]; @@ -279,19 +305,23 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co if (NULL == vgInfo) { ctgError("no hash range found for hashvalue[%u]", hashValue); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } *pVgroup = *vgInfo; - return TSDB_CODE_SUCCESS; +_return: + + CTG_UNLOCK(CTG_READ, &dbInfo->lock); + + CTG_RET(TSDB_CODE_SUCCESS); } int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) { if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - + int32_t exist = 0; if (!forceUpdate) { @@ -316,21 +346,23 @@ int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) { + int32_t code = 0; + if (output->metaNum != 1 && output->metaNum != 2) { ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == output->tbMeta) { ctgError("no valid table meta got from meta rsp"); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == pCatalog->tableCache.cache) { pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.cache) { ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } } @@ -338,50 +370,59 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.stableCache) { ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } } if (output->metaNum == 2) { if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { ctgError("push ctable[%s] to table cache failed", output->ctbFname); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } if (TSDB_SUPER_TABLE != output->tbMeta->tableType) { ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } } int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags); - if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { - ctgError("push table[%s] to table cache failed", output->tbFname); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } if (TSDB_SUPER_TABLE == output->tbMeta->tableType) { - if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &output->tbMeta, POINTER_BYTES) != 0) { + CTG_LOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); + if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { + CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); + ctgError("push table[%s] to table cache failed", output->tbFname); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname)); + if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) { + CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); + } else { + if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { + ctgError("push table[%s] to table cache failed", output->tbFname); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } } + +_return: + tfree(output->tbMeta); - return TSDB_CODE_SUCCESS; + CTG_RET(code); } -int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) { - if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); - } - - int32_t exist = 0; - +int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) { + bool inCache = false; if (0 == forceUpdate) { - CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist)); + CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache)); - if (exist) { + if (inCache) { return TSDB_CODE_SUCCESS; } } @@ -397,9 +438,7 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup)); - if (dbInfo) { - *dbInfo = DbOut.dbVgroup; - } + CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache)); return TSDB_CODE_SUCCESS; } @@ -479,17 +518,68 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, return TSDB_CODE_SUCCESS; } - SDBVgroupInfo * dbInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); + SDBVgroupInfo * dbInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); if (NULL == dbInfo) { *version = CTG_DEFAULT_INVALID_VERSION; return TSDB_CODE_SUCCESS; } *version = dbInfo->vgVersion; + taosHashRelease(pCatalog->dbCache.cache, dbInfo); return TSDB_CODE_SUCCESS; } +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SArray** vgroupList) { + if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + SDBVgroupInfo* db = NULL; + int32_t code = 0; + SVgroupInfo *vgInfo = NULL; + SArray *vgList = NULL; + + CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &db)); + + vgList = taosArrayInit(taosHashGetSize(db->vgInfo), sizeof(SVgroupInfo)); + if (NULL == vgList) { + ctgError("taosArrayInit failed"); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + void *pIter = taosHashIterate(db->vgInfo, NULL); + while (pIter) { + vgInfo = pIter; + + if (NULL == taosArrayPush(vgList, vgInfo)) { + ctgError("taosArrayPush failed"); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + pIter = taosHashIterate(db->vgInfo, pIter); + vgInfo = NULL; + } + + *vgroupList = vgList; + vgList = NULL; + +_return: + + if (db) { + CTG_UNLOCK(CTG_READ, &db->lock); + taosHashRelease(pCatalog->dbCache.cache, db); + } + + if (vgList) { + taosArrayDestroy(vgList); + vgList = NULL; + } + + CTG_RET(code); +} + + int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); @@ -497,13 +587,17 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB if (dbInfo->vgVersion < 0) { if (pCatalog->dbCache.cache) { - SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); - if (oldInfo && oldInfo->vgInfo) { - taosHashCleanup(oldInfo->vgInfo); - oldInfo->vgInfo = NULL; + SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); + if (oldInfo) { + CTG_LOCK(CTG_WRITE, &oldInfo->lock); + if (oldInfo->vgInfo) { + taosHashCleanup(oldInfo->vgInfo); + oldInfo->vgInfo = NULL; + } + CTG_UNLOCK(CTG_WRITE, &oldInfo->lock); + + taosHashRelease(pCatalog->dbCache.cache, oldInfo); } - - taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)); } ctgWarn("remove db [%s] from cache", dbName); @@ -517,10 +611,16 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } else { - SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); - if (oldInfo && oldInfo->vgInfo) { - taosHashCleanup(oldInfo->vgInfo); - oldInfo->vgInfo = NULL; + SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); + if (oldInfo) { + CTG_LOCK(CTG_WRITE, &oldInfo->lock); + if (oldInfo->vgInfo) { + taosHashCleanup(oldInfo->vgInfo); + oldInfo->vgInfo = NULL; + } + CTG_UNLOCK(CTG_WRITE, &oldInfo->lock); + + taosHashRelease(pCatalog->dbCache.cache, oldInfo); } } @@ -573,60 +673,71 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S STableMeta *tbMeta = NULL; int32_t code = 0; SVgroupInfo vgroupInfo = {0}; - SDBVgroupInfo dbVgroup = {0}; + SDBVgroupInfo* dbVgroup = NULL; + SArray *vgList = NULL; + + *pVgroupList = NULL; CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta)); CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup)); if (tbMeta->tableType == TSDB_SUPER_TABLE) { - CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList)); + CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList)); } else { int32_t vgId = tbMeta->vgId; - if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { + if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { ctgError("vgId[%d] not found in vgroup list", vgId); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - *pVgroupList = taosArrayInit(1, sizeof(SVgroupInfo)); - if (NULL == *pVgroupList) { + vgList = taosArrayInit(1, sizeof(SVgroupInfo)); + if (NULL == vgList) { ctgError("taosArrayInit failed"); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - if (NULL == taosArrayPush(*pVgroupList, &vgroupInfo)) { + if (NULL == taosArrayPush(vgList, &vgroupInfo)) { ctgError("push vgroupInfo to array failed"); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - } - - tfree(tbMeta); - return TSDB_CODE_SUCCESS; + *pVgroupList = vgList; + vgList = NULL; + } _return: tfree(tbMeta); - taosArrayDestroy(*pVgroupList); - *pVgroupList = NULL; + if (dbVgroup) { + CTG_UNLOCK(CTG_READ, &dbVgroup->lock); + taosHashRelease(pCatalog->dbCache.cache, dbVgroup); + } + + if (vgList) { + taosArrayDestroy(vgList); + vgList = NULL; + } CTG_RET(code); } int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { - SDBVgroupInfo dbInfo = {0}; + SDBVgroupInfo* dbInfo = NULL; int32_t code = 0; int32_t vgId = 0; CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo)); - if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) { - ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo); - CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED); - } + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pDBName, pTableName, pVgroup)); - CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup)); +_return: + + if (dbInfo) { + CTG_UNLOCK(CTG_READ, &dbInfo->lock); + taosHashRelease(pCatalog->dbCache.cache, dbInfo); + } CTG_RET(code); } diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 07ca91729d..91927c370a 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -106,11 +106,11 @@ typedef struct SQWorkerMgmt { if (QW_READ == (type)) { \ if ((*(_lock)) < 0) assert(0); \ taosRLockLatch(_lock); \ - qDebug("RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + qDebug("QW RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ } else { \ if ((*(_lock)) < 0) assert(0); \ taosWLockLatch(_lock); \ - qDebug("WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + qDebug("QW WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ } \ } while (0) @@ -118,11 +118,11 @@ typedef struct SQWorkerMgmt { if (QW_READ == (type)) { \ if ((*(_lock)) <= 0) assert(0); \ taosRUnLockLatch(_lock); \ - qDebug("RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + qDebug("QW RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ } else { \ if ((*(_lock)) <= 0) assert(0); \ taosWUnLockLatch(_lock); \ - qDebug("WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + qDebug("QW WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ } \ } while (0) diff --git a/source/util/CMakeLists.txt b/source/util/CMakeLists.txt index d343945a80..bf1774b45b 100644 --- a/source/util/CMakeLists.txt +++ b/source/util/CMakeLists.txt @@ -12,4 +12,6 @@ target_link_libraries( PUBLIC zlib PUBLIC lz4_static PUBLIC api -) \ No newline at end of file +) + +ADD_SUBDIRECTORY(test) diff --git a/source/util/src/thash.c b/source/util/src/thash.c index cfe14f00e1..840a1ef390 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -362,7 +362,7 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo return data; } -void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* d) { +void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void* d, bool acquire) { if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) { return NULL; } @@ -404,6 +404,10 @@ void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* memcpy(d, GET_HASH_NODE_DATA(pNode), pNode->dataLen); } + if (acquire) { + pNode->count++; + } + data = GET_HASH_NODE_DATA(pNode); } @@ -415,6 +419,15 @@ void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* return data; } +void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* d) { + return taosHashGetCloneImpl(pHashObj, key, keyLen, d, false); +} + +void* taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen) { + return taosHashGetCloneImpl(pHashObj, key, keyLen, NULL, true); +} + + int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, void *data, size_t dsize*/) { if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) { return -1; @@ -919,3 +932,9 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) { __rd_unlock(&pHashObj->lock, pHashObj->type); } + +void taosHashRelease(SHashObj *pHashObj, void *p) { + taosHashCancelIterate(pHashObj, p); +} + + diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index a60c6cff28..79aaa1beb0 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -13,17 +13,22 @@ IF (HEADER_GTEST_INCLUDE_DIR AND (LIB_GTEST_STATIC_DIR OR LIB_GTEST_SHARED_DIR)) LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c) ADD_EXECUTABLE(utilTest ${SOURCE_LIST}) - TARGET_LINK_LIBRARIES(utilTest tutil common os gtest pthread gcov) + TARGET_LINK_LIBRARIES(utilTest util common os gtest pthread gcov) + LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/cacheTest.cpp) + LIST(APPEND SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/hashTest.cpp) + ADD_EXECUTABLE(hashTest ${SOURCE_LIST}) + TARGET_LINK_LIBRARIES(hashTest util common os gtest pthread gcov) + LIST(APPEND BIN_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c) ADD_EXECUTABLE(trefTest ${BIN_SRC}) - TARGET_LINK_LIBRARIES(trefTest common tutil) + TARGET_LINK_LIBRARIES(trefTest common util) ENDIF() #IF (TD_LINUX) # ADD_EXECUTABLE(trefTest ./trefTest.c) -# TARGET_LINK_LIBRARIES(trefTest tutil common) +# TARGET_LINK_LIBRARIES(trefTest util common) #ENDIF () INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) diff --git a/source/util/test/hashTest.cpp b/source/util/test/hashTest.cpp index bc3fed74c4..d31fcfb7ef 100644 --- a/source/util/test/hashTest.cpp +++ b/source/util/test/hashTest.cpp @@ -4,10 +4,15 @@ #include #include -#include "hash.h" +#include "thash.h" #include "taos.h" namespace { + +typedef struct TESTSTRUCT { + char *p; +}TESTSTRUCT; + // the simple test code for basic operations void simpleTest() { SHashObj* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); @@ -141,6 +146,52 @@ void invalidOperationTest() { } +void acquireRleaseTest() { + SHashObj* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + ASSERT_EQ(taosHashGetSize(hashTable), 0); + + int32_t key = 2; + int32_t code = 0; + int32_t num = 0; + TESTSTRUCT data = {0}; + char *str1 = "abcdefg"; + char *str2 = "aaaaaaa"; + char *str3 = "123456789"; + + data.p = (char *)malloc(10); + strcpy(data.p, str1); + + code = taosHashPut(hashTable, &key, sizeof(key), &data, sizeof(data)); + ASSERT_EQ(code, 0); + + TESTSTRUCT* pdata = (TESTSTRUCT*)taosHashAcquire(hashTable, &key, sizeof(key)); + ASSERT_TRUE(pdata != nullptr); + ASSERT_TRUE(strcmp(pdata->p, str1) == 0); + + code = taosHashRemove(hashTable, &key, sizeof(key)); + ASSERT_EQ(code, 0); + ASSERT_TRUE(strcmp(pdata->p, str1) == 0); + + num = taosHashGetSize(hashTable); + ASSERT_EQ(num, 1); + + strcpy(pdata->p, str3); + + data.p = (char *)malloc(10); + strcpy(data.p, str2); + code = taosHashPut(hashTable, &key, sizeof(key), &data, sizeof(data)); + ASSERT_EQ(code, 0); + num = taosHashGetSize(hashTable); + ASSERT_EQ(num, 2); + + printf("%s,expect:%s", pdata->p, str3); + ASSERT_TRUE(strcmp(pdata->p, str3) == 0); + + taosHashRelease(hashTable, pdata); + num = taosHashGetSize(hashTable); + ASSERT_EQ(num, 1); +} + } int main(int argc, char** argv) { @@ -153,4 +204,5 @@ TEST(testCase, hashTest) { stringKeyTest(); noLockPerformanceTest(); multithreadsTest(); + acquireRleaseTest(); } -- GitLab