diff --git a/include/client/taos.h b/include/client/taos.h index 669512e9b12d7c6a4608dcc1791526667e12dc80..651faf5dcc6ece553eff3f3c9177e4cffc5740a3 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -163,6 +163,7 @@ DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result DLL_EXPORT void taos_free_result(TAOS_RES *res); +DLL_EXPORT void taos_kill_query(TAOS *taos); DLL_EXPORT int taos_field_count(TAOS_RES *res); DLL_EXPORT int taos_num_fields(TAOS_RES *res); DLL_EXPORT int taos_affected_rows(TAOS_RES *res); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 9225d671315723038cb61a0f21735103c3a64466..3b21258fe1a3080400c055ff7a0f0d5a122054b2 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -338,6 +338,7 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp); SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); void appHbMgrCleanup(void); void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr); +void closeAllRequests(SHashObj *pRequests); // conn level int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 4f4bb83fe5b0fe3d8ba0143be46790d7b9674e0c..43e83dd50146ec15f110eea04f49cd0c32a2952f 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -66,25 +66,31 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog if (rsp->vgVersion < 0) { code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); } else { - SDBVgInfo vgInfo = {0}; - vgInfo.vgVersion = rsp->vgVersion; - vgInfo.hashMethod = rsp->hashMethod; - vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (NULL == vgInfo.vgHash) { + SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo)); + if (NULL == vgInfo) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + vgInfo->vgVersion = rsp->vgVersion; + vgInfo->hashMethod = rsp->hashMethod; + vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == vgInfo->vgHash) { + taosMemoryFree(vgInfo); tscError("hash init[%d] failed", rsp->vgNum); return TSDB_CODE_TSC_OUT_OF_MEMORY; } for (int32_t j = 0; j < rsp->vgNum; ++j) { SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j); - if (taosHashPut(vgInfo.vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) { + if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) { tscError("hash push failed, errno:%d", errno); - taosHashCleanup(vgInfo.vgHash); + taosHashCleanup(vgInfo->vgHash); + taosMemoryFree(vgInfo); return TSDB_CODE_TSC_OUT_OF_MEMORY; } } - catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo); + catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, vgInfo); } if (code) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index a24e6faf7fae8f368fc30cedb139b9e0775a390b..be3efe0f98a26f23720d94539f49e2a5f1667a80 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -181,6 +181,17 @@ void taos_free_result(TAOS_RES *res) { } } +void taos_kill_query(TAOS *taos) { + if (NULL == taos) { + return; + } + int64_t rid = *(int64_t*)taos; + + STscObj* pTscObj = acquireTscObj(rid); + closeAllRequests(pTscObj->pRequests); + releaseTscObj(rid); +} + int taos_field_count(TAOS_RES *res) { if (res == NULL || TD_RES_TMQ_META(res)) { return 0; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 4c9974ba1a9e0d3346cd9e5b2f00406837db4b97..e03f82ba18b999cbc5e33328dd5e3097fd6f8ade 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -527,6 +527,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; + if (pMsg->msgType == TDMT_VND_QUERY || pMsg->msgType == TDMT_VND_QUERY_CONTINUE || + pMsg->msgType == TDMT_VND_QUERY_HEARTBEAT || pMsg->msgType == TDMT_VND_FETCH || + pMsg->msgType == TDMT_VND_DROP_TASK) { + return 0; + } if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER) { diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 139539821c34673391622831578a00e212bc8fe1..683cbdb47c7fea6a15d044124979eaf99adba063 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -789,8 +789,6 @@ _return: int32_t ctgCallUserCb(void* param) { SCtgJob* pJob = (SCtgJob*)param; - - //taosSsleep(2); (*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode); @@ -827,6 +825,9 @@ _return: qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(code)); pJob->jobResCode = code; + + taosSsleep(2); + qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId); taosAsyncExec(ctgCallUserCb, pJob, NULL); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 8cd6c7d203aac7b928b630e2a1d16aa41034673a..301cdbef20142d3c08408f3fc9b1e6fc56170f8e 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -1564,27 +1564,27 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { SCatalog* pCtg = msg->pCtg; if (NULL == dbInfo->vgHash) { - return TSDB_CODE_SUCCESS; + goto _return; } if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash)); - CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + CTG_ERR_JRET(TSDB_CODE_APP_ERROR); } bool newAdded = false; SDbVgVersion vgVersion = {.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable}; SCtgDBCache *dbCache = NULL; - CTG_ERR_RET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache)); + CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache)); if (NULL == dbCache) { ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%"PRIx64, dbFName, msg->dbId); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } SCtgVgCache *vgCache = &dbCache->vgCache; - CTG_ERR_RET(ctgWLockVgInfo(msg->pCtg, dbCache)); + CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache)); if (vgCache->vgInfo) { SDBVgInfo *vgInfo = vgCache->vgInfo; @@ -1593,14 +1593,14 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { ctgDebug("db vgVer is old, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion, vgInfo->vgVersion); ctgWUnlockVgInfo(dbCache); - return TSDB_CODE_SUCCESS; + goto _return; } if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable) { ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable); ctgWUnlockVgInfo(dbCache); - return TSDB_CODE_SUCCESS; + goto _return; } ctgFreeVgInfo(vgInfo); diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 75a9bc3809632f88601f8628add9aed4b8365cf4..cef8ff707564558835c57cefb59515858d2e2688 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -936,6 +936,25 @@ EDealRes sclCalcWalker(SNode* pNode, void* pContext) { return DEAL_RES_ERROR; } +int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockList) { + SSDataBlock* pb = taosArrayGetP(pBlockList, 0); + SScalarParam *pLeft = taosMemoryCalloc(1, sizeof(SScalarParam)); + if (NULL == pLeft) { + sclError("calloc %d failed", (int32_t)sizeof(SScalarParam)); + SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + pLeft->numOfRows = pb->info.rows; + colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows); + + _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN); + OperatorFn(pLeft, pSrc, pDst, TSDB_ORDER_ASC); + + taosMemoryFree(pLeft); + + return TSDB_CODE_SUCCESS; +} + int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) { if (NULL == pNode) { SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -983,9 +1002,14 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows); - colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL); - pDst->numOfRows = res->numOfRows; + if (1 == res->numOfRows) { + SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList)); + } else { + colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows); + colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL); + pDst->numOfRows = res->numOfRows; + } + taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); } diff --git a/tests/script/api/stopquery.c b/tests/script/api/stopquery.c index 0008aa3303d9a6cb377323791f31cdbdc755a95e..addd1272cfa8078d8dc527ac1b2a58e40c4e0dba 100644 --- a/tests/script/api/stopquery.c +++ b/tests/script/api/stopquery.c @@ -36,7 +36,7 @@ int64_t st, et; char hostName[128]; char dbName[128]; char tbName[128]; -int32_t runTimes = 10000; +int32_t runTimes = 1; typedef struct { int id; @@ -367,8 +367,8 @@ void *closeThreadFp(void *arg) { SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg; while (true) { if (qParam->taos) { - usleep(rand() % 10000); - //usleep(1000000); + //usleep(rand() % 10000); + usleep(1000000); taos_close(qParam->taos); break; }