diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bf7a6ea11b426fa3fb437992070bbc87fe52094f..a45ce86b918e4ad5356514ac74292e10f12db23f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -775,8 +775,8 @@ typedef struct { } SAuthVnodeMsg; typedef struct { - int32_t vgId; - char tableFname[TSDB_TABLE_FNAME_LEN]; + SMsgHead header; + char tableFname[TSDB_TABLE_FNAME_LEN]; } STableInfoMsg; typedef struct { @@ -1060,6 +1060,7 @@ typedef struct { } SUpdateTagValRsp; typedef struct SSubQueryMsg { + SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; @@ -1068,6 +1069,7 @@ typedef struct SSubQueryMsg { } SSubQueryMsg; typedef struct SResReadyMsg { + SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; @@ -1078,6 +1080,7 @@ typedef struct SResReadyRsp { } SResReadyRsp; typedef struct SResFetchMsg { + SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 4ea35f1d2c76697cd8177c5722c68f32cd14603e..4d5b1a8bd3db1bbc9a2d0654ce8031710afb1909 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -75,7 +75,7 @@ typedef struct STableMeta { } STableMeta; typedef struct SDBVgroupInfo { - int32_t lock; + SRWLatch lock; int32_t vgVersion; int8_t hashMethod; SHashObj *vgInfo; //key:vgId, value:SVgroupInfo diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index ddcfcab4dbc026888f04440aedfb2ba3c7af09b1..d6cac976d41c1da8001e5bc18a95ee9a85d1b2bd 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -50,23 +50,37 @@ typedef struct SQueryProfileSummary { uint64_t resultSize; // generated result size in Kb. } SQueryProfileSummary; +typedef struct SQueryNodeAddr{ + int32_t nodeId; //vgId or qnodeId + int8_t inUse; + int8_t numOfEps; + SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; +} SQueryNodeAddr; + +typedef struct SQueryResult { + int32_t code; + uint64_t numOfRows; + int32_t msgSize; + char *msg; +} SQueryResult; + int32_t schedulerInit(SSchedulerCfg *cfg); /** * Process the query job, generated according to the query physical plan. * This is a synchronized API, and is also thread-safety. - * @param qnodeList Qnode address list, element is SEpAddr + * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows); +int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes); /** * Process the query job, generated according to the query physical plan. * This is a asynchronized API, and is also thread-safety. - * @param qnodeList Qnode address list, element is SEpAddr + * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob); +int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob); int32_t scheduleFetchRows(void *pJob, void **data); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index e5be16937bb4bc5eb909eba8cccf4f94ac2578c6..dc5248c8e0ecd5cfddab311d2db5b8cb1ab12897 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -121,8 +121,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeQueryMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeQueryMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg; diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index 01df929080265477536b07a47aa42009051e18ab..daa1e964dea80b11f12bb3a861a69c6437563f37 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -17,6 +17,7 @@ #include "vnodeDef.h" static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg); +static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); } @@ -43,6 +44,8 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { case TDMT_VND_SHOW_TABLES_FETCH: return vnodeGetTableList(pVnode, pMsg); // return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg); + case TDMT_VND_TABLE_META: + return vnodeGetTableMeta(pVnode, pMsg, pRsp); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; @@ -88,7 +91,8 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { pTagSchema = NULL; } - pTbMetaMsg = (STableMetaMsg *)calloc(1, sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols)); + int msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols); + pTbMetaMsg = (STableMetaMsg *)rpcMallocCont(msgLen); if (pTbMetaMsg == NULL) { return -1; } @@ -115,6 +119,16 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { pSch->bytes = htonl(pSch->bytes); } + SRpcMsg rpcMsg = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pTbMetaMsg, + .contLen = msgLen, + .code = 0, + }; + + rpcSendResponse(&rpcMsg); + return 0; } @@ -166,4 +180,4 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { rpcSendResponse(&rpcMsg); return 0; -} \ No newline at end of file +} diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 31b59394639761c941d98660daeac589232b16b6..f426139c147e862633dafd26dc2f744ad77556a0 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -77,27 +77,37 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); #define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0) #define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) +#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 + #define CTG_LOCK(type, _lock) do { \ if (CTG_READ == (type)) { \ - if ((*(_lock)) < 0) assert(0); \ + assert(atomic_load_32((_lock)) >= 0); \ + ctgDebug("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRLockLatch(_lock); \ - ctgDebug("CTG RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + ctgDebug("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) > 0); \ } else { \ - if ((*(_lock)) < 0) assert(0); \ + assert(atomic_load_32((_lock)) >= 0); \ + ctgDebug("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWLockLatch(_lock); \ - ctgDebug("CTG WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + ctgDebug("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ } \ } while (0) #define CTG_UNLOCK(type, _lock) do { \ if (CTG_READ == (type)) { \ - if ((*(_lock)) <= 0) assert(0); \ + assert(atomic_load_32((_lock)) > 0); \ + ctgDebug("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRUnLockLatch(_lock); \ - ctgDebug("CTG RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + ctgDebug("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) >= 0); \ } else { \ - if ((*(_lock)) <= 0) assert(0); \ + assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ + ctgDebug("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWUnLockLatch(_lock); \ - ctgDebug("CTG WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + ctgDebug("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) >= 0); \ } \ } while (0) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 43f613f654081b4ec2ad067362c48653e1e7ca9e..68ff1b8557f3276f800f0bb33ea21bcbcab12444 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -23,21 +23,31 @@ SCatalogMgmt ctgMgmt = {0}; int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) { if (NULL == pCatalog->dbCache.cache) { *inCache = false; + ctgWarn("no db cache"); return TSDB_CODE_SUCCESS; } - SDBVgroupInfo *info = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); + SDBVgroupInfo *info = NULL; - if (NULL == info) { - *inCache = false; - return TSDB_CODE_SUCCESS; - } + while (true) { + info = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); - CTG_LOCK(CTG_READ, &info->lock); - if (NULL == info->vgInfo) { - CTG_UNLOCK(CTG_READ, &info->lock); - *inCache = false; - return TSDB_CODE_SUCCESS; + if (NULL == info) { + *inCache = false; + ctgWarn("no db cache, dbName:%s", dbName); + return TSDB_CODE_SUCCESS; + } + + CTG_LOCK(CTG_READ, &info->lock); + if (NULL == info->vgInfo) { + CTG_UNLOCK(CTG_READ, &info->lock); + taosHashRelease(pCatalog->dbCache.cache, info); + ctgWarn("db cache vgInfo is NULL, dbName:%s", dbName); + + continue; + } + + break; } *dbInfo = info; @@ -179,14 +189,13 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE } -int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) { +int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } char tbFullName[TSDB_TABLE_FNAME_LEN]; - - snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName); + tNameExtractFullName(pTableName, tbFullName); SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName}; char *msg = NULL; @@ -271,8 +280,6 @@ _return: int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) { int32_t code = 0; - CTG_LOCK(CTG_READ, &dbInfo->lock); - int32_t vgNum = taosHashGetSize(dbInfo->vgInfo); char db[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, db); @@ -311,8 +318,6 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName *pVgroup = *vgInfo; _return: - - CTG_UNLOCK(CTG_READ, &dbInfo->lock); CTG_RET(TSDB_CODE_SUCCESS); } @@ -434,11 +439,47 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm input.db[sizeof(input.db) - 1] = 0; input.vgVersion = CTG_DEFAULT_INVALID_VERSION; - CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut)); + while (true) { + CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut)); - CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup)); + CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup)); - CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache)); + CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache)); + + if (!inCache) { + ctgWarn("get db vgroup from cache failed, db:%s", dbName); + continue; + } + + break; + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { + SDBVgroupInfo *oldInfo = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); + if (oldInfo) { + CTG_LOCK(CTG_WRITE, &oldInfo->lock); + if (dbInfo->vgVersion <= oldInfo->vgVersion) { + ctgInfo("dbName:%s vg will not update, vgVersion:%d , current:%d", dbName, dbInfo->vgVersion, oldInfo->vgVersion); + CTG_UNLOCK(CTG_WRITE, &oldInfo->lock); + taosHashRelease(pCatalog->dbCache.cache, oldInfo); + + return TSDB_CODE_SUCCESS; + } + + if (oldInfo->vgInfo) { + ctgInfo("dbName:%s vg will be cleanup", dbName); + taosHashCleanup(oldInfo->vgInfo); + oldInfo->vgInfo = NULL; + } + + CTG_UNLOCK(CTG_WRITE, &oldInfo->lock); + + taosHashRelease(pCatalog->dbCache.cache, oldInfo); + } return TSDB_CODE_SUCCESS; } @@ -580,55 +621,57 @@ _return: int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { + int32_t code = 0; + if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); + } + + if (NULL == dbInfo->vgInfo || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgInfo) <= 0) { + ctgError("invalid db vg, dbName:%s", dbName); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } if (dbInfo->vgVersion < 0) { + ctgWarn("invalid db vgVersion:%d, dbName:%s", dbInfo->vgVersion, dbName); + if (pCatalog->dbCache.cache) { - SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); - if (oldInfo) { - CTG_LOCK(CTG_WRITE, &oldInfo->lock); - if (oldInfo->vgInfo) { - taosHashCleanup(oldInfo->vgInfo); - oldInfo->vgInfo = NULL; - } - CTG_UNLOCK(CTG_WRITE, &oldInfo->lock); - - taosHashRelease(pCatalog->dbCache.cache, oldInfo); - } + CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); + + CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName))); } ctgWarn("remove db [%s] from cache", dbName); - return TSDB_CODE_SUCCESS; + goto _return; } if (NULL == pCatalog->dbCache.cache) { pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->dbCache.cache) { ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } } else { - SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); - if (oldInfo) { - CTG_LOCK(CTG_WRITE, &oldInfo->lock); - if (oldInfo->vgInfo) { - taosHashCleanup(oldInfo->vgInfo); - oldInfo->vgInfo = NULL; - } - CTG_UNLOCK(CTG_WRITE, &oldInfo->lock); - - taosHashRelease(pCatalog->dbCache.cache, oldInfo); - } + CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); } if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) { ctgError("push to vgroup hash cache failed"); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - return TSDB_CODE_SUCCESS; + ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion); + + dbInfo->vgInfo = NULL; + +_return: + + if (dbInfo && dbInfo->vgInfo) { + taosHashCleanup(dbInfo->vgInfo); + dbInfo->vgInfo = NULL; + } + + CTG_RET(code); } int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { @@ -647,9 +690,9 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe STableMetaOutput output = {0}; - //CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output)); + CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo, &output)); - CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output)); + //CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output)); CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output)); diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index aa35e56552f619967713bec483e1a8de9377021f..5979d3a147c26cbd154f0a79c92be9004d40314e 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -36,11 +36,19 @@ namespace { +extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist); +extern "C" int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output); + void ctgTestSetPrepareTableMeta(); void ctgTestSetPrepareCTableMeta(); void ctgTestSetPrepareSTableMeta(); +bool ctgTestStop = false; +bool ctgTestEnableSleep = false; +bool ctgTestDeadLoop = true; +int32_t ctgTestCurrentVgVersion = 0; +int32_t ctgTestVgVersion = 1; int32_t ctgTestVgNum = 10; int32_t ctgTestColNum = 2; int32_t ctgTestTagNum = 1; @@ -89,6 +97,113 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { ASSERT_EQ(rpcRsp.code, 0); } +void ctgTestInitLogFile() { + const char *defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 10; + + ctgDebugFlag = 159; + tsAsyncLog = 0; + + char temp[128] = {0}; + sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix); + if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) { + printf("failed to open log file in directory:%s\n", tsLogDir); + } + +} + +int32_t ctgTestGetVgNumFromVgVersion(int32_t vgVersion) { + return ((vgVersion % 2) == 0) ? ctgTestVgNum - 2 : ctgTestVgNum; +} + +void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) { + SName cn = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(cn.dbname, "db1"); + strcpy(cn.tname, ctgTestCTablename); + + SName sn = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(sn.dbname, "db1"); + strcpy(sn.tname, ctgTestSTablename); + + char tbFullName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(&cn, tbFullName); + + output->metaNum = 2; + + strcpy(output->ctbFname, tbFullName); + + tNameExtractFullName(&cn, tbFullName); + strcpy(output->tbFname, tbFullName); + + output->ctbMeta.vgId = 9; + output->ctbMeta.tableType = TSDB_CHILD_TABLE; + output->ctbMeta.uid = 3; + output->ctbMeta.suid = 2; + + output->tbMeta = (STableMeta *)calloc(1, sizeof(STableMeta) + sizeof(SSchema) * (ctgTestColNum + ctgTestColNum)); + output->tbMeta->vgId = 9; + output->tbMeta->tableType = TSDB_SUPER_TABLE; + output->tbMeta->uid = 2; + output->tbMeta->suid = 2; + + output->tbMeta->tableInfo.numOfColumns = ctgTestColNum; + output->tbMeta->tableInfo.numOfTags = ctgTestTagNum; + + output->tbMeta->sversion = ctgTestSVersion; + output->tbMeta->tversion = ctgTestTVersion; + + SSchema *s = NULL; + s = &output->tbMeta->schema[0]; + s->type = TSDB_DATA_TYPE_TIMESTAMP; + s->colId = 1; + s->bytes = 8; + strcpy(s->name, "ts"); + + s = &output->tbMeta->schema[1]; + s->type = TSDB_DATA_TYPE_INT; + s->colId = 2; + s->bytes = 4; + strcpy(s->name, "col1s"); + + s = &output->tbMeta->schema[2]; + s->type = TSDB_DATA_TYPE_BINARY; + s->colId = 3; + s->bytes = 12; + strcpy(s->name, "tag1s"); + +} + +void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) { + static int32_t vgVersion = ctgTestVgVersion + 1; + int32_t vgNum = 0; + SVgroupInfo vgInfo = {0}; + + dbVgroup->vgVersion = vgVersion++; + + ctgTestCurrentVgVersion = dbVgroup->vgVersion; + + dbVgroup->hashMethod = 0; + dbVgroup->vgInfo = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + + vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion); + uint32_t hashUnit = UINT32_MAX / vgNum; + + for (int32_t i = 0; i < vgNum; ++i) { + vgInfo.vgId = i + 1; + vgInfo.hashBegin = i * hashUnit; + vgInfo.hashEnd = hashUnit * (i + 1) - 1; + vgInfo.numOfEps = i % TSDB_MAX_REPLICA + 1; + vgInfo.inUse = i % vgInfo.numOfEps; + for (int32_t n = 0; n < vgInfo.numOfEps; ++n) { + SEpAddrMsg *addr = &vgInfo.epAddr[n]; + strcpy(addr->fqdn, "a0"); + addr->port = htons(n + 22); + } + + taosHashPut(dbVgroup->vgInfo, &vgInfo.vgId, sizeof(vgInfo.vgId), &vgInfo, sizeof(vgInfo)); + } +} + void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { SUseDbRsp *rspMsg = NULL; //todo @@ -97,7 +212,8 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM pRsp->pCont = calloc(1, pRsp->contLen); rspMsg = (SUseDbRsp *)pRsp->pCont; strcpy(rspMsg->db, ctgTestDbname); - rspMsg->vgVersion = htonl(1); + rspMsg->vgVersion = htonl(ctgTestVgVersion); + ctgTestCurrentVgVersion = ctgTestVgVersion; rspMsg->vgNum = htonl(ctgTestVgNum); rspMsg->hashMethod = 0; @@ -148,13 +264,13 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM SSchema *s = NULL; s = &rspMsg->pSchema[0]; s->type = TSDB_DATA_TYPE_TIMESTAMP; - s->colId = htonl(0); + s->colId = htonl(1); s->bytes = htonl(8); strcpy(s->name, "ts"); s = &rspMsg->pSchema[1]; s->type = TSDB_DATA_TYPE_INT; - s->colId = htonl(1); + s->colId = htonl(2); s->bytes = htonl(4); strcpy(s->name, "col1"); @@ -185,19 +301,19 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc SSchema *s = NULL; s = &rspMsg->pSchema[0]; s->type = TSDB_DATA_TYPE_TIMESTAMP; - s->colId = htonl(0); + s->colId = htonl(1); s->bytes = htonl(8); strcpy(s->name, "ts"); s = &rspMsg->pSchema[1]; s->type = TSDB_DATA_TYPE_INT; - s->colId = htonl(1); + s->colId = htonl(2); s->bytes = htonl(4); strcpy(s->name, "col1s"); s = &rspMsg->pSchema[2]; s->type = TSDB_DATA_TYPE_BINARY; - s->colId = htonl(2); + s->colId = htonl(3); s->bytes = htonl(12); strcpy(s->name, "tag1s"); @@ -229,19 +345,19 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc SSchema *s = NULL; s = &rspMsg->pSchema[0]; s->type = TSDB_DATA_TYPE_TIMESTAMP; - s->colId = htonl(0); + s->colId = htonl(1); s->bytes = htonl(8); strcpy(s->name, "ts"); s = &rspMsg->pSchema[1]; s->type = TSDB_DATA_TYPE_INT; - s->colId = htonl(1); + s->colId = htonl(2); s->bytes = htonl(4); strcpy(s->name, "col1s"); s = &rspMsg->pSchema[2]; s->type = TSDB_DATA_TYPE_BINARY; - s->colId = htonl(2); + s->colId = htonl(3); s->bytes = htonl(12); strcpy(s->name, "tag1s"); @@ -371,6 +487,117 @@ void ctgTestSetPrepareDbVgroupsAndSuperMeta() { } +void *ctgTestGetDbVgroupThread(void *param) { + struct SCatalog* pCtg = (struct SCatalog*)param; + int32_t code = 0; + void *mockPointer = (void *)0x1; + SArray *vgList = NULL; + int32_t n = 0; + + while (!ctgTestStop) { + code = catalogGetDBVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList); + if (code) { + assert(0); + } + + if (vgList) { + taosArrayDestroy(vgList); + } + + if (ctgTestEnableSleep) { + usleep(rand()%5); + } + if (++n % 50000 == 0) { + printf("Get:%d\n", n); + } + } + + return NULL; +} + +void *ctgTestSetDbVgroupThread(void *param) { + struct SCatalog* pCtg = (struct SCatalog*)param; + int32_t code = 0; + SDBVgroupInfo dbVgroup = {0}; + int32_t n = 0; + + while (!ctgTestStop) { + ctgTestBuildDBVgroup(&dbVgroup); + code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup); + if (code) { + assert(0); + } + + if (ctgTestEnableSleep) { + usleep(rand()%5); + } + if (++n % 50000 == 0) { + printf("Set:%d\n", n); + } + } + + return NULL; + +} + +void *ctgTestGetCtableMetaThread(void *param) { + struct SCatalog* pCtg = (struct SCatalog*)param; + int32_t code = 0; + int32_t n = 0; + STableMeta* tbMeta = NULL; + int32_t exist = 0; + + SName cn = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(cn.dbname, "db1"); + strcpy(cn.tname, ctgTestCTablename); + + while (!ctgTestStop) { + code = ctgGetTableMetaFromCache(pCtg, &cn, &tbMeta, &exist); + if (code || 0 == exist) { + assert(0); + } + + if (ctgTestEnableSleep) { + usleep(rand()%5); + } + + if (++n % 50000 == 0) { + printf("Get:%d\n", n); + } + } + + return NULL; +} + +void *ctgTestSetCtableMetaThread(void *param) { + struct SCatalog* pCtg = (struct SCatalog*)param; + int32_t code = 0; + SDBVgroupInfo dbVgroup = {0}; + int32_t n = 0; + STableMetaOutput output = {0}; + + ctgTestBuildCTableMetaOutput(&output); + + while (!ctgTestStop) { + code = ctgUpdateTableMetaCache(pCtg, &output); + if (code) { + assert(0); + } + + if (ctgTestEnableSleep) { + usleep(rand()%5); + } + if (++n % 50000 == 0) { + printf("Set:%d\n", n); + } + } + + return NULL; + +} + +#if 0 + TEST(tableMeta, normalTable) { struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; @@ -388,7 +615,7 @@ TEST(tableMeta, normalTable) { code = catalogGetHandle(ctgTestClusterId, &pCtg); ASSERT_EQ(code, 0); - SName n = {.type = T_NAME_TABLE, .acctId = 1}; + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; strcpy(n.dbname, "db1"); strcpy(n.tname, ctgTestTablename); @@ -436,11 +663,13 @@ TEST(tableMeta, childTableCase) { initQueryModuleMsgHandle(); //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); - int32_t code = catalogGetHandle(ctgTestClusterId, &pCtg); + code = catalogGetHandle(ctgTestClusterId, &pCtg); ASSERT_EQ(code, 0); - SName n = {.type = T_NAME_TABLE, .acctId = 1}; + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; strcpy(n.dbname, "db1"); strcpy(n.tname, ctgTestCTablename); @@ -494,11 +723,14 @@ TEST(tableMeta, superTableCase) { initQueryModuleMsgHandle(); + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); - int32_t code = catalogGetHandle(ctgTestClusterId, &pCtg); + code = catalogGetHandle(ctgTestClusterId, &pCtg); ASSERT_EQ(code, 0); - SName n = {.type = T_NAME_TABLE, .acctId = 1}; + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; strcpy(n.dbname, "db1"); strcpy(n.tname, ctgTestSTablename); @@ -558,12 +790,15 @@ TEST(tableDistVgroup, normalTable) { initQueryModuleMsgHandle(); + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); - int32_t code = catalogGetHandle(ctgTestClusterId, &pCtg); + code = catalogGetHandle(ctgTestClusterId, &pCtg); ASSERT_EQ(code, 0); - SName n = {.type = T_NAME_TABLE, .acctId = 1}; + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; strcpy(n.dbname, "db1"); strcpy(n.tname, ctgTestTablename); @@ -595,7 +830,7 @@ TEST(tableDistVgroup, childTableCase) { code = catalogGetHandle(ctgTestClusterId, &pCtg); ASSERT_EQ(code, 0); - SName n = {.type = T_NAME_TABLE, .acctId = 1}; + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; strcpy(n.dbname, "db1"); strcpy(n.tname, ctgTestCTablename); @@ -620,11 +855,14 @@ TEST(tableDistVgroup, superTableCase) { initQueryModuleMsgHandle(); + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); - int32_t code = catalogGetHandle(ctgTestClusterId, &pCtg); + code = catalogGetHandle(ctgTestClusterId, &pCtg); ASSERT_EQ(code, 0); - SName n = {.type = T_NAME_TABLE, .acctId = 1}; + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; strcpy(n.dbname, "db1"); strcpy(n.tname, ctgTestSTablename); @@ -645,6 +883,164 @@ TEST(tableDistVgroup, superTableCase) { catalogDestroy(); } +TEST(dbVgroup, getSetDbVgroupCase) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + SVgroupInfo *pvgInfo = NULL; + SDBVgroupInfo dbVgroup = {0}; + SArray *vgList = NULL; + + ctgTestSetPrepareDbVgroupsAndNormalMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + strcpy(n.tname, ctgTestTablename); + + code = catalogGetDBVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, false, &vgList); + ASSERT_EQ(code, 0); + ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum); + + code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); + ASSERT_EQ(code, 0); + ASSERT_EQ(vgInfo.vgId, 8); + ASSERT_EQ(vgInfo.numOfEps, 3); + + code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); + ASSERT_EQ(code, 0); + ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); + pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); + ASSERT_EQ(pvgInfo->vgId, 8); + ASSERT_EQ(pvgInfo->numOfEps, 3); + taosArrayDestroy(vgList); + + ctgTestBuildDBVgroup(&dbVgroup); + code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup); + ASSERT_EQ(code, 0); + + code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); + ASSERT_EQ(code, 0); + ASSERT_EQ(vgInfo.vgId, 7); + ASSERT_EQ(vgInfo.numOfEps, 2); + + code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); + ASSERT_EQ(code, 0); + ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); + pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); + ASSERT_EQ(pvgInfo->vgId, 8); + ASSERT_EQ(pvgInfo->numOfEps, 3); + taosArrayDestroy(vgList); + + catalogDestroy(); +} + +#endif + +TEST(multiThread, getSetDbVgroupCase) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + SVgroupInfo *pvgInfo = NULL; + SDBVgroupInfo dbVgroup = {0}; + SArray *vgList = NULL; + + ctgTestInitLogFile(); + + ctgTestSetPrepareDbVgroups(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + strcpy(n.tname, ctgTestTablename); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t thread1, thread2; + pthread_create(&(thread1), &thattr, ctgTestSetDbVgroupThread, pCtg); + + sleep(1); + pthread_create(&(thread1), &thattr, ctgTestGetDbVgroupThread, pCtg); + + while (true) { + if (ctgTestDeadLoop) { + sleep(1); + } else { + sleep(600); + break; + } + } + + ctgTestStop = true; + sleep(1); + + catalogDestroy(); +} + +TEST(multiThread, ctableMeta) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + SVgroupInfo *pvgInfo = NULL; + SDBVgroupInfo dbVgroup = {0}; + SArray *vgList = NULL; + + ctgTestSetPrepareDbVgroupsAndChildMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + strcpy(n.tname, ctgTestTablename); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t thread1, thread2; + pthread_create(&(thread1), &thattr, ctgTestGetCtableMetaThread, pCtg); + pthread_create(&(thread1), &thattr, ctgTestSetCtableMetaThread, pCtg); + + while (true) { + if (ctgTestDeadLoop) { + sleep(1); + } else { + sleep(600); + break; + } + } + + ctgTestStop = true; + sleep(1); + + catalogDestroy(); +} int main(int argc, char** argv) { diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 27ca406fc47745d2857719865d079f371c22064b..117297b9ffa90b93d56be5a872950a4f0ac9d1ff 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -40,7 +40,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 STableInfoMsg *bMsg = (STableInfoMsg *)*msg; - bMsg->vgId = bInput->vgId; + bMsg->header.vgId = htonl(bInput->vgId); strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index c327e4cfead62c33da5d63af0651d868f2c56b34..2770f7e21a4dce62a7461eede23cc3e3f60332cf 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -89,12 +89,12 @@ typedef struct SSchJob { SEpSet dataSrcEps; SEpAddr resEp; void *transport; - SArray *qnodeList; + SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr tsem_t rspSem; int32_t userFetch; int32_t remoteFetch; - SSchTask *fetchTask; + SSchTask *fetchTask; int32_t errCode; void *res; int32_t resNumOfRows; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 12c07d69eeafb017297b83f423b753fde527cb55..3f1799507df67520d966ac527d0da03b0b75ad8f 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -220,10 +220,10 @@ int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) { return TSDB_CODE_SUCCESS; } - int32_t qnodeNum = taosArrayGetSize(job->qnodeList); + int32_t nodeNum = taosArrayGetSize(job->nodeList); - for (int32_t i = 0; i < qnodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) { - SEpAddr *addr = taosArrayGet(job->qnodeList, i); + for (int32_t i = 0; i < nodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) { + SEpAddr *addr = taosArrayGet(job->nodeList, i); strncpy(epSet->fqdn[epSet->numOfEps], addr->fqdn, sizeof(addr->fqdn)); epSet->port[epSet->numOfEps] = addr->port; @@ -829,8 +829,8 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } -int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) { - if (qnodeList && taosArrayGetSize(qnodeList) <= 0) { +int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) { + if (nodeList && taosArrayGetSize(nodeList) <= 0) { qInfo("qnodeList is empty"); } @@ -842,7 +842,7 @@ int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, job->attr.syncSchedule = syncSchedule; job->transport = transport; - job->qnodeList = qnodeList; + job->nodeList = nodeList; SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); @@ -897,28 +897,27 @@ _return: SCH_RET(code); } -int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) { - if (NULL == transport || /* NULL == qnodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == numOfRows) { +int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) { + if (NULL == transport || /* NULL == nodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - *numOfRows = 0; - - SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true)); + SCH_ERR_RET(scheduleExecJobImpl(transport, nodeList, pDag, pJob, true)); SSchJob *job = *(SSchJob **)pJob; - *numOfRows = job->resNumOfRows; - + pRes->code = job->errCode; + pRes->numOfRows = job->resNumOfRows; + return TSDB_CODE_SUCCESS; } -int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) { - if (NULL == transport || NULL == qnodeList ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { +int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob) { + if (NULL == transport || NULL == nodeList ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - return scheduleExecJobImpl(transport, qnodeList, pDag, pJob, false); + return scheduleExecJobImpl(transport, nodeList, pDag, pJob, false); } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 6163bc0c1a4d22bc1b612a4218f8a485ff9cd802..b418ade1725cf97765aea2da961dcb061081e0c5 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -321,10 +321,11 @@ TEST(insertTest, normalCase) { pthread_t thread1; pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob); - - code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &numOfRows); + + SQueryResult res = {0}; + code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res); ASSERT_EQ(code, 0); - ASSERT_EQ(numOfRows, 20); + ASSERT_EQ(res.numOfRows, 20); scheduleFreeJob(pInsertJob); } diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 840a1ef390a529c5aebdce673f3e6d1800e8e8ff..2841f27da4b74f616ab70360df1b068b0d52cfdb 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -132,7 +132,7 @@ static FORCE_INLINE SHashNode *doUpdateHashNode(SHashObj *pHashObj, SHashEntry* } else { pNewNode->next = pNode; pe->num++; - atomic_add_fetch_64(&pHashObj->size, 1); + atomic_add_fetch_32(&pHashObj->size, 1); } return pNewNode; @@ -209,7 +209,7 @@ int32_t taosHashGetSize(const SHashObj *pHashObj) { if (!pHashObj) { return 0; } - return (int32_t)atomic_load_64(&pHashObj->size); + return (int32_t)atomic_load_32(&pHashObj->size); } static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) { @@ -273,7 +273,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da // enable resize __rd_unlock(&pHashObj->lock, pHashObj->type); - atomic_add_fetch_64(&pHashObj->size, 1); + atomic_add_fetch_32(&pHashObj->size, 1); return 0; } else { @@ -405,7 +405,7 @@ void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v } if (acquire) { - pNode->count++; + atomic_add_fetch_16(&pNode->count, 1); } data = GET_HASH_NODE_DATA(pNode); @@ -482,7 +482,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi // if (data) memcpy(data, GET_HASH_NODE_DATA(pNode), dsize); pe->num--; - atomic_sub_fetch_64(&pHashObj->size, 1); + atomic_sub_fetch_32(&pHashObj->size, 1); FREE_HASH_NODE(pHashObj, pNode); } } @@ -520,7 +520,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi while((pNode = pEntry->next) != NULL) { if (fp && (!fp(param, GET_HASH_NODE_DATA(pNode)))) { pEntry->num -= 1; - atomic_sub_fetch_64(&pHashObj->size, 1); + atomic_sub_fetch_32(&pHashObj->size, 1); pEntry->next = pNode->next; @@ -546,7 +546,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi if (fp && (!fp(param, GET_HASH_NODE_DATA(pNext)))) { pNode->next = pNext->next; pEntry->num -= 1; - atomic_sub_fetch_64(&pHashObj->size, 1); + atomic_sub_fetch_32(&pHashObj->size, 1); if (pEntry->num == 0) { assert(pEntry->next == NULL); @@ -600,7 +600,7 @@ void taosHashClear(SHashObj *pHashObj) { pEntry->next = NULL; } - pHashObj->size = 0; + atomic_store_32(&pHashObj->size, 0); __wr_unlock(&pHashObj->lock, pHashObj->type); } @@ -847,7 +847,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) { } pe->num--; - atomic_sub_fetch_64(&pHashObj->size, 1); + atomic_sub_fetch_32(&pHashObj->size, 1); FREE_HASH_NODE(pHashObj, pOld); } } else {