提交 59315e0f 编写于 作者: dengyihao's avatar dengyihao

Merge branch 'enh/stopquery' of https://github.com/taosdata/TDengine into stopQuery

...@@ -163,6 +163,7 @@ DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); ...@@ -163,6 +163,7 @@ DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res); DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT void taos_kill_query(TAOS *taos);
DLL_EXPORT int taos_field_count(TAOS_RES *res); DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res); DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
......
...@@ -338,6 +338,7 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp); ...@@ -338,6 +338,7 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp);
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
void appHbMgrCleanup(void); void appHbMgrCleanup(void);
void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr); void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr);
void closeAllRequests(SHashObj *pRequests);
// conn level // conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
......
...@@ -66,25 +66,31 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog ...@@ -66,25 +66,31 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
if (rsp->vgVersion < 0) { if (rsp->vgVersion < 0) {
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
} else { } else {
SDBVgInfo vgInfo = {0}; SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
vgInfo.vgVersion = rsp->vgVersion; if (NULL == vgInfo) {
vgInfo.hashMethod = rsp->hashMethod; return TSDB_CODE_TSC_OUT_OF_MEMORY;
vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); }
if (NULL == vgInfo.vgHash) {
vgInfo->vgVersion = rsp->vgVersion;
vgInfo->hashMethod = rsp->hashMethod;
vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == vgInfo->vgHash) {
taosMemoryFree(vgInfo);
tscError("hash init[%d] failed", rsp->vgNum); tscError("hash init[%d] failed", rsp->vgNum);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
for (int32_t j = 0; j < rsp->vgNum; ++j) { for (int32_t j = 0; j < rsp->vgNum; ++j) {
SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j); SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
if (taosHashPut(vgInfo.vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) { if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
tscError("hash push failed, errno:%d", errno); tscError("hash push failed, errno:%d", errno);
taosHashCleanup(vgInfo.vgHash); taosHashCleanup(vgInfo->vgHash);
taosMemoryFree(vgInfo);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
} }
catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo); catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, vgInfo);
} }
if (code) { if (code) {
......
...@@ -181,6 +181,17 @@ void taos_free_result(TAOS_RES *res) { ...@@ -181,6 +181,17 @@ void taos_free_result(TAOS_RES *res) {
} }
} }
void taos_kill_query(TAOS *taos) {
if (NULL == taos) {
return;
}
int64_t rid = *(int64_t*)taos;
STscObj* pTscObj = acquireTscObj(rid);
closeAllRequests(pTscObj->pRequests);
releaseTscObj(rid);
}
int taos_field_count(TAOS_RES *res) { int taos_field_count(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res)) {
return 0; return 0;
......
...@@ -527,6 +527,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -527,6 +527,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0; if (!IsReq(pMsg)) return 0;
if (pMsg->msgType == TDMT_VND_QUERY || pMsg->msgType == TDMT_VND_QUERY_CONTINUE ||
pMsg->msgType == TDMT_VND_QUERY_HEARTBEAT || pMsg->msgType == TDMT_VND_FETCH ||
pMsg->msgType == TDMT_VND_DROP_TASK) {
return 0;
}
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER) { pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER) {
......
...@@ -789,8 +789,6 @@ _return: ...@@ -789,8 +789,6 @@ _return:
int32_t ctgCallUserCb(void* param) { int32_t ctgCallUserCb(void* param) {
SCtgJob* pJob = (SCtgJob*)param; SCtgJob* pJob = (SCtgJob*)param;
//taosSsleep(2);
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode); (*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode);
...@@ -827,6 +825,9 @@ _return: ...@@ -827,6 +825,9 @@ _return:
qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(code)); qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(code));
pJob->jobResCode = code; pJob->jobResCode = code;
taosSsleep(2);
qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId);
taosAsyncExec(ctgCallUserCb, pJob, NULL); taosAsyncExec(ctgCallUserCb, pJob, NULL);
......
...@@ -1564,27 +1564,27 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { ...@@ -1564,27 +1564,27 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
SCatalog* pCtg = msg->pCtg; SCatalog* pCtg = msg->pCtg;
if (NULL == dbInfo->vgHash) { if (NULL == dbInfo->vgHash) {
return TSDB_CODE_SUCCESS; goto _return;
} }
if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d",
dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash)); dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
} }
bool newAdded = false; bool newAdded = false;
SDbVgVersion vgVersion = {.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable}; SDbVgVersion vgVersion = {.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable};
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
CTG_ERR_RET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache)); CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
if (NULL == dbCache) { if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%"PRIx64, dbFName, msg->dbId); ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%"PRIx64, dbFName, msg->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
SCtgVgCache *vgCache = &dbCache->vgCache; SCtgVgCache *vgCache = &dbCache->vgCache;
CTG_ERR_RET(ctgWLockVgInfo(msg->pCtg, dbCache)); CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache));
if (vgCache->vgInfo) { if (vgCache->vgInfo) {
SDBVgInfo *vgInfo = vgCache->vgInfo; SDBVgInfo *vgInfo = vgCache->vgInfo;
...@@ -1593,14 +1593,14 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { ...@@ -1593,14 +1593,14 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
ctgDebug("db vgVer is old, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion, vgInfo->vgVersion); ctgDebug("db vgVer is old, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion, vgInfo->vgVersion);
ctgWUnlockVgInfo(dbCache); ctgWUnlockVgInfo(dbCache);
return TSDB_CODE_SUCCESS; goto _return;
} }
if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable) { if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable) {
ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable); ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable);
ctgWUnlockVgInfo(dbCache); ctgWUnlockVgInfo(dbCache);
return TSDB_CODE_SUCCESS; goto _return;
} }
ctgFreeVgInfo(vgInfo); ctgFreeVgInfo(vgInfo);
......
...@@ -936,6 +936,25 @@ EDealRes sclCalcWalker(SNode* pNode, void* pContext) { ...@@ -936,6 +936,25 @@ EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockList) {
SSDataBlock* pb = taosArrayGetP(pBlockList, 0);
SScalarParam *pLeft = taosMemoryCalloc(1, sizeof(SScalarParam));
if (NULL == pLeft) {
sclError("calloc %d failed", (int32_t)sizeof(SScalarParam));
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pLeft->numOfRows = pb->info.rows;
colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows);
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN);
OperatorFn(pLeft, pSrc, pDst, TSDB_ORDER_ASC);
taosMemoryFree(pLeft);
return TSDB_CODE_SUCCESS;
}
int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) { int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
if (NULL == pNode) { if (NULL == pNode) {
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
...@@ -983,9 +1002,14 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { ...@@ -983,9 +1002,14 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows); if (1 == res->numOfRows) {
colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL); SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList));
pDst->numOfRows = res->numOfRows; } else {
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows);
colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL);
pDst->numOfRows = res->numOfRows;
}
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
} }
......
...@@ -36,7 +36,7 @@ int64_t st, et; ...@@ -36,7 +36,7 @@ int64_t st, et;
char hostName[128]; char hostName[128];
char dbName[128]; char dbName[128];
char tbName[128]; char tbName[128];
int32_t runTimes = 10000; int32_t runTimes = 1;
typedef struct { typedef struct {
int id; int id;
...@@ -367,8 +367,8 @@ void *closeThreadFp(void *arg) { ...@@ -367,8 +367,8 @@ void *closeThreadFp(void *arg) {
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg; SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg;
while (true) { while (true) {
if (qParam->taos) { if (qParam->taos) {
usleep(rand() % 10000); //usleep(rand() % 10000);
//usleep(1000000); usleep(1000000);
taos_close(qParam->taos); taos_close(qParam->taos);
break; break;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册