提交 ad051d4e 编写于 作者: D dapan1121

fix: fix stop query issue

上级 346c8640
...@@ -181,6 +181,7 @@ typedef struct SRequestSendRecvBody { ...@@ -181,6 +181,7 @@ typedef struct SRequestSendRecvBody {
tsem_t rspSem; // not used now tsem_t rspSem; // not used now
__taos_async_fn_t queryFp; __taos_async_fn_t queryFp;
__taos_async_fn_t fetchFp; __taos_async_fn_t fetchFp;
EQueryExecMode execMode;
void* param; void* param;
SDataBuf requestMsg; SDataBuf requestMsg;
int64_t queryJob; // query job, created according to sql query DAG. int64_t queryJob; // query job, created according to sql query DAG.
......
...@@ -933,6 +933,8 @@ SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool vali ...@@ -933,6 +933,8 @@ SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool vali
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta) { void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta) {
int32_t code = 0; int32_t code = 0;
pRequest->body.execMode = pQuery->execMode;
switch (pQuery->execMode) { switch (pQuery->execMode) {
case QUERY_EXEC_MODE_LOCAL: case QUERY_EXEC_MODE_LOCAL:
asyncExecLocalCmd(pRequest, pQuery); asyncExecLocalCmd(pRequest, pQuery);
...@@ -1149,7 +1151,6 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t ...@@ -1149,7 +1151,6 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT); SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT);
if (pRequest == NULL) { if (pRequest == NULL) {
destroyTscObj(pTscObj); destroyTscObj(pTscObj);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL; return NULL;
} }
......
...@@ -49,7 +49,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { ...@@ -49,7 +49,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
} }
// this function may be called by user or system, or by both simultaneously. // this function may be called by user or system, or by both simultaneously.
void taos_cleanup(void) { void taos_cleanup(void) {
tscInfo("start to cleanup client environment"); tscDebug("start to cleanup client environment");
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) { if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
return; return;
} }
...@@ -58,7 +58,10 @@ void taos_cleanup(void) { ...@@ -58,7 +58,10 @@ void taos_cleanup(void) {
clientReqRefPool = -1; clientReqRefPool = -1;
taosCloseRef(id); taosCloseRef(id);
cleanupTaskQueue(); hbMgrCleanUp();
catalogDestroy();
schedulerDestroy();
fmFuncMgtDestroy(); fmFuncMgtDestroy();
qCleanupKeywordsTable(); qCleanupKeywordsTable();
...@@ -67,12 +70,11 @@ void taos_cleanup(void) { ...@@ -67,12 +70,11 @@ void taos_cleanup(void) {
clientConnRefPool = -1; clientConnRefPool = -1;
taosCloseRef(id); taosCloseRef(id);
hbMgrCleanUp(); rpcCleanup();
tscDebug("rpc cleanup");
catalogDestroy(); cleanupTaskQueue();
schedulerDestroy();
rpcCleanup();
tscInfo("all local resources released"); tscInfo("all local resources released");
taosCleanupCfg(); taosCleanupCfg();
taosCloseLog(); taosCloseLog();
...@@ -852,27 +854,24 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { ...@@ -852,27 +854,24 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
} }
// all data has returned to App already, no need to try again // all data has returned to App already, no need to try again
if (pResultInfo->completed && (pRequest->body.queryJob != 0)) { if (pResultInfo->completed) {
pResultInfo->numOfRows = 0; // it is a local executed query, no need to do async fetch
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows); if (QUERY_EXEC_MODE_LOCAL == pRequest->body.execMode) {
return; ASSERT(pResultInfo->numOfRows >= 0);
} if (pResultInfo->localResultFetched) {
pResultInfo->numOfRows = 0;
// it is a local executed query, no need to do async fetch pResultInfo->current = 0;
if (pRequest->body.queryJob == 0) { } else {
ASSERT(pResultInfo->completed && pResultInfo->numOfRows >= 0); pResultInfo->localResultFetched = true;
if (pResultInfo->localResultFetched) { }
pResultInfo->numOfRows = 0;
pResultInfo->current = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
} else { } else {
pResultInfo->localResultFetched = true; pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
} }
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return; return;
} }
SSchedulerReq req = { SSchedulerReq req = {
.syncReq = false, .syncReq = false,
.fetchFp = fetchCallback, .fetchFp = fetchCallback,
......
...@@ -633,6 +633,7 @@ int sqConCleanupSyncQuery(bool fetch) { ...@@ -633,6 +633,7 @@ int sqConCleanupSyncQuery(bool fetch) {
pthread_join(qid, NULL); pthread_join(qid, NULL);
pthread_join(cid, NULL); pthread_join(cid, NULL);
break;
} }
CASE_LEAVE(); CASE_LEAVE();
} }
...@@ -648,6 +649,7 @@ int sqConCleanupAsyncQuery(bool fetch) { ...@@ -648,6 +649,7 @@ int sqConCleanupAsyncQuery(bool fetch) {
pthread_join(qid, NULL); pthread_join(qid, NULL);
pthread_join(cid, NULL); pthread_join(cid, NULL);
break;
} }
CASE_LEAVE(); CASE_LEAVE();
} }
...@@ -655,7 +657,7 @@ int sqConCleanupAsyncQuery(bool fetch) { ...@@ -655,7 +657,7 @@ int sqConCleanupAsyncQuery(bool fetch) {
void sqRunAllCase(void) { void sqRunAllCase(void) {
#if 0 #if 1
sqStopSyncQuery(false); sqStopSyncQuery(false);
sqStopSyncQuery(true); sqStopSyncQuery(true);
sqStopAsyncQuery(false); sqStopAsyncQuery(false);
...@@ -688,16 +690,17 @@ void sqRunAllCase(void) { ...@@ -688,16 +690,17 @@ void sqRunAllCase(void) {
sqConKillAsyncQuery(true); sqConKillAsyncQuery(true);
#endif #endif
/*
sqConCleanupSyncQuery(false); sqConCleanupSyncQuery(false);
sqConCleanupSyncQuery(true); sqConCleanupSyncQuery(true);
sqConCleanupAsyncQuery(false); sqConCleanupAsyncQuery(false);
sqConCleanupAsyncQuery(true); sqConCleanupAsyncQuery(true);
*/
int32_t l = 5; int32_t l = 5;
while (l) { while (l) {
printf("%d\n", l--); printf("%d\n", l--);
sleep(1000); sleep(1);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册