提交 d847bff7 编写于 作者: H Hongze Cheng

Merge branch '3.0' of github.com:taosdata/TDengine into feature/vnode

......@@ -775,8 +775,8 @@ typedef struct {
} SAuthVnodeMsg;
typedef struct {
int32_t vgId;
char tableFname[TSDB_TABLE_FNAME_LEN];
SMsgHead header;
char tableFname[TSDB_TABLE_FNAME_LEN];
} STableInfoMsg;
typedef struct {
......@@ -1060,6 +1060,7 @@ typedef struct {
} SUpdateTagValRsp;
typedef struct SSubQueryMsg {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
......@@ -1068,6 +1069,7 @@ typedef struct SSubQueryMsg {
} SSubQueryMsg;
typedef struct SResReadyMsg {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
......@@ -1078,6 +1080,7 @@ typedef struct SResReadyRsp {
} SResReadyRsp;
typedef struct SResFetchMsg {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
......
......@@ -75,7 +75,7 @@ typedef struct STableMeta {
} STableMeta;
typedef struct SDBVgroupInfo {
int32_t lock;
SRWLatch lock;
int32_t vgVersion;
int8_t hashMethod;
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
......
......@@ -50,23 +50,37 @@ typedef struct SQueryProfileSummary {
uint64_t resultSize; // generated result size in Kb.
} SQueryProfileSummary;
typedef struct SQueryNodeAddr{
int32_t nodeId; //vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SQueryNodeAddr;
typedef struct SQueryResult {
int32_t code;
uint64_t numOfRows;
int32_t msgSize;
char *msg;
} SQueryResult;
int32_t schedulerInit(SSchedulerCfg *cfg);
/**
* Process the query job, generated according to the query physical plan.
* This is a synchronized API, and is also thread-safety.
* @param qnodeList Qnode address list, element is SEpAddr
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows);
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes);
/**
* Process the query job, generated according to the query physical plan.
* This is a asynchronized API, and is also thread-safety.
* @param qnodeList Qnode address list, element is SEpAddr
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob);
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob);
int32_t scheduleFetchRows(void *pJob, void **data);
......
......@@ -121,8 +121,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg;
......
......@@ -17,6 +17,7 @@
#include "vnodeDef.h"
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); }
......@@ -43,6 +44,8 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case TDMT_VND_SHOW_TABLES_FETCH:
return vnodeGetTableList(pVnode, pMsg);
// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg);
case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg, pRsp);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
......@@ -88,7 +91,8 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pTagSchema = NULL;
}
pTbMetaMsg = (STableMetaMsg *)calloc(1, sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols));
int msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols);
pTbMetaMsg = (STableMetaMsg *)rpcMallocCont(msgLen);
if (pTbMetaMsg == NULL) {
return -1;
}
......@@ -115,6 +119,16 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pSch->bytes = htonl(pSch->bytes);
}
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pTbMetaMsg,
.contLen = msgLen,
.code = 0,
};
rpcSendResponse(&rpcMsg);
return 0;
}
......@@ -166,4 +180,4 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
rpcSendResponse(&rpcMsg);
return 0;
}
\ No newline at end of file
}
......@@ -77,27 +77,37 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define CTG_LOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
if ((*(_lock)) < 0) assert(0); \
assert(atomic_load_32((_lock)) >= 0); \
ctgDebug("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
ctgDebug("CTG RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
ctgDebug("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
if ((*(_lock)) < 0) assert(0); \
assert(atomic_load_32((_lock)) >= 0); \
ctgDebug("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
ctgDebug("CTG WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
ctgDebug("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define CTG_UNLOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
if ((*(_lock)) <= 0) assert(0); \
assert(atomic_load_32((_lock)) > 0); \
ctgDebug("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
ctgDebug("CTG RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
ctgDebug("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
if ((*(_lock)) <= 0) assert(0); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
ctgDebug("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
ctgDebug("CTG WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
ctgDebug("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
......
......@@ -23,21 +23,31 @@ SCatalogMgmt ctgMgmt = {0};
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) {
if (NULL == pCatalog->dbCache.cache) {
*inCache = false;
ctgWarn("no db cache");
return TSDB_CODE_SUCCESS;
}
SDBVgroupInfo *info = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
SDBVgroupInfo *info = NULL;
if (NULL == info) {
*inCache = false;
return TSDB_CODE_SUCCESS;
}
while (true) {
info = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
CTG_LOCK(CTG_READ, &info->lock);
if (NULL == info->vgInfo) {
CTG_UNLOCK(CTG_READ, &info->lock);
*inCache = false;
return TSDB_CODE_SUCCESS;
if (NULL == info) {
*inCache = false;
ctgWarn("no db cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_READ, &info->lock);
if (NULL == info->vgInfo) {
CTG_UNLOCK(CTG_READ, &info->lock);
taosHashRelease(pCatalog->dbCache.cache, info);
ctgWarn("db cache vgInfo is NULL, dbName:%s", dbName);
continue;
}
break;
}
*dbInfo = info;
......@@ -179,14 +189,13 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
}
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
char tbFullName[TSDB_TABLE_FNAME_LEN];
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);
tNameExtractFullName(pTableName, tbFullName);
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName};
char *msg = NULL;
......@@ -271,8 +280,6 @@ _return:
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
int32_t code = 0;
CTG_LOCK(CTG_READ, &dbInfo->lock);
int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
......@@ -311,8 +318,6 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName
*pVgroup = *vgInfo;
_return:
CTG_UNLOCK(CTG_READ, &dbInfo->lock);
CTG_RET(TSDB_CODE_SUCCESS);
}
......@@ -434,11 +439,47 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
input.db[sizeof(input.db) - 1] = 0;
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
while (true) {
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup));
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup));
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
if (!inCache) {
ctgWarn("get db vgroup from cache failed, db:%s", dbName);
continue;
}
break;
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
SDBVgroupInfo *oldInfo = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (oldInfo) {
CTG_LOCK(CTG_WRITE, &oldInfo->lock);
if (dbInfo->vgVersion <= oldInfo->vgVersion) {
ctgInfo("dbName:%s vg will not update, vgVersion:%d , current:%d", dbName, dbInfo->vgVersion, oldInfo->vgVersion);
CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, oldInfo);
return TSDB_CODE_SUCCESS;
}
if (oldInfo->vgInfo) {
ctgInfo("dbName:%s vg will be cleanup", dbName);
taosHashCleanup(oldInfo->vgInfo);
oldInfo->vgInfo = NULL;
}
CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, oldInfo);
}
return TSDB_CODE_SUCCESS;
}
......@@ -580,55 +621,57 @@ _return:
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
int32_t code = 0;
if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
}
if (NULL == dbInfo->vgInfo || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgInfo) <= 0) {
ctgError("invalid db vg, dbName:%s", dbName);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
if (dbInfo->vgVersion < 0) {
ctgWarn("invalid db vgVersion:%d, dbName:%s", dbInfo->vgVersion, dbName);
if (pCatalog->dbCache.cache) {
SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (oldInfo) {
CTG_LOCK(CTG_WRITE, &oldInfo->lock);
if (oldInfo->vgInfo) {
taosHashCleanup(oldInfo->vgInfo);
oldInfo->vgInfo = NULL;
}
CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, oldInfo);
}
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)));
}
ctgWarn("remove db [%s] from cache", dbName);
return TSDB_CODE_SUCCESS;
goto _return;
}
if (NULL == pCatalog->dbCache.cache) {
pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->dbCache.cache) {
ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
} else {
SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (oldInfo) {
CTG_LOCK(CTG_WRITE, &oldInfo->lock);
if (oldInfo->vgInfo) {
taosHashCleanup(oldInfo->vgInfo);
oldInfo->vgInfo = NULL;
}
CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, oldInfo);
}
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
}
if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) {
ctgError("push to vgroup hash cache failed");
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
return TSDB_CODE_SUCCESS;
ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion);
dbInfo->vgInfo = NULL;
_return:
if (dbInfo && dbInfo->vgInfo) {
taosHashCleanup(dbInfo->vgInfo);
dbInfo->vgInfo = NULL;
}
CTG_RET(code);
}
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
......@@ -647,9 +690,9 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
STableMetaOutput output = {0};
//CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output));
CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo, &output));
CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
//CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
......
......@@ -36,11 +36,19 @@
namespace {
extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist);
extern "C" int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output);
void ctgTestSetPrepareTableMeta();
void ctgTestSetPrepareCTableMeta();
void ctgTestSetPrepareSTableMeta();
bool ctgTestStop = false;
bool ctgTestEnableSleep = false;
bool ctgTestDeadLoop = true;
int32_t ctgTestCurrentVgVersion = 0;
int32_t ctgTestVgVersion = 1;
int32_t ctgTestVgNum = 10;
int32_t ctgTestColNum = 2;
int32_t ctgTestTagNum = 1;
......@@ -89,6 +97,113 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
ASSERT_EQ(rpcRsp.code, 0);
}
void ctgTestInitLogFile() {
const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
ctgDebugFlag = 159;
tsAsyncLog = 0;
char temp[128] = {0};
sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix);
if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
int32_t ctgTestGetVgNumFromVgVersion(int32_t vgVersion) {
return ((vgVersion % 2) == 0) ? ctgTestVgNum - 2 : ctgTestVgNum;
}
void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
SName cn = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(cn.dbname, "db1");
strcpy(cn.tname, ctgTestCTablename);
SName sn = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(sn.dbname, "db1");
strcpy(sn.tname, ctgTestSTablename);
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&cn, tbFullName);
output->metaNum = 2;
strcpy(output->ctbFname, tbFullName);
tNameExtractFullName(&cn, tbFullName);
strcpy(output->tbFname, tbFullName);
output->ctbMeta.vgId = 9;
output->ctbMeta.tableType = TSDB_CHILD_TABLE;
output->ctbMeta.uid = 3;
output->ctbMeta.suid = 2;
output->tbMeta = (STableMeta *)calloc(1, sizeof(STableMeta) + sizeof(SSchema) * (ctgTestColNum + ctgTestColNum));
output->tbMeta->vgId = 9;
output->tbMeta->tableType = TSDB_SUPER_TABLE;
output->tbMeta->uid = 2;
output->tbMeta->suid = 2;
output->tbMeta->tableInfo.numOfColumns = ctgTestColNum;
output->tbMeta->tableInfo.numOfTags = ctgTestTagNum;
output->tbMeta->sversion = ctgTestSVersion;
output->tbMeta->tversion = ctgTestTVersion;
SSchema *s = NULL;
s = &output->tbMeta->schema[0];
s->type = TSDB_DATA_TYPE_TIMESTAMP;
s->colId = 1;
s->bytes = 8;
strcpy(s->name, "ts");
s = &output->tbMeta->schema[1];
s->type = TSDB_DATA_TYPE_INT;
s->colId = 2;
s->bytes = 4;
strcpy(s->name, "col1s");
s = &output->tbMeta->schema[2];
s->type = TSDB_DATA_TYPE_BINARY;
s->colId = 3;
s->bytes = 12;
strcpy(s->name, "tag1s");
}
void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
static int32_t vgVersion = ctgTestVgVersion + 1;
int32_t vgNum = 0;
SVgroupInfo vgInfo = {0};
dbVgroup->vgVersion = vgVersion++;
ctgTestCurrentVgVersion = dbVgroup->vgVersion;
dbVgroup->hashMethod = 0;
dbVgroup->vgInfo = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion);
uint32_t hashUnit = UINT32_MAX / vgNum;
for (int32_t i = 0; i < vgNum; ++i) {
vgInfo.vgId = i + 1;
vgInfo.hashBegin = i * hashUnit;
vgInfo.hashEnd = hashUnit * (i + 1) - 1;
vgInfo.numOfEps = i % TSDB_MAX_REPLICA + 1;
vgInfo.inUse = i % vgInfo.numOfEps;
for (int32_t n = 0; n < vgInfo.numOfEps; ++n) {
SEpAddrMsg *addr = &vgInfo.epAddr[n];
strcpy(addr->fqdn, "a0");
addr->port = htons(n + 22);
}
taosHashPut(dbVgroup->vgInfo, &vgInfo.vgId, sizeof(vgInfo.vgId), &vgInfo, sizeof(vgInfo));
}
}
void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SUseDbRsp *rspMsg = NULL; //todo
......@@ -97,7 +212,8 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (SUseDbRsp *)pRsp->pCont;
strcpy(rspMsg->db, ctgTestDbname);
rspMsg->vgVersion = htonl(1);
rspMsg->vgVersion = htonl(ctgTestVgVersion);
ctgTestCurrentVgVersion = ctgTestVgVersion;
rspMsg->vgNum = htonl(ctgTestVgNum);
rspMsg->hashMethod = 0;
......@@ -148,13 +264,13 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
SSchema *s = NULL;
s = &rspMsg->pSchema[0];
s->type = TSDB_DATA_TYPE_TIMESTAMP;
s->colId = htonl(0);
s->colId = htonl(1);
s->bytes = htonl(8);
strcpy(s->name, "ts");
s = &rspMsg->pSchema[1];
s->type = TSDB_DATA_TYPE_INT;
s->colId = htonl(1);
s->colId = htonl(2);
s->bytes = htonl(4);
strcpy(s->name, "col1");
......@@ -185,19 +301,19 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
SSchema *s = NULL;
s = &rspMsg->pSchema[0];
s->type = TSDB_DATA_TYPE_TIMESTAMP;
s->colId = htonl(0);
s->colId = htonl(1);
s->bytes = htonl(8);
strcpy(s->name, "ts");
s = &rspMsg->pSchema[1];
s->type = TSDB_DATA_TYPE_INT;
s->colId = htonl(1);
s->colId = htonl(2);
s->bytes = htonl(4);
strcpy(s->name, "col1s");
s = &rspMsg->pSchema[2];
s->type = TSDB_DATA_TYPE_BINARY;
s->colId = htonl(2);
s->colId = htonl(3);
s->bytes = htonl(12);
strcpy(s->name, "tag1s");
......@@ -229,19 +345,19 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
SSchema *s = NULL;
s = &rspMsg->pSchema[0];
s->type = TSDB_DATA_TYPE_TIMESTAMP;
s->colId = htonl(0);
s->colId = htonl(1);
s->bytes = htonl(8);
strcpy(s->name, "ts");
s = &rspMsg->pSchema[1];
s->type = TSDB_DATA_TYPE_INT;
s->colId = htonl(1);
s->colId = htonl(2);
s->bytes = htonl(4);
strcpy(s->name, "col1s");
s = &rspMsg->pSchema[2];
s->type = TSDB_DATA_TYPE_BINARY;
s->colId = htonl(2);
s->colId = htonl(3);
s->bytes = htonl(12);
strcpy(s->name, "tag1s");
......@@ -371,6 +487,117 @@ void ctgTestSetPrepareDbVgroupsAndSuperMeta() {
}
void *ctgTestGetDbVgroupThread(void *param) {
struct SCatalog* pCtg = (struct SCatalog*)param;
int32_t code = 0;
void *mockPointer = (void *)0x1;
SArray *vgList = NULL;
int32_t n = 0;
while (!ctgTestStop) {
code = catalogGetDBVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList);
if (code) {
assert(0);
}
if (vgList) {
taosArrayDestroy(vgList);
}
if (ctgTestEnableSleep) {
usleep(rand()%5);
}
if (++n % 50000 == 0) {
printf("Get:%d\n", n);
}
}
return NULL;
}
void *ctgTestSetDbVgroupThread(void *param) {
struct SCatalog* pCtg = (struct SCatalog*)param;
int32_t code = 0;
SDBVgroupInfo dbVgroup = {0};
int32_t n = 0;
while (!ctgTestStop) {
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup);
if (code) {
assert(0);
}
if (ctgTestEnableSleep) {
usleep(rand()%5);
}
if (++n % 50000 == 0) {
printf("Set:%d\n", n);
}
}
return NULL;
}
void *ctgTestGetCtableMetaThread(void *param) {
struct SCatalog* pCtg = (struct SCatalog*)param;
int32_t code = 0;
int32_t n = 0;
STableMeta* tbMeta = NULL;
int32_t exist = 0;
SName cn = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(cn.dbname, "db1");
strcpy(cn.tname, ctgTestCTablename);
while (!ctgTestStop) {
code = ctgGetTableMetaFromCache(pCtg, &cn, &tbMeta, &exist);
if (code || 0 == exist) {
assert(0);
}
if (ctgTestEnableSleep) {
usleep(rand()%5);
}
if (++n % 50000 == 0) {
printf("Get:%d\n", n);
}
}
return NULL;
}
void *ctgTestSetCtableMetaThread(void *param) {
struct SCatalog* pCtg = (struct SCatalog*)param;
int32_t code = 0;
SDBVgroupInfo dbVgroup = {0};
int32_t n = 0;
STableMetaOutput output = {0};
ctgTestBuildCTableMetaOutput(&output);
while (!ctgTestStop) {
code = ctgUpdateTableMetaCache(pCtg, &output);
if (code) {
assert(0);
}
if (ctgTestEnableSleep) {
usleep(rand()%5);
}
if (++n % 50000 == 0) {
printf("Set:%d\n", n);
}
}
return NULL;
}
#if 0
TEST(tableMeta, normalTable) {
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -388,7 +615,7 @@ TEST(tableMeta, normalTable) {
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = T_NAME_TABLE, .acctId = 1};
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
......@@ -436,11 +663,13 @@ TEST(tableMeta, childTableCase) {
initQueryModuleMsgHandle();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
int32_t code = catalogGetHandle(ctgTestClusterId, &pCtg);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = T_NAME_TABLE, .acctId = 1};
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestCTablename);
......@@ -494,11 +723,14 @@ TEST(tableMeta, superTableCase) {
initQueryModuleMsgHandle();
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t code = catalogGetHandle(ctgTestClusterId, &pCtg);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = T_NAME_TABLE, .acctId = 1};
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestSTablename);
......@@ -558,12 +790,15 @@ TEST(tableDistVgroup, normalTable) {
initQueryModuleMsgHandle();
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t code = catalogGetHandle(ctgTestClusterId, &pCtg);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = T_NAME_TABLE, .acctId = 1};
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
......@@ -595,7 +830,7 @@ TEST(tableDistVgroup, childTableCase) {
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = T_NAME_TABLE, .acctId = 1};
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestCTablename);
......@@ -620,11 +855,14 @@ TEST(tableDistVgroup, superTableCase) {
initQueryModuleMsgHandle();
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t code = catalogGetHandle(ctgTestClusterId, &pCtg);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = T_NAME_TABLE, .acctId = 1};
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestSTablename);
......@@ -645,6 +883,164 @@ TEST(tableDistVgroup, superTableCase) {
catalogDestroy();
}
TEST(dbVgroup, getSetDbVgroupCase) {
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestSetPrepareDbVgroupsAndNormalMeta();
initQueryModuleMsgHandle();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
code = catalogGetDBVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum);
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
ASSERT_EQ(code, 0);
ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.numOfEps, 3);
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
ASSERT_EQ(pvgInfo->vgId, 8);
ASSERT_EQ(pvgInfo->numOfEps, 3);
taosArrayDestroy(vgList);
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup);
ASSERT_EQ(code, 0);
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
ASSERT_EQ(code, 0);
ASSERT_EQ(vgInfo.vgId, 7);
ASSERT_EQ(vgInfo.numOfEps, 2);
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
ASSERT_EQ(pvgInfo->vgId, 8);
ASSERT_EQ(pvgInfo->numOfEps, 3);
taosArrayDestroy(vgList);
catalogDestroy();
}
#endif
TEST(multiThread, getSetDbVgroupCase) {
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroups();
initQueryModuleMsgHandle();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t thread1, thread2;
pthread_create(&(thread1), &thattr, ctgTestSetDbVgroupThread, pCtg);
sleep(1);
pthread_create(&(thread1), &thattr, ctgTestGetDbVgroupThread, pCtg);
while (true) {
if (ctgTestDeadLoop) {
sleep(1);
} else {
sleep(600);
break;
}
}
ctgTestStop = true;
sleep(1);
catalogDestroy();
}
TEST(multiThread, ctableMeta) {
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestSetPrepareDbVgroupsAndChildMeta();
initQueryModuleMsgHandle();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
strcpy(n.tname, ctgTestTablename);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t thread1, thread2;
pthread_create(&(thread1), &thattr, ctgTestGetCtableMetaThread, pCtg);
pthread_create(&(thread1), &thattr, ctgTestSetCtableMetaThread, pCtg);
while (true) {
if (ctgTestDeadLoop) {
sleep(1);
} else {
sleep(600);
break;
}
}
ctgTestStop = true;
sleep(1);
catalogDestroy();
}
int main(int argc, char** argv) {
......
......@@ -40,7 +40,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
STableInfoMsg *bMsg = (STableInfoMsg *)*msg;
bMsg->vgId = bInput->vgId;
bMsg->header.vgId = htonl(bInput->vgId);
strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname));
bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;
......
......@@ -89,12 +89,12 @@ typedef struct SSchJob {
SEpSet dataSrcEps;
SEpAddr resEp;
void *transport;
SArray *qnodeList;
SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr
tsem_t rspSem;
int32_t userFetch;
int32_t remoteFetch;
SSchTask *fetchTask;
SSchTask *fetchTask;
int32_t errCode;
void *res;
int32_t resNumOfRows;
......
......@@ -220,10 +220,10 @@ int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) {
return TSDB_CODE_SUCCESS;
}
int32_t qnodeNum = taosArrayGetSize(job->qnodeList);
int32_t nodeNum = taosArrayGetSize(job->nodeList);
for (int32_t i = 0; i < qnodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) {
SEpAddr *addr = taosArrayGet(job->qnodeList, i);
for (int32_t i = 0; i < nodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) {
SEpAddr *addr = taosArrayGet(job->nodeList, i);
strncpy(epSet->fqdn[epSet->numOfEps], addr->fqdn, sizeof(addr->fqdn));
epSet->port[epSet->numOfEps] = addr->port;
......@@ -829,8 +829,8 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
}
int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
if (qnodeList && taosArrayGetSize(qnodeList) <= 0) {
int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
if (nodeList && taosArrayGetSize(nodeList) <= 0) {
qInfo("qnodeList is empty");
}
......@@ -842,7 +842,7 @@ int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag,
job->attr.syncSchedule = syncSchedule;
job->transport = transport;
job->qnodeList = qnodeList;
job->nodeList = nodeList;
SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));
......@@ -897,28 +897,27 @@ _return:
SCH_RET(code);
}
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) {
if (NULL == transport || /* NULL == qnodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == numOfRows) {
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) {
if (NULL == transport || /* NULL == nodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
*numOfRows = 0;
SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true));
SCH_ERR_RET(scheduleExecJobImpl(transport, nodeList, pDag, pJob, true));
SSchJob *job = *(SSchJob **)pJob;
*numOfRows = job->resNumOfRows;
pRes->code = job->errCode;
pRes->numOfRows = job->resNumOfRows;
return TSDB_CODE_SUCCESS;
}
int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) {
if (NULL == transport || NULL == qnodeList ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob) {
if (NULL == transport || NULL == nodeList ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
return scheduleExecJobImpl(transport, qnodeList, pDag, pJob, false);
return scheduleExecJobImpl(transport, nodeList, pDag, pJob, false);
}
......
......@@ -321,10 +321,11 @@ TEST(insertTest, normalCase) {
pthread_t thread1;
pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob);
code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &numOfRows);
SQueryResult res = {0};
code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res);
ASSERT_EQ(code, 0);
ASSERT_EQ(numOfRows, 20);
ASSERT_EQ(res.numOfRows, 20);
scheduleFreeJob(pInsertJob);
}
......
......@@ -132,7 +132,7 @@ static FORCE_INLINE SHashNode *doUpdateHashNode(SHashObj *pHashObj, SHashEntry*
} else {
pNewNode->next = pNode;
pe->num++;
atomic_add_fetch_64(&pHashObj->size, 1);
atomic_add_fetch_32(&pHashObj->size, 1);
}
return pNewNode;
......@@ -209,7 +209,7 @@ int32_t taosHashGetSize(const SHashObj *pHashObj) {
if (!pHashObj) {
return 0;
}
return (int32_t)atomic_load_64(&pHashObj->size);
return (int32_t)atomic_load_32(&pHashObj->size);
}
static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) {
......@@ -273,7 +273,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
// enable resize
__rd_unlock(&pHashObj->lock, pHashObj->type);
atomic_add_fetch_64(&pHashObj->size, 1);
atomic_add_fetch_32(&pHashObj->size, 1);
return 0;
} else {
......@@ -405,7 +405,7 @@ void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v
}
if (acquire) {
pNode->count++;
atomic_add_fetch_16(&pNode->count, 1);
}
data = GET_HASH_NODE_DATA(pNode);
......@@ -482,7 +482,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi
// if (data) memcpy(data, GET_HASH_NODE_DATA(pNode), dsize);
pe->num--;
atomic_sub_fetch_64(&pHashObj->size, 1);
atomic_sub_fetch_32(&pHashObj->size, 1);
FREE_HASH_NODE(pHashObj, pNode);
}
}
......@@ -520,7 +520,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
while((pNode = pEntry->next) != NULL) {
if (fp && (!fp(param, GET_HASH_NODE_DATA(pNode)))) {
pEntry->num -= 1;
atomic_sub_fetch_64(&pHashObj->size, 1);
atomic_sub_fetch_32(&pHashObj->size, 1);
pEntry->next = pNode->next;
......@@ -546,7 +546,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
if (fp && (!fp(param, GET_HASH_NODE_DATA(pNext)))) {
pNode->next = pNext->next;
pEntry->num -= 1;
atomic_sub_fetch_64(&pHashObj->size, 1);
atomic_sub_fetch_32(&pHashObj->size, 1);
if (pEntry->num == 0) {
assert(pEntry->next == NULL);
......@@ -600,7 +600,7 @@ void taosHashClear(SHashObj *pHashObj) {
pEntry->next = NULL;
}
pHashObj->size = 0;
atomic_store_32(&pHashObj->size, 0);
__wr_unlock(&pHashObj->lock, pHashObj->type);
}
......@@ -847,7 +847,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
}
pe->num--;
atomic_sub_fetch_64(&pHashObj->size, 1);
atomic_sub_fetch_32(&pHashObj->size, 1);
FREE_HASH_NODE(pHashObj, pOld);
}
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册