diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index c073f40546c6a5ceaeabeafd3d731fdf211402df..d38885ab2ee9734ae948e19d4816dcc8a8f73ce6 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -84,7 +84,7 @@ typedef struct SRetrieveSupport { SColumnModel * pFinalColModel; // colModel for final result SSubqueryState * pState; int32_t subqueryIndex; // index of current vnode in vnode list - SSqlObj * pParentSqlObj; + SSqlObj * pParentSql; tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to uint32_t numOfRetry; // record the number of retry times pthread_mutex_t queryMutex; diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index 82d490376aa8de18814c6cf9a1f3fbfb5be6ce75..d5833675aa1a761eb94402bfdc3fe04212d41d75 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -36,6 +36,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql); int32_t tscHandleMultivnodeInsert(SSqlObj *pSql); +int32_t tscHandleInsertRetry(SSqlObj* pSql); + void tscBuildResFromSubqueries(SSqlObj *pSql); void **doSetResultRowData(SSqlObj *pSql, bool finalResult); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 471db9c99f73fbc6881c849f93d25e52261710e9..6ce94d5aa4d10688d7f92dfe2c9ccbfc8852bc9d 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -213,8 +213,7 @@ typedef struct SQueryInfo { typedef struct { int command; uint8_t msgType; - bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta - int8_t dataSourceType; // load data from file or not + bool autoCreated; // create table if it is not existed during retrieve table meta in mnode union { int32_t count; @@ -222,18 +221,23 @@ typedef struct { }; int32_t insertType; - int32_t clauseIndex; // index of multiple subclause query + int32_t clauseIndex; // index of multiple subclause query + + char * curSql; // current sql, resume position of sql after parsing paused int8_t parseFinished; + short numOfCols; uint32_t allocSize; char * payload; int32_t payloadLen; SQueryInfo **pQueryInfo; int32_t numOfClause; - char * curSql; // current sql, resume position of sql after parsing paused - void * pTableList; // referred table involved in sql int32_t batchSize; // for parameter ('?') binding and batch processing int32_t numOfParams; + + int8_t dataSourceType; // load data from file or not + int8_t submitSchema; // submit block is built with table schema + SHashObj *pTableList; // referred table involved in sql SArray *pDataBlocks; // SArray submit data blocks after parsing sql } SSqlCmd; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 0bfee46b4bfe740ed23668bfb51cc340cd4d195e..2de45bcc6e0a0be3f41f492d0a0b760c7585d3a4 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -431,6 +431,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { return; } + tscDebug("%p get tableMeta successfully", pSql); + if (pSql->pStream == NULL) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); @@ -446,20 +448,20 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { assert(code == TSDB_CODE_SUCCESS); } - - assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL); + assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pSql->param != NULL); SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param; - SSqlObj * pParObj = trs->pParentSqlObj; + SSqlObj * pParObj = trs->pParentSql; + // NOTE: the vgroupInfo for the queried super table must be existed here. assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex && - tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0); + pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL); - // NOTE: the vgroupInfo for the queried super table must be existed here. - assert(pTableMetaInfo->vgroupList != NULL); if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) { return; } + + goto _error; } else { // continue to process normal async query if (pCmd->parseFinished) { tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql); @@ -472,18 +474,41 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { assert(code == TSDB_CODE_SUCCESS); } - // if failed to process sql, go to error handler - if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) { - return; + // in case of insert, redo parsing the sql string and build new submit data block for two reasons: + // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated + // 2. vnode may need the schema information along with submit block to update its local table schema. + if (pCmd->command == TSDB_SQL_INSERT) { + tscDebug("%p redo parse sql string to build submit block", pSql); + + pCmd->parseFinished = false; + if ((code = tsParseSql(pSql, true)) == TSDB_CODE_SUCCESS) { + /* + * Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks, + * and send the required submit block according to index value in supporter to server. + */ + pSql->fp = pSql->fetchFp; // restore the fp + if ((code = tscHandleInsertRetry(pSql)) == TSDB_CODE_SUCCESS) { + return; + } + } + + } else {// in case of other query type, continue + if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) { + return; + } } -// // todo update the submit message according to the new table meta -// // 1. table uid, 2. ip address -// code = tscSendMsgToServer(pSql); -// if (code == TSDB_CODE_SUCCESS) return; + + goto _error; } else { tscDebug("%p continue parse sql after get table meta", pSql); code = tsParseSql(pSql, false); + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + return; + } else if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) { STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); code = tscGetTableMeta(pSql, pTableMetaInfo); @@ -492,45 +517,49 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { } else { assert(code == TSDB_CODE_SUCCESS); } + (*pSql->fp)(pSql->param, pSql, code); return; } - - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; + + // proceed to invoke the tscDoQuery(); } } } else { // stream computing STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - code = tscGetTableMeta(pSql, pTableMetaInfo); - pRes->code = code; - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; + code = tscGetTableMeta(pSql, pTableMetaInfo); + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + return; + } else if (code != TSDB_CODE_SUCCESS) { + goto _error; + } if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex); - pRes->code = code; - - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + return; + } else if (code != TSDB_CODE_SUCCESS) { + goto _error; + } } - } - - if (code != TSDB_CODE_SUCCESS) { - pSql->res.code = code; - tscQueueAsyncRes(pSql); - return; - } - if (pSql->pStream) { tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command); if (!pSql->cmd.parseFinished) { tsParseSql(pSql, false); sem_post(&pSql->rspSem); } + return; - } else { - tscDebug("%p get tableMeta successfully", pSql); } tscDoQuery(pSql); + return; + + _error: + if (code != TSDB_CODE_SUCCESS) { + pSql->res.code = code; + tscQueueAsyncRes(pSql); + } } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0ffe50c8f43eb959fba236992fe592556a69d8e1..a3d3b035e2c4564acd34a71fe1c1490ddc25ec75 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -348,8 +348,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { int doProcessSql(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - int32_t code = TSDB_CODE_SUCCESS; - + if (pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_RETRIEVE || @@ -366,10 +365,13 @@ int doProcessSql(SSqlObj *pSql) { return pRes->code; } - code = tscSendMsgToServer(pSql); + int32_t code = tscSendMsgToServer(pSql); + + // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads. if (code != TSDB_CODE_SUCCESS) { pRes->code = code; tscQueueAsyncRes(pSql); + return pRes->code; } return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 0c2e3f3a1c6251c828eb728095366d5aefb7ecfe..0243d115f0b8dbe87e055c14966a39c11015a09e 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -25,6 +25,7 @@ typedef struct SInsertSupporter { SSubqueryState* pState; SSqlObj* pSql; + int32_t index; } SInsertSupporter; static void freeJoinSubqueryObj(SSqlObj* pSql); @@ -1414,7 +1415,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { } trs->subqueryIndex = i; - trs->pParentSqlObj = pSql; + trs->pParentSql = pSql; trs->pFinalColModel = pModel; pthread_mutexattr_t mutexattr; @@ -1499,7 +1500,7 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES tscError("sub:%p failed to flush data to disk, reason:%s", tres, tstrerror(code)); #endif - SSqlObj* pParentSql = trsupport->pParentSqlObj; + SSqlObj* pParentSql = trsupport->pParentSql; pParentSql->res.code = code; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; @@ -1508,8 +1509,45 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES tscHandleSubqueryError(trsupport, tres, pParentSql->res.code); } +/* + * current query failed, and the retry count is less than the available + * count, retry query clear previous retrieved data, then launch a new sub query + */ +static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, int32_t code) { + SSqlObj *pParentSql = trsupport->pParentSql; + int32_t subqueryIndex = trsupport->subqueryIndex; + + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); + SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; + + tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]); + + // clear local saved number of results + trsupport->localBuffer->num = 0; + pthread_mutex_unlock(&trsupport->queryMutex); + + tscTrace("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql, + tstrerror(code), subqueryIndex, trsupport->numOfRetry); + + SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSql, trsupport, pSql); + + // todo add to async res or not?? + if (pNew == NULL) { + tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d", + trsupport->pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex); + + pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; + trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + + return pParentSql->res.code; + } + + taos_free_result(pSql); + return tscProcessSql(pNew); +} + void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { - SSqlObj *pParentSql = trsupport->pParentSqlObj; + SSqlObj *pParentSql = trsupport->pParentSql; int32_t subqueryIndex = trsupport->subqueryIndex; assert(pSql != NULL); @@ -1528,38 +1566,16 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tscDebug("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", pParentSql, pSql, subqueryIndex, pParentSql->res.code); } - + if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query. tscDebug("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pParentSql, pSql, numOfRows, subqueryIndex); tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pParentSql, pSql, subqueryIndex, pParentSql->res.code); } else { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) { - /* - * current query failed, and the retry count is less than the available - * count, retry query clear previous retrieved data, then launch a new sub query - */ - tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]); - - // clear local saved number of results - trsupport->localBuffer->num = 0; - pthread_mutex_unlock(&trsupport->queryMutex); - - tscDebug("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, - tstrerror(numOfRows), subqueryIndex, trsupport->numOfRetry); - - SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql); - if (pNew == NULL) { - tscError("%p sub:%p failed to create new subquery sqlObj due to out of memory, abort retry", - trsupport->pParentSqlObj, pSql); - - pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; - trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) { return; } - - tscProcessSql(pNew); - return; } else { // reach the maximum retry count, abort atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows); tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql, pSql, @@ -1600,7 +1616,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) { int32_t idx = trsupport->subqueryIndex; - SSqlObj * pPObj = trsupport->pParentSqlObj; + SSqlObj * pParentSql = trsupport->pParentSql; tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; SSubqueryState* pState = trsupport->pState; @@ -1610,7 +1626,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p // data in from current vnode is stored in cache and disk uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num; - tscDebug("%p sub:%p all data retrieved from ip:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, + tscDebug("%p sub:%p all data retrieved from ip:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql, pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, numOfRowsFromSubquery, idx); @@ -1624,15 +1640,14 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p trsupport->localBuffer->num, colInfo); #endif - if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { - tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, - tsAvailTmpDirGB, tsMinimalTmpDirGB); - tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE); - return; + if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) { + tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql, + tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace); + return tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE); } // each result for a vnode is ordered as an independant list, - // then used as an input of loser tree for disk-based merge routine + // then used as an input of loser tree for disk-based merge int32_t code = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pQueryInfo->groupbyExpr.orderType); if (code != 0) { // set no disk space error info, and abort retry return tscAbortFurtherRetryRetrieval(trsupport, pSql, code); @@ -1640,7 +1655,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p int32_t remain = -1; if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { - tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, + tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfTotal - remain); return tscFreeSubSqlObj(trsupport, pSql); @@ -1649,29 +1664,29 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p // all sub-queries are returned, start to local merge process pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; - tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pPObj, + tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pParentSql, pState->numOfTotal, pState->numOfRetrievedRows); - SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); + SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); tscClearInterpInfo(pPQueryInfo); - tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, pPObj); - tscDebug("%p build loser tree completed", pPObj); + tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, pParentSql); + tscDebug("%p build loser tree completed", pParentSql); - pPObj->res.precision = pSql->res.precision; - pPObj->res.numOfRows = 0; - pPObj->res.row = 0; + pParentSql->res.precision = pSql->res.precision; + pParentSql->res.numOfRows = 0; + pParentSql->res.row = 0; // only free once tfree(trsupport->pState); tscFreeSubSqlObj(trsupport, pSql); // set the command flag must be after the semaphore been correctly set. - pPObj->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; - if (pPObj->res.code == TSDB_CODE_SUCCESS) { - (*pPObj->fp)(pPObj->param, pPObj, 0); + pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; + if (pParentSql->res.code == TSDB_CODE_SUCCESS) { + (*pParentSql->fp)(pParentSql->param, pParentSql, 0); } else { - tscQueueAsyncRes(pPObj); + tscQueueAsyncRes(pParentSql); } } @@ -1679,22 +1694,48 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR SRetrieveSupport *trsupport = (SRetrieveSupport *)param; tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; int32_t idx = trsupport->subqueryIndex; - SSqlObj * pPObj = trsupport->pParentSqlObj; + SSqlObj * pParentSql = trsupport->pParentSql; SSqlObj *pSql = (SSqlObj *)tres; if (pSql == NULL) { // sql object has been released in error process, return immediately - tscDebug("%p subquery has been released, idx:%d, abort", pPObj, idx); + tscDebug("%p subquery has been released, idx:%d, abort", pParentSql, idx); return; } SSubqueryState* pState = trsupport->pState; - assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pPObj->numOfSubs == pState->numOfTotal); + assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal); // query process and cancel query process may execute at the same time pthread_mutex_lock(&trsupport->queryMutex); - - if (numOfRows < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) { - return tscHandleSubqueryError(trsupport, pSql, numOfRows); + + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); + SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; + + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { + trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + tscTrace("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s", + pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(numOfRows), tstrerror(pParentSql->res.code)); + + tscHandleSubqueryError(param, tres, numOfRows); + return; + } + + if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { + assert(numOfRows == taos_errno(pSql)); + + if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { + tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry); + + if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) { + return; + } + } else { + tscTrace("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(numOfRows)); + atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows); // set global code and abort + } + + tscHandleSubqueryError(param, tres, numOfRows); + return; } SSqlRes * pRes = &pSql->res; @@ -1704,14 +1745,13 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR assert(pRes->numOfRows == numOfRows); int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows); - tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ip:%s, orderOfSub:%d", pPObj, pSql, + tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ip:%s, orderOfSub:%d", pParentSql, pSql, pRes->numOfRows, pState->numOfRetrievedRows, pSql->ipList.fqdn[pSql->ipList.inUse], idx); if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64, - pPObj, pSql, tsMaxNumOfOrderedResults, num); - tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY); - return; + pParentSql, pSql, tsMaxNumOfOrderedResults, num); + return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY); } #ifdef _DEBUG_VIEW @@ -1722,11 +1762,11 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo); #endif - if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { - tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, - tsAvailTmpDirGB, tsMinimalTmpDirGB); - tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE); - return; + // no disk space for tmp directory + if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) { + tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql, + tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace); + return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE); } int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data, @@ -1771,80 +1811,56 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { SRetrieveSupport *trsupport = (SRetrieveSupport *) param; - SSqlObj* pParentSql = trsupport->pParentSqlObj; + SSqlObj* pParentSql = trsupport->pParentSql; SSqlObj* pSql = (SSqlObj *) tres; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; - - SSubqueryState* pState = trsupport->pState; - assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal); + SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex]; - // todo set error code + // stable query killed or other subquery failed, all query stopped if (pParentSql->res.code != TSDB_CODE_SUCCESS) { - - // stable query is killed, abort further retry trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; - - if (pParentSql->res.code != TSDB_CODE_SUCCESS) { - code = pParentSql->res.code; - } - - tscDebug("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%s", pParentSql, pSql, - trsupport->subqueryIndex, tstrerror(code)); + tscTrace("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s", + pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code)); + + tscHandleSubqueryError(param, tres, code); + return; } /* - * if a query on a vnode is failed, all retrieve operations from vnode that occurs later + * if a subquery on a vnode failed, all retrieve operations from vnode that occurs later * than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack * function to abort current and remain retrieve process. * * NOTE: thread safe is required. */ - if (code != TSDB_CODE_SUCCESS) { - if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) { - tscDebug("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code)); - atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); - } else { // does not reach the maximum retry time, go on - tscDebug("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry); - - SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql); - - if (pNew == NULL) { - tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d", - trsupport->pParentSqlObj, pSql, pVgroup->vgId, trsupport->subqueryIndex); - - pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; - trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; - } else { - SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL); + if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { + assert(code == taos_errno(pSql)); - taos_free_result(pSql); - tscProcessSql(pNew); + if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { + tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry); + if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) { return; } - } - } - - if (pParentSql->res.code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query - tscDebug("%p sub:%p query failed,ip:%s,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, - pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pParentSql->res.code); - - tscHandleSubqueryError(param, tres, pParentSql->res.code); - } else { // success, proceed to retrieve data from dnode - tscDebug("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql, - pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); - - if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode - tscRetrieveFromDnodeCallBack(param, pSql, 0); } else { - taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); + tscTrace("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code)); + atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort } - + + tscHandleSubqueryError(param, tres, pParentSql->res.code); + return; + } + + tscTrace("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql, + pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); + + if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode + tscRetrieveFromDnodeCallBack(param, pSql, 0); + } else { + taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); } } @@ -1876,13 +1892,36 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) // release data block data tfree(pState); -// pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks); - + // restore user defined fp pParentObj->fp = pParentObj->fetchFp; - + + // todo remove this parameter in async callback function definition. // all data has been sent to vnode, call user function - (*pParentObj->fp)(pParentObj->param, pParentObj, numOfRows); + int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS)? pParentObj->res.code:pParentObj->res.numOfRows; + (*pParentObj->fp)(pParentObj->param, pParentObj, v); +} + +/** + * it is a subquery, so after parse the sql string, copy the submit block to payload of itself + * @param pSql + * @return + */ +int32_t tscHandleInsertRetry(SSqlObj* pSql) { + assert(pSql != NULL && pSql->param != NULL); + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + + SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param; + assert(pSupporter->index < pSupporter->pState->numOfTotal); + + STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); + pRes->code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); + if (pRes->code != TSDB_CODE_SUCCESS) { + return pRes->code; + } + + return tscProcessSql(pSql); } int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { @@ -1906,10 +1945,11 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { while(numOfSub < pSql->numOfSubs) { SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); - pSupporter->pSql = pSql; + pSupporter->pSql = pSql; pSupporter->pState = pState; - - SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);//createSubqueryObj(pSql, 0, multiVnodeInsertFinalize, pSupporter1, TSDB_SQL_INSERT, NULL); + pSupporter->index = numOfSub; + + SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT); if (pNew == NULL) { tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno)); goto _error; @@ -1940,6 +1980,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { return pRes->code; // free all allocated resource } + pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); + // use the local variable for (int32_t j = 0; j < numOfSub; ++j) { SSqlObj *pSub = pSql->pSubs[j]; @@ -1947,7 +1989,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { tscProcessSql(pSub); } - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); return TSDB_CODE_SUCCESS; _error: diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d15a0d7fcc39703cff2c6e5d74f377c33e63b6b4..259bcd4cbd2e63f52c33f072908ef647731d750e 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -562,10 +562,8 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t return TSDB_CODE_SUCCESS; } -static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { +static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema) { // TODO: optimize this function, handle the case while binary is not presented - int len = 0; - STableMeta* pTableMeta = pTableDataBlock->pTableMeta; STableComInfo tinfo = tscGetTableInfo(pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMeta); @@ -575,16 +573,37 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { pDataBlock += sizeof(SSubmitBlk); int32_t flen = 0; // original total length of row - for (int32_t i = 0; i < tinfo.numOfColumns; ++i) { - flen += TYPE_BYTES[pSchema[i].type]; + + // schema needs to be included into the submit data block + if (includeSchema) { + int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta); + for(int32_t j = 0; j < numOfCols; ++j) { + STColumn* pCol = (STColumn*) pDataBlock; + pCol->colId = pSchema[j].colId; + pCol->type = pSchema[j].type; + pCol->bytes = pSchema[j].bytes; + pCol->offset = 0; + + pDataBlock += sizeof(STColumn); + flen += TYPE_BYTES[pSchema[j].type]; + } + + int32_t schemaSize = sizeof(STColumn) * numOfCols; + pBlock->schemaLen = schemaSize; + } else { + for (int32_t j = 0; j < tinfo.numOfColumns; ++j) { + flen += TYPE_BYTES[pSchema[j].type]; + } + + pBlock->schemaLen = 0; } char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); - pBlock->len = 0; + pBlock->dataLen = 0; int32_t numOfRows = htons(pBlock->numOfRows); for (int32_t i = 0; i < numOfRows; ++i) { - SDataRow trow = (SDataRow)pDataBlock; + SDataRow trow = (SDataRow) pDataBlock; dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen); dataRowSetVersion(trow, pTableMeta->sversion); @@ -595,20 +614,21 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { p += pSchema[j].bytes; } - // p += pTableDataBlock->rowSize; pDataBlock += dataRowLen(trow); - pBlock->len += dataRowLen(trow); + pBlock->dataLen += dataRowLen(trow); } - len = pBlock->len; - pBlock->len = htonl(pBlock->len); + int32_t len = pBlock->dataLen + pBlock->schemaLen; + pBlock->dataLen = htonl(pBlock->dataLen); + pBlock->schemaLen = htonl(pBlock->schemaLen); + return len; } int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { SSqlCmd* pCmd = &pSql->cmd; - // the expanded size when a row data is converted to SDataRow format + // the maximum expanded size in byte when a row-wise data is converted to SDataRow format const int32_t MAX_EXPAND_SIZE = TD_DATA_ROW_HEAD_SIZE + TYPE_BYTES[TSDB_DATA_TYPE_BINARY]; void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); @@ -617,7 +637,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { size_t total = taosArrayGetSize(pTableDataBlockList); for (int32_t i = 0; i < total; ++i) { STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i); - STableDataBlocks* dataBuf = NULL; int32_t ret = @@ -666,16 +685,17 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { pBlocks->uid = htobe64(pBlocks->uid); pBlocks->sversion = htonl(pBlocks->sversion); pBlocks->numOfRows = htons(pBlocks->numOfRows); + pBlocks->schemaLen = 0; // erase the empty space reserved for binary data - int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock); + int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pCmd->submitSchema); assert(finalLen <= len); dataBuf->size += (finalLen + sizeof(SSubmitBlk)); assert(dataBuf->size <= dataBuf->nAllocSize); // the length does not include the SSubmitBlk structure - pBlocks->len = htonl(finalLen); + pBlocks->dataLen = htonl(finalLen); dataBuf->numOfTables += 1; } diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 371df5c3356a2fd27c9885b9406daa11b1324a9d..af1d7dd44187449af02c75dc1b80e6d7e2657ac3 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -128,10 +128,10 @@ extern float tsTotalLogDirGB; extern float tsTotalTmpDirGB; extern float tsTotalDataDirGB; extern float tsAvailLogDirGB; -extern float tsAvailTmpDirGB; +extern float tsAvailTmpDirectorySpace; extern float tsAvailDataDirGB; extern float tsMinimalLogDirGB; -extern float tsMinimalTmpDirGB; +extern float tsReservedTmpDirectorySpace; extern float tsMinimalDataDirGB; extern int32_t tsTotalMemoryMB; extern int32_t tsVersion; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index f70bcb936accca6c05dba6de91f18834d73120a9..0fb63c04767a61c3b66df0a2e27342d81077590e 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -170,9 +170,9 @@ int64_t tsStreamMax; int32_t tsNumOfCores = 1; float tsTotalTmpDirGB = 0; float tsTotalDataDirGB = 0; -float tsAvailTmpDirGB = 0; +float tsAvailTmpDirectorySpace = 0; float tsAvailDataDirGB = 0; -float tsMinimalTmpDirGB = 0.1; +float tsReservedTmpDirectorySpace = 0.1; float tsMinimalDataDirGB = 0.5; int32_t tsTotalMemoryMB = 0; int32_t tsVersion = 0; @@ -807,7 +807,7 @@ static void doInitGlobalConfig() { taosInitConfigOption(cfg); cfg.option = "minimalTmpDirGB"; - cfg.ptr = &tsMinimalTmpDirGB; + cfg.ptr = &tsReservedTmpDirectorySpace; cfg.valType = TAOS_CFG_VTYPE_FLOAT; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 0.001; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 769dfea774b86c9cd420edcd1b15d1020da3a67f..098d69fcb206acbd5ec88d66cc1c3b9c347d02ba 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -283,7 +283,8 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { } tdAppendColVal(trow, val, c->type, c->bytes, c->offset); } - pBlk->len = htonl(dataRowLen(trow)); + pBlk->dataLen = htonl(dataRowLen(trow)); + pBlk->schemaLen = 0; pBlk->uid = htobe64(pObj->uid); pBlk->tid = htonl(pObj->tid); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index c8bd2c60769ca80eec1d6ac1f2fd390719e367bf..13fa799b3fedaba096467757746f59b9a15a8ec6 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -192,7 +192,8 @@ typedef struct SSubmitBlk { int32_t tid; // table id int32_t padding; // TODO just for padding here int32_t sversion; // data schema version - int32_t len; // data part length, not including the SSubmitBlk head + int32_t dataLen; // data part length, not including the SSubmitBlk head + int32_t schemaLen; // schema length, if length is 0, no schema exists int16_t numOfRows; // total number of rows in current submit block char data[]; } SSubmitBlk; diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 1f1035887c8ca02b43364e2c7b5dc4e493d2cdd1..e76e23c47e4bfcdbda2da36f2b66d550dd4f101a 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -457,10 +457,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) { free(pNew); free(oldTableId); free(oldSchema); - - mnodeDecTableRef(pTable); } + mnodeDecTableRef(pTable); return TSDB_CODE_SUCCESS; } diff --git a/src/os/linux/src/linuxSysPara.c b/src/os/linux/src/linuxSysPara.c index e4239c11e586a03b23f0130f92b4d07306e49feb..c2134765dfefba5cde330cb6a835e0c7c5261028 100644 --- a/src/os/linux/src/linuxSysPara.c +++ b/src/os/linux/src/linuxSysPara.c @@ -326,12 +326,12 @@ bool taosGetDisk() { if (statvfs("/tmp", &info)) { //tsTotalTmpDirGB = 0; - //tsAvailTmpDirGB = 0; + //tsAvailTmpDirectorySpace = 0; uError("failed to get disk size, tmpDir:/tmp errno:%s", strerror(errno)); return false; } else { tsTotalTmpDirGB = (float)((double)info.f_blocks * (double)info.f_frsize / unit); - tsAvailTmpDirGB = (float)((double)info.f_bavail * (double)info.f_frsize / unit); + tsAvailTmpDirectorySpace = (float)((double)info.f_bavail * (double)info.f_frsize / unit); } return true; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index ee61f4f702fd1cf9860588cc8284abb26a857251..a83072384595d96fc60cc57ff35b1502c4b83b49 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -161,12 +161,12 @@ typedef struct SQuery { } SQuery; typedef struct SQueryRuntimeEnv { - SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo + SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SQuery* pQuery; SQLFunctionCtx* pCtx; int16_t numOfRowsPerPage; int16_t offset[TSDB_MAX_COLUMNS]; - uint16_t scanFlag; // denotes reversed scan of data or not + uint16_t scanFlag; // denotes reversed scan of data or not SFillInfo* pFillInfo; SWindowResInfo windowResInfo; STSBuf* pTSBuf; @@ -176,7 +176,8 @@ typedef struct SQueryRuntimeEnv { void* pQueryHandle; void* pSecQueryHandle; // another thread for SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file - bool topBotQuery; // false; + bool topBotQuery; // false + int32_t prevGroupId; // previous executed group id } SQueryRuntimeEnv; typedef struct SQInfo { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 94fefa16f32994f19ed8902e78e5c63c59912edb..fdb7e890d97e99afc387b567cfb873f16855b44f 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -3305,13 +3305,16 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) * @param pRuntimeEnv * @param pDataBlockInfo */ -void setExecutionContext(SQInfo *pQInfo, void* pTable, int32_t groupIndex, TSKEY nextKey) { +void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; - - SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo; - int32_t GROUPRESULTID = 1; + STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; + SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + + if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) { + return; + } + int32_t GROUPRESULTID = 1; SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, sizeof(groupIndex)); if (pWindowRes == NULL) { return; @@ -3328,6 +3331,8 @@ void setExecutionContext(SQInfo *pQInfo, void* pTable, int32_t groupIndex, TSKEY } } + // record the current active group id + pRuntimeEnv->prevGroupId = groupIndex; setWindowResOutputBuf(pRuntimeEnv, pWindowRes); initCtxOutputBuf(pRuntimeEnv); @@ -4072,6 +4077,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool pRuntimeEnv->pTSBuf = param; pRuntimeEnv->cur.vgroupIndex = -1; pRuntimeEnv->stableQuery = isSTableQuery; + pRuntimeEnv->prevGroupId = INT32_MIN; if (param != NULL) { int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; @@ -4176,8 +4182,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (!isIntervalQuery(pQuery)) { int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1; - setExecutionContext(pQInfo, (*pTableQueryInfo)->pTable, (*pTableQueryInfo)->groupIndex, - blockInfo.window.ekey + step); + setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step); } else { // interval query TSKEY nextKey = blockInfo.window.skey; setIntervalQueryRange(pQInfo, nextKey); @@ -4553,7 +4558,8 @@ static void doSaveContext(SQInfo *pQInfo) { if (pRuntimeEnv->pSecQueryHandle != NULL) { tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } - + + pRuntimeEnv->prevGroupId = INT32_MIN; pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); setQueryStatus(pQuery, QUERY_NOT_COMPLETED); @@ -6009,7 +6015,7 @@ void qDestroyQueryInfo(qinfo_t qHandle, void (*fp)(void*), void* param) { return; } - int16_t ref = T_REF_DEC(pQInfo); + int32_t ref = T_REF_DEC(pQInfo); qDebug("QInfo:%p dec refCount, value:%d", pQInfo, ref); if (ref == 0) { diff --git a/src/query/tests/astTest.cpp b/src/query/tests/astTest.cpp index d767e7ad7b427dd99c841c622a26ccda1a8141e9..15eb780021646627b889c2c6bf3a5d9b0053de43 100644 --- a/src/query/tests/astTest.cpp +++ b/src/query/tests/astTest.cpp @@ -631,5 +631,5 @@ void exprSerializeTest2() { } } // namespace TEST(testCase, astTest) { - exprSerializeTest2(); +// exprSerializeTest2(); } \ No newline at end of file diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 6c6b3130737f2efeac3996b15aacb37133927176..e650cef45cf14a351ff7986985df43b4d1112b71 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -768,7 +768,8 @@ static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { SSubmitBlk *pBlock = pIter->pBlock; if (pBlock == NULL) return NULL; - pBlock->len = htonl(pBlock->len); + pBlock->dataLen = htonl(pBlock->dataLen); + pBlock->schemaLen = htonl(pBlock->schemaLen); pBlock->numOfRows = htons(pBlock->numOfRows); pBlock->uid = htobe64(pBlock->uid); pBlock->tid = htonl(pBlock->tid); @@ -776,11 +777,11 @@ static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { pBlock->sversion = htonl(pBlock->sversion); pBlock->padding = htonl(pBlock->padding); - pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->len; + pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->dataLen; if (pIter->len >= pIter->totalLen) { pIter->pBlock = NULL; } else { - pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->len + sizeof(SSubmitBlk)); + pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->dataLen + sizeof(SSubmitBlk)); } return pBlock; @@ -832,10 +833,10 @@ _err: } static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { - if (pBlock->len <= 0) return -1; - pIter->totalLen = pBlock->len; + if (pBlock->dataLen <= 0) return -1; + pIter->totalLen = pBlock->dataLen; pIter->len = 0; - pIter->row = (SDataRow)(pBlock->data); + pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen); return 0; } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 3b25d688965bfb40221039c1516fdecb1e14ca10..13b676ee1dc27be6a8f6c2f76fc5b8cd942d6089 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -550,13 +550,13 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) { } void tsdbRefTable(STable *pTable) { - int16_t ref = T_REF_INC(pTable); + int32_t ref = T_REF_INC(pTable); UNUSED(ref); // tsdbDebug("ref table %"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); } void tsdbUnRefTable(STable *pTable) { - int16_t ref = T_REF_DEC(pTable); + int32_t ref = T_REF_DEC(pTable); tsdbDebug("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); if (ref == 0) { @@ -1252,4 +1252,4 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) { } return 0; -} \ No newline at end of file +} diff --git a/src/util/inc/tref.h b/src/util/inc/tref.h index fa2517e19a16942fd874cab1fff00262c1b90722..0503325326ab90837de9c1990de44db45548b946 100644 --- a/src/util/inc/tref.h +++ b/src/util/inc/tref.h @@ -22,7 +22,7 @@ typedef void (*_ref_fn_t)(const void* pObj); #define T_REF_DECLARE() \ struct { \ - int16_t val; \ + int32_t val; \ } _ref; #define T_REF_REGISTER_FUNC(s, e) \ @@ -31,7 +31,7 @@ typedef void (*_ref_fn_t)(const void* pObj); _ref_fn_t end; \ } _ref_func = {.begin = (s), .end = (e)}; -#define T_REF_INC(x) (atomic_add_fetch_16(&((x)->_ref.val), 1)) +#define T_REF_INC(x) (atomic_add_fetch_32(&((x)->_ref.val), 1)) #define T_REF_INC_WITH_CB(x, p) \ do { \ @@ -41,11 +41,11 @@ typedef void (*_ref_fn_t)(const void* pObj); } \ } while (0) -#define T_REF_DEC(x) (atomic_sub_fetch_16(&((x)->_ref.val), 1)) +#define T_REF_DEC(x) (atomic_sub_fetch_32(&((x)->_ref.val), 1)) #define T_REF_DEC_WITH_CB(x, p) \ do { \ - int32_t v = atomic_sub_fetch_16(&((x)->_ref.val), 1); \ + int32_t v = atomic_sub_fetch_32(&((x)->_ref.val), 1); \ if (v == 0 && (p)->_ref_func.end != NULL) { \ (p)->_ref_func.end((x)); \ } \ diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 2dd641731c7f2cff5cf49c6bf4d7ce1b5865dd6d..2e57ad83aee0a59246bbe6ec25012daecbe916d4 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -415,7 +415,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { } *data = NULL; - int16_t ref = T_REF_DEC(pNode); + int32_t ref = T_REF_DEC(pNode); uDebug("%p data released, refcnt:%d", pNode, ref); if (_remove) {