提交 d4027164 编写于 作者: D dapan1121

enh: stop query

上级 de06f055
......@@ -163,6 +163,7 @@ DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT void taos_kill_query(TAOS *taos);
DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
......
......@@ -338,6 +338,7 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp);
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
void appHbMgrCleanup(void);
void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr);
void closeAllRequests(SHashObj *pRequests);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
......
......@@ -66,25 +66,31 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
if (rsp->vgVersion < 0) {
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
} else {
SDBVgInfo vgInfo = {0};
vgInfo.vgVersion = rsp->vgVersion;
vgInfo.hashMethod = rsp->hashMethod;
vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == vgInfo.vgHash) {
SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
if (NULL == vgInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
vgInfo->vgVersion = rsp->vgVersion;
vgInfo->hashMethod = rsp->hashMethod;
vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == vgInfo->vgHash) {
taosMemoryFree(vgInfo);
tscError("hash init[%d] failed", rsp->vgNum);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t j = 0; j < rsp->vgNum; ++j) {
SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
if (taosHashPut(vgInfo.vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
tscError("hash push failed, errno:%d", errno);
taosHashCleanup(vgInfo.vgHash);
taosHashCleanup(vgInfo->vgHash);
taosMemoryFree(vgInfo);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo);
catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, vgInfo);
}
if (code) {
......
......@@ -181,6 +181,17 @@ void taos_free_result(TAOS_RES *res) {
}
}
void taos_kill_query(TAOS *taos) {
if (NULL == taos) {
return;
}
int64_t rid = *(int64_t*)taos;
STscObj* pTscObj = acquireTscObj(rid);
closeAllRequests(pTscObj->pRequests);
releaseTscObj(rid);
}
int taos_field_count(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return 0;
......
......@@ -527,6 +527,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0;
if (pMsg->msgType == TDMT_VND_QUERY || pMsg->msgType == TDMT_VND_QUERY_CONTINUE ||
pMsg->msgType == TDMT_VND_QUERY_HEARTBEAT || pMsg->msgType == TDMT_VND_FETCH ||
pMsg->msgType == TDMT_VND_DROP_TASK) {
return 0;
}
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER) {
......
......@@ -789,8 +789,6 @@ _return:
int32_t ctgCallUserCb(void* param) {
SCtgJob* pJob = (SCtgJob*)param;
//taosSsleep(2);
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode);
......@@ -827,6 +825,9 @@ _return:
qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(code));
pJob->jobResCode = code;
taosSsleep(2);
qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId);
taosAsyncExec(ctgCallUserCb, pJob, NULL);
......
......@@ -1564,27 +1564,27 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
SCatalog* pCtg = msg->pCtg;
if (NULL == dbInfo->vgHash) {
return TSDB_CODE_SUCCESS;
goto _return;
}
if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d",
dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
}
bool newAdded = false;
SDbVgVersion vgVersion = {.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable};
SCtgDBCache *dbCache = NULL;
CTG_ERR_RET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%"PRIx64, dbFName, msg->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
SCtgVgCache *vgCache = &dbCache->vgCache;
CTG_ERR_RET(ctgWLockVgInfo(msg->pCtg, dbCache));
CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache));
if (vgCache->vgInfo) {
SDBVgInfo *vgInfo = vgCache->vgInfo;
......@@ -1593,14 +1593,14 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
ctgDebug("db vgVer is old, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion, vgInfo->vgVersion);
ctgWUnlockVgInfo(dbCache);
return TSDB_CODE_SUCCESS;
goto _return;
}
if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable) {
ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable);
ctgWUnlockVgInfo(dbCache);
return TSDB_CODE_SUCCESS;
goto _return;
}
ctgFreeVgInfo(vgInfo);
......
......@@ -936,6 +936,25 @@ EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
return DEAL_RES_ERROR;
}
int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockList) {
SSDataBlock* pb = taosArrayGetP(pBlockList, 0);
SScalarParam *pLeft = taosMemoryCalloc(1, sizeof(SScalarParam));
if (NULL == pLeft) {
sclError("calloc %d failed", (int32_t)sizeof(SScalarParam));
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pLeft->numOfRows = pb->info.rows;
colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows);
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN);
OperatorFn(pLeft, pSrc, pDst, TSDB_ORDER_ASC);
taosMemoryFree(pLeft);
return TSDB_CODE_SUCCESS;
}
int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
if (NULL == pNode) {
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
......@@ -983,9 +1002,14 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows);
colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL);
pDst->numOfRows = res->numOfRows;
if (1 == res->numOfRows) {
SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList));
} else {
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows);
colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL);
pDst->numOfRows = res->numOfRows;
}
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
}
......
......@@ -36,7 +36,7 @@ int64_t st, et;
char hostName[128];
char dbName[128];
char tbName[128];
int32_t runTimes = 10000;
int32_t runTimes = 1;
typedef struct {
int id;
......@@ -367,8 +367,8 @@ void *closeThreadFp(void *arg) {
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg;
while (true) {
if (qParam->taos) {
usleep(rand() % 10000);
//usleep(1000000);
//usleep(rand() % 10000);
usleep(1000000);
taos_close(qParam->taos);
break;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册