diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index 397a60d140e52d0a9e0e6a67ca57025793ba9cbf..4e579b0729431b0fc00d69431a304c4546f02fb0 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -87,7 +87,6 @@ typedef struct SRetrieveSupport { SSqlObj * pParentSql; tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to uint32_t numOfRetry; // record the number of retry times - pthread_mutex_t queryMutex; } SRetrieveSupport; int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc, diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 6d02bc7fbdae33bf45368f267f65304d089db6f6..65a868cf4039a9390795d54896f74aa4a7245f0e 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -323,6 +323,7 @@ typedef struct SSqlObj { SSqlRes res; uint16_t numOfSubs; struct SSqlObj **pSubs; + tsem_t subReadySem; struct SSqlObj * prev, *next; } SSqlObj; diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 156bc1de9ed240573c4a3e989bc5b18a7d95162a..cabf2a6a11efd015a19685ca989c0651ad5a89f0 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -1377,13 +1377,12 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { tscResetForNextRetrieve(pRes); if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed - tscDebug("%p %s call the drop local reducer", pSql, __FUNCTION__); - tscDestroyLocalReducer(pSql); - return 0; + tscError("%p local merge abort due to error occurs, code:%s", pSql, tstrerror(pRes->code)); + return pRes->code; } SLocalReducer *pLocalReducer = pRes->pLocalReducer; - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); // set the data merge in progress int32_t prevStatus = @@ -1478,8 +1477,8 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { * so the processing of previous group is completed. */ int32_t numOfRes = finalizeRes(pQueryInfo, pLocalReducer); + bool sameGroup = isSameGroup(pCmd, pLocalReducer, pLocalReducer->prevRowOfInput, tmpBuffer); - bool sameGroup = isSameGroup(pCmd, pLocalReducer, pLocalReducer->prevRowOfInput, tmpBuffer); tFilePage *pResBuf = pLocalReducer->pResultBuf; /* diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 05264a2665105c2c8ac423bc263a69142cfaedfd..5a7a858ba14d2da0dc2c207ef0100ee9da7325ca 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -314,10 +314,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { pRes->rspLen = 0; - if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) { - pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_RPC_NETWORK_UNAVAIL; - } else { + if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { tscDebug("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code)); + } else { + pRes->code = rpcMsg->code; } if (pRes->code == TSDB_CODE_SUCCESS) { @@ -460,11 +460,10 @@ void tscKillSTableQuery(SSqlObj *pSql) { continue; } - /* - * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause - * sub-queries not correctly released and master sql object of super table query reaches an abnormal state. - */ - rpcCancelRequest(pSub->pRpcCtx); + if (pSub->pRpcCtx != NULL) { + rpcCancelRequest(pSub->pRpcCtx); + } + pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; tscQueueAsyncRes(pSub); } @@ -1443,6 +1442,17 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; + int32_t code = pRes->code; + if (pRes->code != TSDB_CODE_SUCCESS) { + tscQueueAsyncRes(pSql); + return code; + } + + // all subquery have completed already + if (pRes->pLocalReducer == NULL) { + sem_wait(&pSql->subReadySem); + } + pRes->code = tscDoLocalMerge(pSql); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); @@ -1453,7 +1463,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { pRes->row = 0; pRes->completed = (pRes->numOfRows == 0); - int32_t code = pRes->code; + code = pRes->code; if (pRes->code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, pRes->numOfRows); } else { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 1bd885466c995883b7487b1efcb1dadc8d7a5013..cfcc9f0fa60919bab5fa02bd55027738e49ac885 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -295,6 +295,8 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) { } tsem_init(&pSql->rspSem, 0, 0); + tsem_init(&pSql->subReadySem, 0, 0); + doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); tsem_wait(&pSql->rspSem); @@ -655,16 +657,13 @@ int* taos_fetch_lengths(TAOS_RES *res) { char *taos_get_client_info() { return version; } void taos_stop_query(TAOS_RES *res) { - if (res == NULL) { + SSqlObj *pSql = (SSqlObj *)res; + if (pSql == NULL || pSql->signature != pSql) { return; } - SSqlObj *pSql = (SSqlObj *)res; - SSqlCmd *pCmd = &pSql->cmd; - - if (pSql->signature != pSql) return; tscDebug("%p start to cancel query", res); - + SSqlCmd *pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { @@ -674,9 +673,8 @@ void taos_stop_query(TAOS_RES *res) { if (pSql->cmd.command < TSDB_SQL_LOCAL) { rpcCancelRequest(pSql->pRpcCtx); } - pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - tscQueueAsyncRes(pSql); + pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; tscDebug("%p query is cancelled", res); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index dc3445680c387bb8b1b7c2befd22965fa26292a9..1aeaba650d711295626fcbde07b38283528b73ae 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1338,10 +1338,6 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState SRetrieveSupport* pSupport = pSub->param; taosTFree(pSupport->localBuffer); - - pthread_mutex_unlock(&pSupport->queryMutex); - pthread_mutex_destroy(&pSupport->queryMutex); - taosTFree(pSupport); tscFreeSqlObj(pSub); @@ -1414,13 +1410,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->pParentSql = pSql; trs->pFinalColModel = pModel; - pthread_mutexattr_t mutexattr; - memset(&mutexattr, 0, sizeof(pthread_mutexattr_t)); - - pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP); - pthread_mutex_init(&trs->queryMutex, &mutexattr); - pthread_mutexattr_destroy(&mutexattr); - SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL); if (pNew == NULL) { tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); @@ -1461,6 +1450,12 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tscDebug("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex); tscProcessSql(pSub); } + + // set the command flag must be after the semaphore been correctly set. + pSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; + if (pRes->code == TSDB_CODE_SUCCESS) { + (*pSql->fp)(pSql->param, pSql, 0); + } return TSDB_CODE_SUCCESS; } @@ -1469,12 +1464,8 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { tscDebug("%p start to free subquery result", pSql); taos_free_result(pSql); - taosTFree(trsupport->localBuffer); - pthread_mutex_unlock(&trsupport->queryMutex); - pthread_mutex_destroy(&trsupport->queryMutex); - taosTFree(trsupport); } @@ -1498,8 +1489,6 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES pParentSql->res.code = code; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; - - pthread_mutex_unlock(&trsupport->queryMutex); tscHandleSubqueryError(trsupport, tres, pParentSql->res.code); } @@ -1518,8 +1507,6 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in // clear local saved number of results trsupport->localBuffer->num = 0; - pthread_mutex_unlock(&trsupport->queryMutex); - tscTrace("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql, tstrerror(code), subqueryIndex, trsupport->numOfRetry); @@ -1602,15 +1589,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tscFreeSubSqlObj(trsupport, pSql); // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); - - if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { - (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code); - } else { // regular super table query - if (pParentSql->res.code != TSDB_CODE_SUCCESS) { - tscQueueAsyncRes(pParentSql); - } - } + tsem_post(&pParentSql->subReadySem); } static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) { @@ -1682,14 +1661,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p // only free once taosTFree(trsupport->pState); tscFreeSubSqlObj(trsupport, pSql); - - // set the command flag must be after the semaphore been correctly set. - pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; - if (pParentSql->res.code == TSDB_CODE_SUCCESS) { - (*pParentSql->fp)(pParentSql->param, pParentSql, 0); - } else { - tscQueueAsyncRes(pParentSql); - } + + // all subqueries are completed, retrieve from local can be proceeded. + tsem_post(&pParentSql->subReadySem); } static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { @@ -1707,9 +1681,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR SSubqueryState* pState = trsupport->pState; assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal); - // query process and cancel query process may execute at the same time - pthread_mutex_lock(&trsupport->queryMutex); - STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; @@ -1783,7 +1754,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR return; } else { // continue fetch data from dnode - pthread_mutex_unlock(&trsupport->queryMutex); taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); } @@ -2083,12 +2053,6 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { tsem_post(&pSql->rspSem); return; } - -// if (pSql->res.code == TSDB_CODE_SUCCESS) { -// (*pSql->fp)(pSql->param, pSql, pRes->numOfRows); -// } else { -// tscQueueAsyncRes(pSql); -// } } static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 716d48fa0338373aaef7a31e32b6b45d583bd8eb..feae58090c44462a21e3ea9371943be0a4dec85e 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -388,6 +388,8 @@ void tscFreeSqlObj(SSqlObj* pSql) { taosTFree(pSql->sqlstr); tsem_destroy(&pSql->rspSem); + tsem_destroy(&pSql->subReadySem); + free(pSql); } diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index d82ddf8871798f753ffd4da3703155d01fad1cb3..45a81c7b56d731ebc4996f12f654b9f0dfe208e4 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -25,6 +25,8 @@ #include "taosdef.h" #include "taoserror.h" #include "tglobal.h" +#include "tsclient.h" + #include /**************** Global variables ****************/ @@ -64,11 +66,6 @@ TAOS *shellInit(SShellArguments *args) { } taos_init(); - /* - * set tsTableMetaKeepTimer = 3000ms - * means not save cache in shell - */ - tsTableMetaKeepTimer = 3000; // Connect to the database. TAOS *con = NULL; @@ -303,8 +300,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { return; } - int num_fields = taos_field_count(pSql); - if (num_fields != 0) { // select and show kinds of commands + if (!tscIsUpdateQuery(pSql)) { // select and show kinds of commands int error_no = 0; int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode); if (numOfRows < 0) {