From 25ee3b480bb871723102e30f4cd96c72c8a39c27 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Oct 2020 11:18:11 +0800 Subject: [PATCH] [td-225] refactor client code --- src/client/inc/tscLocalMerge.h | 7 - src/client/inc/tscUtil.h | 1 - src/client/inc/tsclient.h | 11 +- src/client/src/tscAsync.c | 28 +--- src/client/src/tscLocalMerge.c | 2 +- src/client/src/tscSQLParser.c | 18 ++- src/client/src/tscServer.c | 24 ++-- src/client/src/tscSql.c | 15 ++- src/client/src/tscStream.c | 2 +- src/client/src/tscSubquery.c | 225 +++++++++++++++------------------ src/client/src/tscUtil.c | 18 +-- src/vnode/src/vnodeRead.c | 1 + 12 files changed, 171 insertions(+), 181 deletions(-) diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index 4e579b0729..5baa66a9e0 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -72,17 +72,10 @@ typedef struct SLocalReducer { bool orderPrjOnSTable; // projection query on stable } SLocalReducer; -typedef struct SSubqueryState { - int32_t numOfRemain; // the number of remain unfinished subquery - int32_t numOfTotal; // the number of total sub-queries - uint64_t numOfRetrievedRows; // total number of points in this query -} SSubqueryState; - typedef struct SRetrieveSupport { tExtMemBuffer ** pExtMemBuffer; // for build loser tree tOrderDescriptor *pOrderDescriptor; SColumnModel * pFinalColModel; // colModel for final result - SSubqueryState * pState; int32_t subqueryIndex; // index of current vnode in vnode list SSqlObj * pParentSql; tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 3aff1b8ef3..594226b1fc 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -66,7 +66,6 @@ typedef struct STidTags { #pragma pack(pop) typedef struct SJoinSupporter { - SSubqueryState* pState; SSqlObj* pObj; // parent SqlObj int32_t subqueryIndex; // index of sub query SInterval interval; diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 046bea2c27..b6ab3702c9 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -334,6 +334,12 @@ typedef struct STscObj { T_REF_DECLARE() } STscObj; +typedef struct SSubqueryState { + int32_t numOfRemain; // the number of remain unfinished subquery + int32_t numOfSub; // the number of total sub-queries + uint64_t numOfRetrievedRows; // total number of points in this query +} SSubqueryState; + typedef struct SSqlObj { void *signature; pthread_t owner; // owner of sql object, by which it is executed @@ -355,10 +361,11 @@ typedef struct SSqlObj { tsem_t rspSem; SSqlCmd cmd; SSqlRes res; - uint16_t numOfSubs; + + SSubqueryState subState; struct SSqlObj **pSubs; - struct SSqlObj * prev, *next; + struct SSqlObj *prev, *next; struct SSqlObj **self; } SSqlObj; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 400613595b..a76b77bb86 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -361,15 +361,6 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow); } -void tscProcessAsyncRes(SSchedMsg *pMsg) { - SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; - SSqlRes *pRes = &pSql->res; - assert(pSql->fp != NULL && pSql->fetchFp != NULL); - - pSql->fp = pSql->fetchFp; - (*pSql->fp)(pSql->param, pSql, pRes->code); -} - // this function will be executed by queue task threads, so the terrno is not valid static void tscProcessAsyncError(SSchedMsg *pMsg) { void (*fp)() = pMsg->ahandle; @@ -393,22 +384,15 @@ void tscQueueAsyncRes(SSqlObj *pSql) { if (pSql == NULL || pSql->signature != pSql) { tscDebug("%p SqlObj is freed, not add into queue async res", pSql); return; - } else { - tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code)); } - SSchedMsg schedMsg = { 0 }; - schedMsg.fp = tscProcessAsyncRes; - schedMsg.ahandle = pSql; - schedMsg.thandle = (void *)1; - schedMsg.msg = NULL; - taosScheduleTask(tscQhandle, &schedMsg); -} + tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code)); -void tscProcessAsyncFree(SSchedMsg *pMsg) { - SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; - tscDebug("%p sql is freed", pSql); - taos_free_result(pSql); + SSqlRes *pRes = &pSql->res; + assert(pSql->fp != NULL && pSql->fetchFp != NULL); + + pSql->fp = pSql->fetchFp; + (*pSql->fp)(pSql->param, pSql, pRes->code); } int tscSendMsgToServer(SSqlObj *pSql); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 19abbe32e5..16f208da98 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -639,7 +639,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->numOfSubs); + (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->subState.numOfSub); if (*pMemBuffer == NULL) { tscError("%p failed to allocate memory", pSql); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b65d7b7112..b87882ba63 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1603,8 +1603,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t return TSDB_CODE_SUCCESS; } -static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc, char* aliasName, - int32_t resColIdx, SColumnIndex* pColIndex) { +static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc, + char* aliasName, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) { const char* msg1 = "not support column types"; int16_t type = 0; @@ -1650,8 +1650,13 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; tscColumnListInsert(pQueryInfo->colList, &index); + // if it is not in the final result, do not add it SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex); - insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, columnName, pExpr); + if (finalResult) { + insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, columnName, pExpr); + } else { + tscColumnListInsert(pQueryInfo->colList, &(ids.ids[0])); + } return TSDB_CODE_SUCCESS; } @@ -1926,7 +1931,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col for (int32_t j = 0; j < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++j) { index.columnIndex = j; - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex++, &index) != 0) { + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex++, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } } @@ -1943,7 +1948,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col if ((index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) || (index.columnIndex < 0)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); } - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex + i, &index) != 0) { + + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex + i, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -1980,7 +1986,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) { SColumnIndex index = {.tableIndex = j, .columnIndex = i}; - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex, &index) != 0) { + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0b76bff3b1..5cddaa1c4d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -24,7 +24,6 @@ #include "tschemautil.h" #include "tsclient.h" #include "ttimer.h" -#include "tutil.h" #include "tlockfree.h" SRpcCorEpSet tscMgmtEpSet; @@ -478,20 +477,29 @@ void tscKillSTableQuery(SSqlObj *pSql) { pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - for (int i = 0; i < pSql->numOfSubs; ++i) { + for (int i = 0; i < pSql->subState.numOfSub; ++i) { // NOTE: pSub may have been released already here SSqlObj *pSub = pSql->pSubs[i]; if (pSub == NULL) { continue; } - pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - if (pSub->pRpcCtx != NULL) { - rpcCancelRequest(pSub->pRpcCtx); - pSub->pRpcCtx = NULL; + void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE)); + if (p == NULL) { + continue; + } + + SSqlObj* pSubObj = (SSqlObj*) (*p); + assert(pSubObj->self == (SSqlObj**) p); + + pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + if (pSubObj->pRpcCtx != NULL) { + rpcCancelRequest(pSubObj->pRpcCtx); + pSubObj->pRpcCtx = NULL; } - tscQueueAsyncRes(pSub); // async res? not other functions? + tscQueueAsyncRes(pSubObj); // async res? not other functions? + taosCacheRelease(tscObjCache, (void**) &p, false); } tscDebug("%p super table query cancelled", pSql); @@ -1455,7 +1463,7 @@ int tscProcessLocalRetrieveRsp(SSqlObj *pSql) { int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; - SSqlCmd *pCmd = &pSql->cmd; + SSqlCmd* pCmd = &pSql->cmd; int32_t code = pRes->code; if (pRes->code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 7bc09a365a..fa88c57fdd 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -523,7 +523,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { pRes->numOfClauseTotal = 0; pRes->rspType = 0; - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; taosTFree(pSql->pSubs); assert(pSql->fp == NULL); @@ -559,7 +559,7 @@ int taos_select_db(TAOS *taos, const char *db) { } // send free message to vnode to free qhandle and corresponding resources in vnode -static bool tscKillQueryInDnode(SSqlObj* pSql) { +static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; @@ -568,10 +568,18 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { } SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) { + if (pQueryInfo == NULL) { return true; } + if (pSql->pRpcCtx != NULL) { + rpcCancelRequest(pSql->pRpcCtx); + pSql->pRpcCtx = NULL; + return true; + } else { + pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + } + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); tscRemoveFromSqlList(pSql); @@ -700,6 +708,7 @@ void taos_stop_query(TAOS_RES *res) { } else { // TODO multithreads bug if (pSql->cmd.command < TSDB_SQL_LOCAL && pSql->pRpcCtx != NULL) { rpcCancelRequest(pSql->pRpcCtx); + pSql->pRpcCtx = NULL; } } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index d01ede279a..61614b56fb 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -274,7 +274,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false); tscFreeSqlResult(pSql); taosTFree(pSql->pSubs); - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; taosTFree(pTableMetaInfo->vgroupList); tscSetNextLaunchTimer(pStream, pSql); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 0c57b94054..59b5404eea 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -26,7 +26,6 @@ #include "tscSubquery.h" typedef struct SInsertSupporter { - SSubqueryState* pState; SSqlObj* pSql; int32_t index; } SInsertSupporter; @@ -174,7 +173,6 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in } pSupporter->pObj = pSql; - pSupporter->pState = pState; pSupporter->subqueryIndex = index; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); @@ -250,7 +248,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { SJoinSupporter* pSupporter = NULL; //If the columns are not involved in the final select clause, the corresponding query will not be issued. - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { pSupporter = pSql->pSubs[i]->param; if (taosArrayGetSize(pSupporter->exprList) > 0) { ++numOfSub; @@ -260,16 +258,16 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { assert(numOfSub > 0); // scan all subquery, if one sub query has only ts, ignore it - tscDebug("%p start to launch secondary subqueries, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub); + tscDebug("%p start to launch secondary subqueries, total:%d, only:%d needs to query", pSql, pSql->subState.numOfSub, numOfSub); //the subqueries that do not actually launch the secondary query to virtual node is set as completed. - SSubqueryState* pState = pSupporter->pState; - pState->numOfTotal = pSql->numOfSubs; - pState->numOfRemain = numOfSub; - +// SSubqueryState* pState = pSupporter->pState; +// pState->numOfSub = pSql->subState.numOfSub; +// pSql->numOfRemain = numOfSub; + bool success = true; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj *pPrevSub = pSql->pSubs[i]; pSql->pSubs[i] = NULL; @@ -322,7 +320,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo)); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); + assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); tscFieldInfoUpdateOffset(pNewQueryInfo); @@ -373,13 +371,13 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { if (!success) { pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql, - pSql->numOfSubs, pSql->res.code); + pSql->subState.numOfSub, pSql->res.code); freeJoinSubqueryObj(pSql); return pSql->res.code; } - for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == NULL) { continue; } @@ -391,17 +389,13 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { } void freeJoinSubqueryObj(SSqlObj* pSql) { - SSubqueryState* pState = NULL; - - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { continue; } SJoinSupporter* p = pSub->param; - pState = p->pState; - tscDestroyJoinSupporter(p); if (pSub->res.code == TSDB_CODE_SUCCESS) { @@ -409,14 +403,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { } } - taosTFree(pState); - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; } static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { - assert(pSupporter->pState->numOfRemain > 0); + assert(pSqlObj->subState.numOfRemain > 0); - if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) <= 0) { + if (atomic_sub_fetch_32(&pSqlObj->subState.numOfRemain, 1) <= 0) { tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); freeJoinSubqueryObj(pSqlObj); } @@ -680,7 +673,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // no data exists in next vnode, mark the query completed // only when there is no subquery exits any more, proceeds to get the intersect of the tuple sets. - if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) { + if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { return; } @@ -716,10 +709,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2); - pSupporter->pState->numOfTotal = 2; - pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal; + pParentSql->subState.numOfSub = 2; + pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; - for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) { + for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { SSqlObj* sub = pParentSql->pSubs[m]; issueTSCompQuery(sub, sub->param, pParentSql); } @@ -818,7 +811,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow return; } - if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) { + if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { return; } @@ -850,7 +843,6 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR SJoinSupporter* pSupporter = (SJoinSupporter*)param; SSqlObj* pParentSql = pSupporter->pObj; - SSubqueryState* pState = pSupporter->pState; SSqlObj* pSql = (SSqlObj*)tres; SSqlCmd* pCmd = &pSql->cmd; @@ -871,6 +863,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR pRes->numOfTotal += pRes->numOfRows; } + SSubqueryState* pState = &pParentSql->subState; if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); assert(pQueryInfo->numOfTables == 1); @@ -878,7 +871,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR // for projection query, need to try next vnode if current vnode is exhausted if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) { pState->numOfRemain = 1; - pState->numOfTotal = 1; + pState->numOfSub = 1; pSql->cmd.command = TSDB_SQL_SELECT; pSql->fp = tscJoinQueryCallback; @@ -888,12 +881,12 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } } - if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { - tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfTotal); + if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { + tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pParentSql->subState.numOfRemain, pState->numOfSub); return; } - tscDebug("%p all %d secondary subqueries retrieval completed, code:%d", tres, pState->numOfTotal, pParentSql->res.code); + tscDebug("%p all %d secondary subqueries retrieval completed, code:%d", tres, pState->numOfSub, pParentSql->res.code); if (pParentSql->res.code != TSDB_CODE_SUCCESS) { freeJoinSubqueryObj(pParentSql); @@ -901,7 +894,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } // update the records for each subquery in parent sql object. - for (int32_t i = 0; i < pParentSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pState->numOfSub; ++i) { if (pParentSql->pSubs[i] == NULL) { continue; } @@ -917,32 +910,26 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) { int32_t notInvolved = 0; SJoinSupporter* pSupporter = NULL; - SSubqueryState* pState = NULL; + SSubqueryState* pState = &pSql->subState; - for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == NULL) { notInvolved++; } else { pSupporter = (SJoinSupporter*)pSql->pSubs[i]->param; - pState = pSupporter->pState; } } - assert(pState != NULL); - if (pState != NULL) { - pState->numOfTotal = pSql->numOfSubs; - pState->numOfRemain = numOfFetch; - } - + pState->numOfRemain = numOfFetch; return pSupporter; } void tscFetchDatablockFromSubquery(SSqlObj* pSql) { - assert(pSql->numOfSubs >= 1); + assert(pSql->subState.numOfSub >= 1); int32_t numOfFetch = 0; bool hasData = true; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { // if the subquery is NULL, it does not involved in the final result generation SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -989,7 +976,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch); SJoinSupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch); - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSql1 = pSql->pSubs[i]; if (pSql1 == NULL) { continue; @@ -1124,7 +1111,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { } // wait for the other subqueries response from vnode - if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) { + if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { return; } @@ -1136,7 +1123,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { * data instead of returning to its invoker */ if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { - pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal; // reset the record value + pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; // reset the record value pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; @@ -1164,8 +1151,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter pSql->res.qhandle = 0x1; assert(pSql->res.numOfRows == 0); + int32_t index = 0; if (pSql->pSubs == NULL) { - pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES); + pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); if (pSql->pSubs == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1176,8 +1164,8 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter return TSDB_CODE_TSC_OUT_OF_MEMORY; } - pSql->pSubs[pSql->numOfSubs++] = pNew; - assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal); + pSql->pSubs[index++] = pNew; + assert(index <= pSql->subState.numOfSub); if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); @@ -1221,7 +1209,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // return the tableId & tag - SColumnIndex index = {0}; + SColumnIndex colIndex = {0}; STagCond* pTagCond = &pSupporter->tagCond; assert(pTagCond->joinInfo.hasJoin); @@ -1234,7 +1222,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); for(int32_t i = 0; i < numOfTags; ++i) { if (pSchema[i].colId == tagColId) { - index.columnIndex = i; + colIndex.columnIndex = i; break; } } @@ -1251,18 +1239,18 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter // set get tags query type TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); - tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s1, TSDB_COL_TAG); + tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG); size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); tscDebug( "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), " "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s", pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo), - numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, index.columnIndex, pNewQueryInfo->pTableMetaInfo[0]->name); + numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, pNewQueryInfo->pTableMetaInfo[0]->name); } else { SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; - SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL); + SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL); // set the tags value for ts_comp function SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); @@ -1320,8 +1308,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { goto _error; } - pState->numOfTotal = pQueryInfo->numOfTables; - pState->numOfRemain = pState->numOfTotal; + pSql->subState.numOfSub = pQueryInfo->numOfTables; bool hasEmptySub = false; @@ -1354,10 +1341,10 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; (*pSql->fp)(pSql->param, pSql, 0); } else { - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { - pState->numOfRemain = i - 1; // the already sent reques will continue and do not go to the error process routine + pSql->subState.numOfRemain = i - 1; // the already sent request will continue and do not go to the error process routine break; } } @@ -1373,7 +1360,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { } static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) { - assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL); + assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0 && pState != NULL); for(int32_t i = 0; i < numOfSubs; ++i) { SSqlObj* pSub = pSql->pSubs[i]; @@ -1411,8 +1398,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups; - assert(pSql->numOfSubs > 0); + pSql->subState.numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; + assert(pSql->subState.numOfSub > 0); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); if (ret != 0) { @@ -1422,28 +1409,26 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return ret; } - pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES); + pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); - tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->numOfSubs); + tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->subState.numOfSub); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); if (pSql->pSubs == NULL || pState == NULL) { taosTFree(pState); taosTFree(pSql->pSubs); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); tscQueueAsyncRes(pSql); return ret; } - pState->numOfTotal = pSql->numOfSubs; - pState->numOfRemain = pSql->numOfSubs; - + pSql->subState.numOfRemain = pSql->subState.numOfSub; pRes->code = TSDB_CODE_SUCCESS; int32_t i = 0; - for (; i < pSql->numOfSubs; ++i) { + for (; i < pSql->subState.numOfSub; ++i) { SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport)); if (trs == NULL) { tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); @@ -1452,8 +1437,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->pExtMemBuffer = pMemoryBuf; trs->pOrderDescriptor = pDesc; - trs->pState = pState; - + trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage)); if (trs->localBuffer == NULL) { tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); @@ -1461,8 +1445,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { break; } - trs->subqueryIndex = i; - trs->pParentSql = pSql; + trs->subqueryIndex = i; + trs->pParentSql = pSql; trs->pFinalColModel = pModel; SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL); @@ -1483,39 +1467,39 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tscDebug("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex); } - if (i < pSql->numOfSubs) { + if (i < pSql->subState.numOfSub) { tscError("%p failed to prepare subquery structure and launch subqueries", pSql); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); doCleanupSubqueries(pSql, i, pState); return pRes->code; // free all allocated resource } if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); doCleanupSubqueries(pSql, i, pState); return pRes->code; } - for(int32_t j = 0; j < pSql->numOfSubs; ++j) { + for(int32_t j = 0; j < pSql->subState.numOfSub; ++j) { SSqlObj* pSub = pSql->pSubs[j]; SRetrieveSupport* pSupport = pSub->param; tscDebug("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex); tscProcessSql(pSub); } - + return TSDB_CODE_SUCCESS; } static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { tscDebug("%p start to free subquery obj", pSql); - int32_t index = trsupport->subqueryIndex; - SSqlObj *pParentSql = trsupport->pParentSql; +// int32_t index = trsupport->subqueryIndex; +// SSqlObj *pParentSql = trsupport->pParentSql; - assert(pSql == pParentSql->pSubs[index]); +// assert(pSql == pParentSql->pSubs[index]); taosTFree(trsupport->localBuffer); taosTFree(trsupport); } @@ -1578,9 +1562,14 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO int32_t subqueryIndex = trsupport->subqueryIndex; assert(pSql != NULL); - SSubqueryState* pState = trsupport->pState; - assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal); - + SSubqueryState* pState = &pParentSql->subState; + int32_t remain = pState->numOfRemain; + int32_t sub = pState->numOfSub; + UNUSED(remain); + UNUSED(sub); + + assert(pParentSql->subState.numOfRemain <= pState->numOfSub && pParentSql->subState.numOfRemain >= 0); + // retrieved in subquery failed. OR query cancelled in retrieve phase. if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -1610,24 +1599,23 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO } } - int32_t remain = -1; - if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { + remain = -1; + if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, - pState->numOfTotal - remain); + pState->numOfSub - remain); tscFreeSubSqlObj(trsupport, pSql); return; } // all subqueries are failed - tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfTotal, + tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfSub, tstrerror(pParentSql->res.code)); // release allocated resource tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, - pState->numOfTotal); + pState->numOfSub); - taosTFree(trsupport->pState); tscFreeSubSqlObj(trsupport, pSql); // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes @@ -1647,7 +1635,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p SSqlObj * pParentSql = trsupport->pParentSql; tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; - SSubqueryState* pState = trsupport->pState; + SSubqueryState* pState = &pParentSql->subState; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; @@ -1684,9 +1672,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p } int32_t remain = -1; - if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { + if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, - pState->numOfTotal - remain); + pState->numOfSub - remain); tscFreeSubSqlObj(trsupport, pSql); return; @@ -1696,20 +1684,18 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pParentSql, - pState->numOfTotal, pState->numOfRetrievedRows); + pState->numOfSub, pState->numOfRetrievedRows); SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); tscClearInterpInfo(pPQueryInfo); - tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, pParentSql); + tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, pParentSql); tscDebug("%p build loser tree completed", pParentSql); pParentSql->res.precision = pSql->res.precision; pParentSql->res.numOfRows = 0; pParentSql->res.row = 0; - // only free once - taosTFree(trsupport->pState); tscFreeSubSqlObj(trsupport, pSql); // set the command flag must be after the semaphore been correctly set. @@ -1730,8 +1716,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR assert(tres != NULL); SSqlObj *pSql = (SSqlObj *)tres; - SSubqueryState* pState = trsupport->pState; - assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal); + SSubqueryState* pState = &pParentSql->subState; + assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); SCMVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; @@ -1748,6 +1734,10 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { assert(numOfRows == taos_errno(pSql)); + if (numOfRows == TSDB_CODE_TSC_QUERY_CANCELLED) { + trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + } + if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry); @@ -1819,7 +1809,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; - assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->numOfSubs); + assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub); // launch subquery for each vnode, so the subquery index equals to the vgroupIndex. STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); @@ -1890,7 +1880,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { SInsertSupporter *pSupporter = (SInsertSupporter *)param; SSqlObj* pParentObj = pSupporter->pSql; - SSubqueryState* pState = pSupporter->pState; // record the total inserted rows if (numOfRows > 0) { @@ -1906,15 +1895,12 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) taosTFree(pSupporter); - if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { + if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { return; } tscDebug("%p Async insertion completed, total inserted:%" PRId64, pParentObj, pParentObj->res.numOfRows); - // release data block data - taosTFree(pState); - // restore user defined fp pParentObj->fp = pParentObj->fetchFp; @@ -1935,7 +1921,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) { SSqlRes* pRes = &pSql->res; SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param; - assert(pSupporter->index < pSupporter->pState->numOfTotal); + assert(pSupporter->index < pSupporter->pSql->subState.numOfSub); STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); @@ -1952,33 +1938,29 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - pSql->numOfSubs = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks); - assert(pSql->numOfSubs > 0); + pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks); + assert(pSql->subState.numOfSub > 0); pRes->code = TSDB_CODE_SUCCESS; // the number of already initialized subqueries int32_t numOfSub = 0; - SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); - pState->numOfTotal = pSql->numOfSubs; - pState->numOfRemain = pSql->numOfSubs; - - pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES); + pSql->subState.numOfRemain = pSql->subState.numOfSub; + pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); if (pSql->pSubs == NULL) { goto _error; } - tscDebug("%p submit data to %d vnode(s)", pSql, pSql->numOfSubs); + tscDebug("%p submit data to %d vnode(s)", pSql, pSql->subState.numOfSub); - while(numOfSub < pSql->numOfSubs) { + while(numOfSub < pSql->subState.numOfSub) { SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); if (pSupporter == NULL) { goto _error; } pSupporter->pSql = pSql; - pSupporter->pState = pState; pSupporter->index = numOfSub; SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT); @@ -2001,12 +1983,12 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { numOfSub++; } else { tscDebug("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql, numOfSub, - pSql->numOfSubs, tstrerror(pRes->code)); + pSql->subState.numOfSub, tstrerror(pRes->code)); goto _error; } } - if (numOfSub < pSql->numOfSubs) { + if (numOfSub < pSql->subState.numOfSub) { tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; @@ -2024,7 +2006,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; _error: - taosTFree(pState); return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -2046,7 +2027,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); int32_t numOfRes = INT32_MAX; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == NULL) { continue; } @@ -2236,7 +2217,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { bool allSubqueryExhausted = true; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == NULL) { continue; } @@ -2262,7 +2243,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { hasData = !allSubqueryExhausted; } else { // otherwise, in case inner join, if any subquery exhausted, query completed. - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == 0) { continue; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index f6bb97e52f..33c51c5571 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -360,26 +360,26 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { tscFreeSqlResult(pSql); taosTFree(pSql->pSubs); - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; pSql->self = 0; tscResetSqlCmdObj(pCmd, false); } -static UNUSED_FUNC void tscFreeSubobj(SSqlObj* pSql) { - if (pSql->numOfSubs == 0) { +static void tscFreeSubobj(SSqlObj* pSql) { + if (pSql->subState.numOfSub == 0) { return; } - tscDebug("%p start to free sub SqlObj, numOfSub:%d", pSql, pSql->numOfSubs); + tscDebug("%p start to free sub SqlObj, numOfSub:%d", pSql, pSql->subState.numOfSub); - for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { tscDebug("%p free sub SqlObj:%p, index:%d", pSql, pSql->pSubs[i], i); taos_free_result(pSql->pSubs[i]); pSql->pSubs[i] = NULL; } - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; } /** @@ -415,7 +415,9 @@ void tscFreeSqlObj(SSqlObj* pSql) { tscDebug("%p start to free sqlObj", pSql); + pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; tscFreeSubobj(pSql); + tscPartiallyFreeSqlObj(pSql); pSql->signature = NULL; @@ -2284,7 +2286,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { * * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. */ - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; pCmd->command = TSDB_SQL_SELECT; tscResetForNextRetrieve(pRes); @@ -2316,7 +2318,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { pRes->numOfTotal = num; taosTFree(pSql->pSubs); - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; pSql->fp = fp; tscDebug("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 58e97075ac..a3ed080f24 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -261,6 +261,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); return code; } -- GitLab