From 46e58d52b152e35b1e0361e9f480ac9880195442 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sat, 21 Mar 2020 00:49:26 +0800 Subject: [PATCH] [TD-32] refactor codes. --- .../inc/{tscJoinProcess.h => tscSubquery.h} | 6 + src/client/inc/tscUtil.h | 4 +- src/client/inc/tsclient.h | 4 +- src/client/src/TSDBJNIConnector.c | 6 +- src/client/src/tscAsync.c | 42 - src/client/src/tscFunctionImpl.c | 2 +- src/client/src/tscJoinProcess.c | 812 --------- src/client/src/tscParseInsert.c | 3 +- src/client/src/tscServer.c | 729 +------- src/client/src/tscSql.c | 8 +- src/client/src/tscSubquery.c | 1572 +++++++++++++++++ src/client/src/tscUtil.c | 95 +- 12 files changed, 1616 insertions(+), 1667 deletions(-) rename src/client/inc/{tscJoinProcess.h => tscSubquery.h} (89%) delete mode 100644 src/client/src/tscJoinProcess.c create mode 100644 src/client/src/tscSubquery.c diff --git a/src/client/inc/tscJoinProcess.h b/src/client/inc/tscSubquery.h similarity index 89% rename from src/client/inc/tscJoinProcess.h rename to src/client/inc/tscSubquery.h index bc1101df27..f8a6fbf5b1 100644 --- a/src/client/inc/tscJoinProcess.h +++ b/src/client/inc/tscSubquery.h @@ -32,6 +32,12 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter); +int32_t tscHandleMasterJoinQuery(SSqlObj* pSql); + +int32_t tscHandleMasterSTableQuery(SSqlObj *pSql); + +int32_t tscHandleMultivnodeInsert(SSqlObj *pSql); + #ifdef __cplusplus } #endif diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 71e5c8ffe2..7ec3bc679e 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -104,7 +104,7 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo); -bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); +bool 
tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscQueryOnMetric(SSqlCmd* pCmd); bool tscQueryMetricTags(SQueryInfo* pQueryInfo); bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd); @@ -252,8 +252,6 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()); -int32_t launchMultivnodeInsert(SSqlObj *pSql); - #ifdef __cplusplus } #endif diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 039736634d..fe9b97bf49 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -259,7 +259,7 @@ typedef struct { union { bool existsCheck; // check if the table exists or not bool inStream; // denote if current sql is executed in stream or not - bool createOnDemand; // if the table is missing, on-the-fly create it. during getmeterMeta + bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta int8_t dataSourceType; // load data from file or not }; @@ -404,8 +404,6 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); void tscProcessMsgFromServer(SRpcMsg *rpcMsg); int tscProcessSql(SSqlObj *pSql); -void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows); - int tscRenewMeterMeta(SSqlObj *pSql, char *tableId); void tscQueueAsyncRes(SSqlObj *pSql); diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c index 228403c79d..dfaedc7c14 100644 --- a/src/client/src/TSDBJNIConnector.c +++ b/src/client/src/TSDBJNIConnector.c @@ -13,13 +13,13 @@ * along with this program. If not, see . 
*/ -#include "os.h" #include "com_taosdata_jdbc_TSDBJNIConnector.h" +#include "os.h" #include "taos.h" #include "tlog.h" -#include "tscJoinProcess.h" -#include "tsclient.h" +#include "tscSubquery.h" #include "tscUtil.h" +#include "tsclient.h" #include "ttime.h" int __init = 0; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index d24cff5618..47d0a31a35 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -397,48 +397,6 @@ void tscQueueAsyncFreeResult(SSqlObj *pSql) { taosScheduleTask(tscQhandle, &schedMsg); } -void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows) { - SSqlObj *pSql = (SSqlObj *)param; - SSqlCmd *pCmd = &pSql->cmd; - int32_t code = TSDB_CODE_SUCCESS; - - assert(pCmd->dataSourceType != 0 && pSql->signature == pSql); - - int32_t index = 0; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index); - - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - assert(pQueryInfo->numOfTables == 1 || pQueryInfo->numOfTables == 2); - - SDataBlockList *pDataBlocks = pCmd->pDataBlocks; - if (pDataBlocks == NULL || pTableMetaInfo->vnodeIndex >= pDataBlocks->nSize) { - // restore user defined fp - pSql->fp = pSql->fetchFp; - tscTrace("%p Async insertion completed, destroy data block list", pSql); - - // release data block data - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); - - // all data has been sent to vnode, call user function - (*pSql->fp)(pSql->param, tres, numOfRows); - } else { - do { - code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[pTableMetaInfo->vnodeIndex++]); - if (code != TSDB_CODE_SUCCESS) { - tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", - pSql, pTableMetaInfo->vnodeIndex - 1, pDataBlocks->nSize, code); - } - - } while (code != TSDB_CODE_SUCCESS && pTableMetaInfo->vnodeIndex < pDataBlocks->nSize); - - // build submit msg may fail - if (code == TSDB_CODE_SUCCESS) { - tscTrace("%p async 
insertion, vnodeIdx:%d, total:%d", pSql, pTableMetaInfo->vnodeIndex - 1, pDataBlocks->nSize); - tscProcessSql(pSql); - } - } -} - int tscSendMsgToServer(SSqlObj *pSql); void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 196e49b6a0..689f9715d0 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -24,7 +24,7 @@ #include "taosdef.h" #include "taosmsg.h" #include "tlog.h" -#include "tscJoinProcess.h" +#include "tscSubquery.h" #include "tscompression.h" #include "tsqlfunction.h" #include "ttime.h" diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c deleted file mode 100644 index 6bb7d95942..0000000000 --- a/src/client/src/tscJoinProcess.c +++ /dev/null @@ -1,812 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- */ - -#include "os.h" -#include "tscJoinProcess.h" -#include "tsclient.h" -#include "qtsbuf.h" - -static void freeSubqueryObj(SSqlObj* pSql); - -static bool doCompare(int32_t order, int64_t left, int64_t right) { - if (order == TSQL_SO_ASC) { - return left < right; - } else { - return left > right; - } -} - -static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSupporter1, - SJoinSubquerySupporter* pSupporter2, TSKEY* st, TSKEY* et) { - STSBuf* output1 = tsBufCreate(true); - STSBuf* output2 = tsBufCreate(true); - - *st = INT64_MAX; - *et = INT64_MIN; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); - - SLimitVal* pLimit = &pQueryInfo->limit; - int32_t order = pQueryInfo->order.order; - - SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0); - SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0); - - pSubQueryInfo1->tsBuf = output1; - pSubQueryInfo2->tsBuf = output2; - - tsBufResetPos(pSupporter1->pTSBuf); - tsBufResetPos(pSupporter2->pTSBuf); - - // TODO add more details information - if (!tsBufNextPos(pSupporter1->pTSBuf)) { - tsBufFlush(output1); - tsBufFlush(output2); - - tscTrace("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql); - return 0; - } - - if (!tsBufNextPos(pSupporter2->pTSBuf)) { - tsBufFlush(output1); - tsBufFlush(output2); - - tscTrace("%p input2 is empty, 0 for secondary query after ts blocks intersecting", pSql); - return 0; - } - - int64_t numOfInput1 = 1; - int64_t numOfInput2 = 1; - - while (1) { - STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf); - STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf); - -#ifdef _DEBUG_VIEW - // for debug purpose - tscPrint("%" PRId64 ", tags:%d \t %" PRId64 ", tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag); -#endif - - if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem1.ts, elem2.ts))) { - if (!tsBufNextPos(pSupporter1->pTSBuf)) { - break; - } - 
- numOfInput1++; - } else if (elem1.tag > elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem2.ts, elem1.ts))) { - if (!tsBufNextPos(pSupporter2->pTSBuf)) { - break; - } - - numOfInput2++; - } else { - /* - * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the - * final results which is acquired after the secondry merge of in the client. - */ - if (pLimit->offset == 0 || pQueryInfo->intervalTime > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { - if (*st > elem1.ts) { - *st = elem1.ts; - } - - if (*et < elem1.ts) { - *et = elem1.ts; - } - - tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); - tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); - } else { - pLimit->offset -= 1; - } - - if (!tsBufNextPos(pSupporter1->pTSBuf)) { - break; - } - - numOfInput1++; - - if (!tsBufNextPos(pSupporter2->pTSBuf)) { - break; - } - - numOfInput2++; - } - } - - /* - * failed to set the correct ts order yet in two cases: - * 1. only one element - * 2. only one element for each tag. 
- */ - if (output1->tsOrder == -1) { - output1->tsOrder = TSQL_SO_ASC; - output2->tsOrder = TSQL_SO_ASC; - } - - tsBufFlush(output1); - tsBufFlush(output2); - - tsBufDestory(pSupporter1->pTSBuf); - tsBufDestory(pSupporter2->pTSBuf); - - tscTrace("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks " - "intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, - numOfInput1, numOfInput2, output1->numOfTotal, *st, *et); - - return output1->numOfTotal; -} - -// todo handle failed to create sub query -SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index) { - SJoinSubquerySupporter* pSupporter = calloc(1, sizeof(SJoinSubquerySupporter)); - if (pSupporter == NULL) { - return NULL; - } - - pSupporter->pObj = pSql; - pSupporter->pState = pState; - - pSupporter->subqueryIndex = index; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); - - pSupporter->interval = pQueryInfo->intervalTime; - pSupporter->limit = pQueryInfo->limit; - - STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index); - pSupporter->uid = pTableMetaInfo->pTableMeta->uid; - - assert (pSupporter->uid != 0); - - getTmpfilePath("join-", pSupporter->path); - pSupporter->f = fopen(pSupporter->path, "w"); - - if (pSupporter->f == NULL) { - tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(errno)); - } - - return pSupporter; -} - -void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) { - if (pSupporter == NULL) { - return; - } - - tscSqlExprInfoDestroy(&pSupporter->exprsInfo); - tscColumnBaseInfoDestroy(&pSupporter->colList); - - tscClearFieldInfo(&pSupporter->fieldsInfo); - - if (pSupporter->f != NULL) { - fclose(pSupporter->f); - unlink(pSupporter->path); - } - - tscTagCondRelease(&pSupporter->tagCond); - free(pSupporter); -} - -/* - * need the secondary query process - * In case of 
count(ts)/count(*)/spread(ts) query, that are only applied to - * primary timestamp column , the secondary query is not necessary - * - */ -bool needSecondaryQuery(SQueryInfo* pQueryInfo) { - for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) { - SColumnBase* pBase = tscColumnBaseInfoGet(&pQueryInfo->colList, i); - if (pBase->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return true; - } - } - - return false; -} - -/* - * launch secondary stage query to fetch the result that contains timestamp in set - */ -int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { - int32_t numOfSub = 0; - SJoinSubquerySupporter* pSupporter = NULL; - - /* - * If the columns are not involved in the final select clause, the secondary query will not be launched - * for the subquery. - */ - SSubqueryState* pState = NULL; - - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - pSupporter = pSql->pSubs[i]->param; - if (pSupporter->exprsInfo.numOfExprs > 0) { - ++numOfSub; - } - } - - assert(numOfSub > 0); - - // scan all subquery, if one sub query has only ts, ignore it - tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in " - "select clause", pSql, pSql->numOfSubs, numOfSub); - - /* - * the subqueries that do not actually launch the secondary query to virtual node is set as completed. 
- */ - pState = pSupporter->pState; - pState->numOfTotal = pSql->numOfSubs; - pState->numOfCompleted = (pSql->numOfSubs - numOfSub); - - bool success = true; - - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlObj *pPrevSub = pSql->pSubs[i]; - pSql->pSubs[i] = NULL; - - pSupporter = pPrevSub->param; - - if (pSupporter->exprsInfo.numOfExprs == 0) { - tscTrace("%p subIndex: %d, not need to launch query, ignore it", pSql, i); - - tscDestroyJoinSupporter(pSupporter); - tscFreeSqlObj(pPrevSub); - - pSql->pSubs[i] = NULL; - continue; - } - - SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0); - STSBuf *pTSBuf = pSubQueryInfo->tsBuf; - pSubQueryInfo->tsBuf = NULL; - - // free result for async object will also free sqlObj - assert(pSubQueryInfo->exprsInfo.numOfExprs == 1); // ts_comp query only requires one resutl columns - taos_free_result(pPrevSub); - - SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL); - if (pNew == NULL) { - tscDestroyJoinSupporter(pSupporter); - success = false; - break; - } - - tscClearSubqueryInfo(&pNew->cmd); - pSql->pSubs[i] = pNew; - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - pQueryInfo->tsBuf = pTSBuf; // transfer the ownership of timestamp comp-z data to the new created object - - // set the second stage sub query for join process - pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE; - - pQueryInfo->intervalTime = pSupporter->interval; - pQueryInfo->groupbyExpr = pSupporter->groupbyExpr; - - tscColumnBaseInfoCopy(&pQueryInfo->colList, &pSupporter->colList, 0); - tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); - - tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid, false); - tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); - - pSupporter->exprsInfo.numOfExprs = 0; - pSupporter->fieldsInfo.numOfOutputCols = 0; - - /* - * if the first column of the secondary query is not ts function, add this 
function. - * Because this column is required to filter with timestamp after intersecting. - */ - if (pSupporter->exprsInfo.pExprs[0]->functionId != TSDB_FUNC_TS) { - tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); - } - - SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); - - tscFieldInfoCalOffset(pNewQueryInfo); - - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); - - /* - * When handling the projection query, the offset value will be modified for table-table join, which is changed - * during the timestamp intersection. - */ - pSupporter->limit = pQueryInfo->limit; - pNewQueryInfo->limit = pSupporter->limit; - - // fetch the join tag column - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { - SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); - assert(pQueryInfo->tagCond.joinInfo.hasJoin); - - int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid); - pExpr->param[0].i64Key = tagColIndex; - pExpr->numOfParams = 1; - } - - tscPrintSelectClause(pNew, 0); - - tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, 0, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, - pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, - pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); - } - - //prepare the subqueries object failed, abort - if (!success) { - pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; - tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql, - pSql->numOfSubs, pSql->res.code); - freeSubqueryObj(pSql); - - return pSql->res.code; - } - - for(int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlObj* pSub = pSql->pSubs[i]; - if (pSub == NULL) { - continue; - } - - tscProcessSql(pSub); - } - - return TSDB_CODE_SUCCESS; -} - 
-void freeSubqueryObj(SSqlObj* pSql) { - SSubqueryState* pState = NULL; - - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - if (pSql->pSubs[i] != NULL) { - SJoinSubquerySupporter* p = pSql->pSubs[i]->param; - pState = p->pState; - - tscDestroyJoinSupporter(p); - - if (pSql->pSubs[i]->res.code == TSDB_CODE_SUCCESS) { - taos_free_result(pSql->pSubs[i]); - } - } - } - - tfree(pState); - pSql->numOfSubs = 0; -} - -static void doQuitSubquery(SSqlObj* pParentSql) { - freeSubqueryObj(pParentSql); - - tsem_wait(&pParentSql->emptyRspSem); - tsem_wait(&pParentSql->emptyRspSem); - - tsem_post(&pParentSql->rspSem); -} - -static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) { - int32_t numOfTotal = pSupporter->pState->numOfTotal; - int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); - - if (finished >= numOfTotal) { - pSqlObj->res.code = abs(pSupporter->pState->code); - tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); - - doQuitSubquery(pSqlObj); - } -} - -// update the query time range according to the join results on timestamp -static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et) { - assert(pQueryInfo->stime <= st && pQueryInfo->etime >= et); - - pQueryInfo->stime = st; - pQueryInfo->etime = et; -} - -static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { - SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param; - SSqlObj* pParentSql = pSupporter->pObj; - - SSqlObj* pSql = (SSqlObj*)tres; - SSqlCmd* pCmd = &pSql->cmd; - SSqlRes* pRes = &pSql->res; - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == 0) { - if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { - tscError("%p abort query due to other subquery failure. 
code:%d, global code:%d", pSql, numOfRows, - pSupporter->pState->code); - - quitAllSubquery(pParentSql, pSupporter); - return; - } - - if (numOfRows > 0) { // write the data into disk - fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f); - fclose(pSupporter->f); - - STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true); - if (pBuf == NULL) { - tscError("%p invalid ts comp file from vnode, abort sub query, file size:%d", pSql, numOfRows); - - pSupporter->pState->code = TSDB_CODE_APP_ERROR; // todo set the informative code - quitAllSubquery(pParentSql, pSupporter); - return; - } - - if (pSupporter->pTSBuf == NULL) { - tscTrace("%p create tmp file for ts block:%s", pSql, pBuf->path); - pSupporter->pTSBuf = pBuf; - } else { - assert(pQueryInfo->numOfTables == 1); // for subquery, only one metermetaInfo - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vnodeIndex); - tsBufDestory(pBuf); - } - - // open new file to save the result - getTmpfilePath("ts-join", pSupporter->path); - pSupporter->f = fopen(pSupporter->path, "w"); - pSql->res.row = pSql->res.numOfRows; - - taos_fetch_rows_a(tres, joinRetrieveCallback, param); - } else if (numOfRows == 0) { // no data from this vnode anymore - SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex); - - //todo refactor - if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) { - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - assert(pQueryInfo->numOfTables == 1); - - // for projection query, need to try next vnode - int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; - if ((++pTableMetaInfo->vnodeIndex) < totalVnode) { - tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. 
current numOfRes:%d", pSql, - pTableMetaInfo->vnodeIndex - 1, pTableMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotal); - - pSql->cmd.command = TSDB_SQL_SELECT; - pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql); - - return; - } - } - - int32_t numOfTotal = pSupporter->pState->numOfTotal; - int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); - - if (finished >= numOfTotal) { - assert(finished == numOfTotal); - - if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { - tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres, - pSupporter->subqueryIndex); - doQuitSubquery(pParentSql); - return; - } - - tscTrace("%p all subqueries retrieve ts complete, do ts block intersect", pParentSql); - - SJoinSubquerySupporter* p1 = pParentSql->pSubs[0]->param; - SJoinSubquerySupporter* p2 = pParentSql->pSubs[1]->param; - - TSKEY st, et; - - int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &st, &et); - if (num <= 0) { // no result during ts intersect - tscTrace("%p free all sub SqlObj and quit", pParentSql); - doQuitSubquery(pParentSql); - } else { - updateQueryTimeRange(pParentQueryInfo, st, et); - tscLaunchSecondPhaseSubqueries(pParentSql); - } - } - } else { // failure of sub query - tscError("%p sub query failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); - pSupporter->pState->code = numOfRows; - - quitAllSubquery(pParentSql, pSupporter); - return; - } - - } else { // secondary stage retrieve, driven by taos_fetch_row or other functions - if (numOfRows < 0) { - pSupporter->pState->code = numOfRows; - tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); - } - - if (numOfRows >= 0) { - pSql->res.numOfTotal += pSql->res.numOfRows; - } - - if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) { - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - assert(pQueryInfo->numOfTables == 1); - - // 
for projection query, need to try next vnode if current vnode is exhausted - if ((++pTableMetaInfo->vnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) { - pSupporter->pState->numOfCompleted = 0; - pSupporter->pState->numOfTotal = 1; - - pSql->cmd.command = TSDB_SQL_SELECT; - pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql); - - return; - } - } - - int32_t numOfTotal = pSupporter->pState->numOfTotal; - int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); - - if (finished >= numOfTotal) { - assert(finished == numOfTotal); - tscTrace("%p all %d secondary subquery retrieves completed, global code:%d", tres, numOfTotal, - pParentSql->res.code); - - if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { - pParentSql->res.code = abs(pSupporter->pState->code); - freeSubqueryObj(pParentSql); - } - - tsem_post(&pParentSql->rspSem); - } else { - tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal); - } - } -} - -static SJoinSubquerySupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) { - int32_t notInvolved = 0; - SJoinSubquerySupporter* pSupporter = NULL; - SSubqueryState* pState = NULL; - - for(int32_t i = 0; i < pSql->numOfSubs; ++i) { - if (pSql->pSubs[i] == NULL) { - notInvolved++; - } else { - pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[i]->param; - pState = pSupporter->pState; - } - } - - pState->numOfTotal = pSql->numOfSubs; - pState->numOfCompleted = pSql->numOfSubs - numOfFetch; - - return pSupporter; -} - -void tscFetchDatablockFromSubquery(SSqlObj* pSql) { - int32_t numOfFetch = 0; - assert(pSql->numOfSubs >= 1); - - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - if (pSql->pSubs[i] == NULL) { // this subquery does not need to involve in secondary query - continue; - } - - SSqlRes *pRes = &pSql->pSubs[i]->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); - - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - if 
(tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { - if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes && - (!tscHasReachLimitation(pQueryInfo, pRes))) { - numOfFetch++; - } - } else { - if (pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pQueryInfo, pRes))) { - numOfFetch++; - } - } - } - - if (numOfFetch <= 0) { - return; - } - - // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled - tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch); - - SJoinSubquerySupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch); - - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlObj* pSql1 = pSql->pSubs[i]; - if (pSql1 == NULL) { - continue; - } - - SSqlRes* pRes1 = &pSql1->res; - SSqlCmd* pCmd1 = &pSql1->cmd; - - pSupporter = (SJoinSubquerySupporter*)pSql1->param; - - // wait for all subqueries completed - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd1, 0); - assert(pRes1->numOfRows >= 0 && pQueryInfo->numOfTables == 1); - - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - if (pRes1->row >= pRes1->numOfRows) { - tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d", pSql, pSql1, - pSupporter->subqueryIndex, pTableMetaInfo->vnodeIndex); - - tscResetForNextRetrieve(pRes1); - pSql1->fp = joinRetrieveCallback; - - if (pCmd1->command < TSDB_SQL_LOCAL) { - pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? 
TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; - } - - tscProcessSql(pSql1); - } - } - - // wait for all subquery completed - tsem_wait(&pSql->rspSem); - - // update the records for each subquery - for(int32_t i = 0; i < pSql->numOfSubs; ++i) { - if (pSql->pSubs[i] == NULL) { - continue; - } - - SSqlRes* pRes1 = &pSql->pSubs[i]->res; - pRes1->numOfTotalInCurrentClause += pRes1->numOfRows; - } -} - -// all subqueries return, set the result output index -void tscSetupOutputColumnIndex(SSqlObj* pSql) { - SSqlCmd* pCmd = &pSql->cmd; - SSqlRes* pRes = &pSql->res; - - tscTrace("%p all subquery response, retrieve data", pSql); - - if (pRes->pColumnIndex != NULL) { - return; // the column transfer support struct has been built - } - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pQueryInfo->fieldsInfo.numOfOutputCols); - - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { - SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); - - int32_t tableIndexOfSub = -1; - for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) { - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, j); - if (pTableMetaInfo->pTableMeta->uid == pExpr->uid) { - tableIndexOfSub = j; - break; - } - } - - assert(tableIndexOfSub >= 0 && tableIndexOfSub < pQueryInfo->numOfTables); - - SSqlCmd* pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd; - SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0); - - for (int32_t k = 0; k < pSubQueryInfo->exprsInfo.numOfExprs; ++k) { - SSqlExpr* pSubExpr = tscSqlExprGet(pSubQueryInfo, k); - if (pExpr->functionId == pSubExpr->functionId && pExpr->colInfo.colId == pSubExpr->colInfo.colId) { - pRes->pColumnIndex[i] = (SColumnIndex){.tableIndex = tableIndexOfSub, .columnIndex = k}; - break; - } - } - } -} - -void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { - SSqlObj* pSql = (SSqlObj*)tres; - // STableMetaInfo *pTableMetaInfo = 
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - - // int32_t idx = pSql->cmd.vnodeIdx; - - // SVnodeSidList *vnodeInfo = NULL; - // if (pTableMetaInfo->pMetricMeta != NULL) { - // vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx - 1); - // } - - SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param; - - // if (atomic_add_fetch_32(pSupporter->numOfComplete, 1) >= - // pSupporter->numOfTotal) { - // SSqlObj *pParentObj = pSupporter->pObj; - // - // if ((pSql->cmd.type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != 1) { - // int32_t num = 0; - // tscFetchDatablockFromSubquery(pParentObj); - // TSKEY* ts = tscGetQualifiedTSList(pParentObj, &num); - // - // if (num <= 0) { - // // no qualified result - // } - // - // tscLaunchSecondPhaseSubqueries(pSql, ts, num); - // } else { - - // } - // } else { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { - if (code != TSDB_CODE_SUCCESS) { // direct call joinRetrieveCallback and set the error code - joinRetrieveCallback(param, pSql, code); - } else { // first stage query, continue to retrieve data - pSql->fp = joinRetrieveCallback; - pSql->cmd.command = TSDB_SQL_FETCH; - tscProcessSql(pSql); - } - - } else { // second stage join subquery - SSqlObj* pParentSql = pSupporter->pObj; - - if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { - tscError("%p abort query due to other subquery failure. 
code:%d, global code:%d", pSql, code, - pSupporter->pState->code); - quitAllSubquery(pParentSql, pSupporter); - - return; - } - - if (code != TSDB_CODE_SUCCESS) { - tscError("%p sub query failed, code:%d, set global code:%d, index:%d", pSql, code, code, - pSupporter->subqueryIndex); - pSupporter->pState->code = code; // todo set the informative code - - quitAllSubquery(pParentSql, pSupporter); - } else { - int32_t numOfTotal = pSupporter->pState->numOfTotal; - int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); - - if (finished >= numOfTotal) { - assert(finished == numOfTotal); - - tscSetupOutputColumnIndex(pParentSql); - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - /** - * if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of - * data instead of returning to its invoker - */ - if (pTableMetaInfo->vnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { - assert(pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes); - pSupporter->pState->numOfCompleted = 0; // reset the record value - - pSql->fp = joinRetrieveCallback; // continue retrieve data - pSql->cmd.command = TSDB_SQL_FETCH; - tscProcessSql(pSql); - } else { // first retrieve from vnode during the secondary stage sub-query - if (pParentSql->fp == NULL) { - tsem_wait(&pParentSql->emptyRspSem); - tsem_wait(&pParentSql->emptyRspSem); - - tsem_post(&pParentSql->rspSem); - } else { - // set the command flag must be after the semaphore been correctly set. 
- // pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; - // if (pPObj->res.code == TSDB_CODE_SUCCESS) { - // (*pPObj->fp)(pPObj->param, pPObj, 0); - // } else { - // tscQueueAsyncRes(pPObj); - // } - assert(0); - } - } - } - } - } -} \ No newline at end of file diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index f2e40dd9fb..b96beb35fd 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -28,6 +28,7 @@ #include "taosdef.h" #include "tlog.h" +#include "tscSubquery.h" #include "tstoken.h" #include "ttime.h" @@ -1324,7 +1325,7 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) { pSql->fetchFp = pSql->fp; // replace user defined callback function with multi-insert proxy function - pSql->fp = (void(*)())launchMultivnodeInsert; + pSql->fp = (void(*)())tscHandleMultivnodeInsert; } ret = tsParseInsertSql(pSql); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 23e3ad38bd..13a827697d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -16,9 +16,9 @@ #include "os.h" #include "tcache.h" #include "trpc.h" -#include "tscJoinProcess.h" #include "tscProfile.h" #include "tscSecondaryMerge.h" +#include "tscSubquery.h" #include "tscUtil.h" #include "tschemautil.h" #include "tsclient.h" @@ -362,143 +362,28 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { rpcFreeCont(rpcMsg->pCont); } -static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); -static int tscLaunchSTableSubqueries(SSqlObj *pSql); - -// todo merge with callback -int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) { - SSqlCmd * pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - pSql->res.qhandle = 0x1; - pSql->res.numOfRows = 0; - - if (pSql->pSubs == NULL) { - pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES); - if (pSql->pSubs == NULL) { 
- return TSDB_CODE_CLI_OUT_OF_MEMORY; - } - } - - SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL); - if (pNew == NULL) { - return TSDB_CODE_CLI_OUT_OF_MEMORY; - } - - pSql->pSubs[pSql->numOfSubs++] = pNew; - assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal); - - if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { - addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); - - // refactor as one method - SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - assert(pNewQueryInfo != NULL); - - tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0); - tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0); - - tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid, false); - tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo); - - tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond); - - pNew->cmd.numOfCols = 0; - pNewQueryInfo->intervalTime = 0; - memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal)); - - // backup the data and clear it in the sqlcmd object - pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr; - memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr)); - - // this data needs to be transfer to support struct - pNewQueryInfo->fieldsInfo.numOfOutputCols = 0; - pNewQueryInfo->exprsInfo.numOfExprs = 0; - - // set the ts,tags that involved in join, as the output column of intermediate result - tscClearSubqueryInfo(&pNew->cmd); - - SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; - SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - - tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL); - - // set the tags value for ts_comp function - SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); - - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); - int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, 
pTableMetaInfo->pTableMeta->uid); - - pExpr->param->i64Key = tagColIndex; - pExpr->numOfParams = 1; - - // add the filter tag column - for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) { - SColumnBase *pColBase = &pSupporter->colList.pColList[i]; - if (pColBase->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. - tscColumnBaseCopy(&pNewQueryInfo->colList.pColList[pNewQueryInfo->colList.numOfCols], pColBase); - pNewQueryInfo->colList.numOfCols++; - } - } - - tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " - "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, - pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, - pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); - tscPrintSelectClause(pNew, 0); - - tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " - "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, - pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, - pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); - tscPrintSelectClause(pNew, 0); - } else { - SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; - } - -#ifdef _DEBUG_VIEW - tscPrintSelectClause(pNew, 0); -#endif - - return tscProcessSql(pNew); -} - int doProcessSql(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - void *asyncFp = pSql->fp; - if (pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_RETRIEVE || - pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_HB || - pCmd->command == TSDB_SQL_META || pCmd->command == 
TSDB_SQL_METRIC) { + if (pCmd->command == TSDB_SQL_SELECT || + pCmd->command == TSDB_SQL_FETCH || + pCmd->command == TSDB_SQL_RETRIEVE || + pCmd->command == TSDB_SQL_INSERT || + pCmd->command == TSDB_SQL_CONNECT || + pCmd->command == TSDB_SQL_HB || + pCmd->command == TSDB_SQL_META || + pCmd->command == TSDB_SQL_METRIC) { tscBuildMsg[pCmd->command](pSql, NULL); } int32_t code = tscSendMsgToServer(pSql); - - if (asyncFp) { - if (code != TSDB_CODE_SUCCESS) { - pRes->code = code; - tscQueueAsyncRes(pSql); - } - return 0; - } - if (code != TSDB_CODE_SUCCESS) { pRes->code = code; - return code; + tscQueueAsyncRes(pSql); } - - tsem_wait(&pSql->rspSem); - - if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) (*tscProcessMsgRsp[pCmd->command])(pSql); - - tsem_post(&pSql->emptyRspSem); - - return pRes->code; + + return TSDB_CODE_SUCCESS; } int tscProcessSql(SSqlObj *pSql) { @@ -508,7 +393,7 @@ int tscProcessSql(SSqlObj *pSql) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = NULL; - int16_t type = 0; + uint16_t type = 0; if (pQueryInfo != NULL) { pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -551,42 +436,7 @@ int tscProcessSql(SSqlObj *pSql) { // todo handle async situation if (QUERY_IS_JOIN_QUERY(type)) { if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) { - SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); - - pState->numOfTotal = pQueryInfo->numOfTables; - - for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { - SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); - - if (pSupporter == NULL) { // failed to create support struct, abort current query - tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i); - pState->numOfCompleted = pQueryInfo->numOfTables - i - 1; - pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; - - return pSql->res.code; - } - - int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter); 
- if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query - tscDestroyJoinSupporter(pSupporter); - pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; - - break; - } - } - - tsem_post(&pSql->emptyRspSem); - tsem_wait(&pSql->rspSem); - - tsem_post(&pSql->emptyRspSem); - - if (pSql->numOfSubs <= 0) { - pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; - } else { - pSql->cmd.command = TSDB_SQL_METRIC_JOIN_RETRIEVE; - } - - return TSDB_CODE_SUCCESS; + return tscHandleMasterJoinQuery(pSql); } else { // for first stage sub query, iterate all vnodes to get all timestamp if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { @@ -595,442 +445,22 @@ int tscProcessSql(SSqlObj *pSql) { } } - if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) { - /* - * (ref. line: 964) - * Before this function returns from tscLaunchSTableSubqueries and continues, pSql may have been released at user - * program context after retrieving all data from vnodes. User function is called at tscRetrieveFromVnodeCallBack. - * - * when pSql being released, pSql->fp == NULL, it may pass the check of pSql->fp == NULL, - * which causes deadlock. So we keep it as local variable. 
- */ - if (tscLaunchSTableSubqueries(pSql) != TSDB_CODE_SUCCESS) { - return pRes->code; - } - - return pSql->res.code; - } else if (pSql->fp == (void(*)())launchMultivnodeInsert) { // multi-vnodes insertion - launchMultivnodeInsert(pSql); + if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query + tscHandleMasterSTableQuery(pSql); + return pRes->code; + } else if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) { // multi-vnodes insertion + tscHandleMultivnodeInsert(pSql); return pSql->res.code; } return doProcessSql(pSql); } -static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) { - assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL); - - for(int32_t i = 0; i < numOfSubs; ++i) { - SSqlObj* pSub = pSql->pSubs[i]; - assert(pSub != NULL); - - SRetrieveSupport* pSupport = pSub->param; - - tfree(pSupport->localBuffer); - - pthread_mutex_unlock(&pSupport->queryMutex); - pthread_mutex_destroy(&pSupport->queryMutex); - - tfree(pSupport); - - tscFreeSqlObj(pSub); - } - - free(pState); -} - -int tscLaunchSTableSubqueries(SSqlObj *pSql) { - SSqlRes *pRes = &pSql->res; - SSqlCmd *pCmd = &pSql->cmd; - - // pRes->code check only serves in launching metric sub-queries - if (pRes->code == TSDB_CODE_QUERY_CANCELLED) { - pCmd->command = TSDB_SQL_RETRIEVE_METRIC; // enable the abort of kill metric function. 
- return pRes->code; - } - - tExtMemBuffer ** pMemoryBuf = NULL; - tOrderDescriptor *pDesc = NULL; - SColumnModel * pModel = NULL; - - pRes->qhandle = 1; // hack the qhandle check - - const uint32_t nBufferSize = (1 << 16); // 64KB - - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - int32_t numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes; - assert(numOfSubQueries > 0); - - int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); - if (ret != 0) { - pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; - if (pSql->fp) { - tscQueueAsyncRes(pSql); - } - return pRes->code; - } - - pSql->pSubs = calloc(numOfSubQueries, POINTER_BYTES); - pSql->numOfSubs = numOfSubQueries; - - tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfSubQueries); - SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); - pState->numOfTotal = numOfSubQueries; - pRes->code = TSDB_CODE_SUCCESS; - - int32_t i = 0; - for (; i < numOfSubQueries; ++i) { - SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport)); - if (trs == NULL) { - tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); - break; - } - - trs->pExtMemBuffer = pMemoryBuf; - trs->pOrderDescriptor = pDesc; - trs->pState = pState; - - trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage)); - if (trs->localBuffer == NULL) { - tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); - tfree(trs); - break; - } - - trs->subqueryIndex = i; - trs->pParentSqlObj = pSql; - trs->pFinalColModel = pModel; - - pthread_mutexattr_t mutexattr = {0}; - pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP); - pthread_mutex_init(&trs->queryMutex, &mutexattr); - pthread_mutexattr_destroy(&mutexattr); - - SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, 
trs, NULL); - if (pNew == NULL) { - tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); - tfree(trs->localBuffer); - tfree(trs); - break; - } - - // todo handle multi-vnode situation - if (pQueryInfo->tsBuf) { - SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf); - } - - tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex); - } - - if (i < numOfSubQueries) { - tscError("%p failed to prepare subquery structure and launch subqueries", pSql); - pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; - - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries); - doCleanupSubqueries(pSql, i, pState); - return pRes->code; // free all allocated resource - } - - if (pRes->code == TSDB_CODE_QUERY_CANCELLED) { - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries); - doCleanupSubqueries(pSql, i, pState); - return pRes->code; - } - - for(int32_t j = 0; j < numOfSubQueries; ++j) { - SSqlObj* pSub = pSql->pSubs[j]; - SRetrieveSupport* pSupport = pSub->param; - - tscTrace("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex); - tscProcessSql(pSub); - } - - return TSDB_CODE_SUCCESS; -} - -static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { - tscTrace("%p start to free subquery result", pSql); - - if (pSql->res.code == TSDB_CODE_SUCCESS) { - taos_free_result(pSql); - } - - tfree(trsupport->localBuffer); - - pthread_mutex_unlock(&trsupport->queryMutex); - pthread_mutex_destroy(&trsupport->queryMutex); - - tfree(trsupport); -} - -static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows); - -static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) { -// set no disk space error info -#ifdef WINDOWS - LPVOID lpMsgBuf; - FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | 
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, - GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language - (LPTSTR)&lpMsgBuf, 0, NULL); - tscError("sub:%p failed to flush data to disk:reason:%s", tres, lpMsgBuf); - LocalFree(lpMsgBuf); -#else - char buf[256] = {0}; - strerror_r(errno, buf, 256); - tscError("sub:%p failed to flush data to disk:reason:%s", tres, buf); -#endif - - trsupport->pState->code = -errCode; - trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; - - pthread_mutex_unlock(&trsupport->queryMutex); - - tscRetrieveFromVnodeCallBack(trsupport, tres, trsupport->pState->code); -} - -static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { - SSqlObj *pPObj = trsupport->pParentSqlObj; - int32_t subqueryIndex = trsupport->subqueryIndex; - - assert(pSql != NULL); - SSubqueryState* pState = trsupport->pState; - assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && - pPObj->numOfSubs == pState->numOfTotal); - - /* retrieved in subquery failed. OR query cancelled in retrieve phase. */ - if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) { - pState->code = -(int)pPObj->res.code; - - /* - * kill current sub-query connection, which may retrieve data from vnodes; - * Here we get: pPObj->res.code == TSDB_CODE_QUERY_CANCELLED - */ - pSql->res.numOfRows = 0; - trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts - tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql, - subqueryIndex, pState->code); - } - - if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query. 
- tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex); - tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql, - subqueryIndex, pState->code); - } else { - if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) { - /* - * current query failed, and the retry count is less than the available - * count, retry query clear previous retrieved data, then launch a new sub query - */ - tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]); - - // clear local saved number of results - trsupport->localBuffer->numOfElems = 0; - pthread_mutex_unlock(&trsupport->queryMutex); - - tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows, - subqueryIndex, trsupport->numOfRetry); - - SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql); - if (pNew == NULL) { - tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry", - trsupport->pParentSqlObj, pSql); - - pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; - trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; - return; - } - - tscProcessSql(pNew); - return; - } else { // reach the maximum retry count, abort - atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows); - tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql, - numOfRows, subqueryIndex, pState->code); - } - } - - int32_t numOfTotal = pState->numOfTotal; - - int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1); - if (finished < numOfTotal) { - tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished); - return tscFreeSubSqlObj(trsupport, pSql); - } - - // all subqueries are failed - tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, 
pState->numOfTotal, pState->code); - pPObj->res.code = -(pState->code); - - // release allocated resource - tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, - pState->numOfTotal); - - tfree(trsupport->pState); - tscFreeSubSqlObj(trsupport, pSql); - - // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); - - if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { - (*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code); - } else { // regular super table query - if (pPObj->res.code != TSDB_CODE_SUCCESS) { - tscQueueAsyncRes(pPObj); - } - } -} - -void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { - SRetrieveSupport *trsupport = (SRetrieveSupport *)param; - int32_t idx = trsupport->subqueryIndex; - SSqlObj * pPObj = trsupport->pParentSqlObj; - tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; - - SSqlObj *pSql = (SSqlObj *)tres; - if (pSql == NULL) { // sql object has been released in error process, return immediately - tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx); - return; - } - - SSubqueryState* pState = trsupport->pState; - assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && - pPObj->numOfSubs == pState->numOfTotal); - - // query process and cancel query process may execute at the same time - pthread_mutex_lock(&trsupport->queryMutex); - - if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) { - return tscHandleSubRetrievalError(trsupport, pSql, numOfRows); - } - - SSqlRes * pRes = &pSql->res; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); - SVnodeDesc * pSvd = 
&vnodeInfo->vpeerDesc[vnodeInfo->index]; - - if (numOfRows > 0) { - assert(pRes->numOfRows == numOfRows); - int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows); - - tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql, - pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx); - - if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { - tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64, - pPObj, pSql, tsMaxNumOfOrderedResults, num); - tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_SORTED_RES_TOO_MANY); - return; - } - - -#ifdef _DEBUG_VIEW - printf("received data from vnode: %d rows\n", pRes->numOfRows); - SSrcColumnInfo colInfo[256] = {0}; - - tscGetSrcColumnInfo(colInfo, pQueryInfo); - tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo); -#endif - if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { - tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, - tsAvailTmpDirGB, tsMinimalTmpDirGB); - tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); - return; - } - - int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data, - pRes->numOfRows, pQueryInfo->groupbyExpr.orderType); - if (ret < 0) { - // set no disk space error info, and abort retry - tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); - } else { - pthread_mutex_unlock(&trsupport->queryMutex); - taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param); - } - - } else { // all data has been retrieved to client - /* data in from current vnode is stored in cache and disk */ - uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems; - tscTrace("%p sub:%p all data retrieved from 
ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip, - pSvd->vnode, numOfRowsFromVnode, idx); - - tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); - -#ifdef _DEBUG_VIEW - printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems); - SSrcColumnInfo colInfo[256] = {0}; - tscGetSrcColumnInfo(colInfo, pQueryInfo); - tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems, - trsupport->localBuffer->numOfElems, colInfo); -#endif - - if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { - tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, - tsAvailTmpDirGB, tsMinimalTmpDirGB); - tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); - return; - } - - // each result for a vnode is ordered as an independant list, - // then used as an input of loser tree for disk-based merge routine - int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, - pQueryInfo->groupbyExpr.orderType); - if (ret != 0) { - /* set no disk space error info, and abort retry */ - return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); - } - - // keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion - // increases the finished value up to pState->numOfTotal value, which means all subqueries are completed. - // In this case, the comparsion between finished value and released pState->numOfTotal is not safe. 
- int32_t numOfTotal = pState->numOfTotal; - - int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1); - if (finished < numOfTotal) { - tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished); - return tscFreeSubSqlObj(trsupport, pSql); - } - - // all sub-queries are returned, start to local merge process - pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; - - tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj, - pState->numOfTotal, pState->numOfRetrievedRows); - - SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); - tscClearInterpInfo(pPQueryInfo); - - tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, - &pPObj->cmd, &pPObj->res); - tscTrace("%p build loser tree completed", pPObj); - - pPObj->res.precision = pSql->res.precision; - pPObj->res.numOfRows = 0; - pPObj->res.row = 0; - - // only free once - tfree(trsupport->pState); - tscFreeSubSqlObj(trsupport, pSql); - - // set the command flag must be after the semaphore been correctly set. 
- pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; - if (pPObj->res.code == TSDB_CODE_SUCCESS) { - (*pPObj->fp)(pPObj->param, pPObj, 0); - } else { - tscQueueAsyncRes(pPObj); - } - } -} - void tscKillMetricQuery(SSqlObj *pSql) { SSqlCmd* pCmd = &pSql->cmd; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - if (!tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) { + if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) { return; } @@ -1068,117 +498,6 @@ void tscKillMetricQuery(SSqlObj *pSql) { tscTrace("%p metric query is cancelled", pSql); } -static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode); - -static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { - const int32_t table_index = 0; - - SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, prevSqlObj); - if (pNew != NULL) { // the sub query of two-stage super table query - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; - - assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1); - - // launch subquery for each vnode, so the subquery index equals to the vnodeIndex. 
- STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); - pTableMetaInfo->vnodeIndex = trsupport->subqueryIndex; - - pSql->pSubs[trsupport->subqueryIndex] = pNew; - } - - return pNew; -} - -void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { - SRetrieveSupport *trsupport = (SRetrieveSupport *)param; - - SSqlObj* pParentSql = trsupport->pParentSqlObj; - SSqlObj* pSql = (SSqlObj *)tres; - - STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - assert(pSql->cmd.numOfClause == 1 && pSql->cmd.pQueryInfo[0]->numOfTables == 1); - - int32_t idx = pTableMetaInfo->vnodeIndex; - - SVnodeSidList *vnodeInfo = NULL; - SVnodeDesc * pSvd = NULL; - if (pTableMetaInfo->pMetricMeta != NULL) { - vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); - pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; - } - - SSubqueryState* pState = trsupport->pState; - assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && - pParentSql->numOfSubs == pState->numOfTotal); - - if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) { - // metric query is killed, Note: code must be less than 0 - trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; - if (pParentSql->res.code != TSDB_CODE_SUCCESS) { - code = -(int)(pParentSql->res.code); - } else { - code = pState->code; - } - tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", pParentSql, pSql, - trsupport->subqueryIndex, code); - } - - /* - * if a query on a vnode is failed, all retrieve operations from vnode that occurs later - * than this one are actually not necessary, we simply call the tscRetrieveFromVnodeCallBack - * function to abort current and remain retrieve process. - * - * NOTE: threadsafe is required. 
- */ - if (code != TSDB_CODE_SUCCESS) { - if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) { - tscTrace("%p sub:%p reach the max retry count,set global code:%d", pParentSql, pSql, code); - atomic_val_compare_exchange_32(&pState->code, 0, code); - } else { // does not reach the maximum retry count, go on - tscTrace("%p sub:%p failed code:%d, retry:%d", pParentSql, pSql, code, trsupport->numOfRetry); - - SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql); - if (pNew == NULL) { - tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d", - trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex); - - pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY; - trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; - } else { - SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL && pNewQueryInfo->pTableMetaInfo[0]->pMetricMeta != NULL); - tscProcessSql(pNew); - return; - } - } - } - - if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort - if (vnodeInfo != NULL) { - tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, - vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, - trsupport->subqueryIndex, pState->code); - } else { - tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql, - trsupport->subqueryIndex, pState->code); - } - - tscRetrieveFromVnodeCallBack(param, tres, pState->code); - } else { // success, proceed to retrieve data from dnode - if (vnodeInfo != NULL) { - tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, - vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, - trsupport->subqueryIndex); - } else { - tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", 
trsupport->pParentSqlObj, pSql, - trsupport->subqueryIndex); - } - - taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param); - } -} - int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { char *pMsg, *pStart; @@ -2239,11 +1558,11 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pInfoMsg = (SCMTableInfoMsg *)pCmd->payload; strcpy(pInfoMsg->tableId, pTableMetaInfo->name); - pInfoMsg->createFlag = htons(pSql->cmd.createOnDemand ? 1 : 0); + pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0); pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg); - if (pSql->cmd.createOnDemand) { + if (pSql->cmd.autoCreated) { memcpy(pInfoMsg->tags, tmpData, sizeof(STagData)); pMsg += sizeof(STagData); } @@ -3075,7 +2394,7 @@ static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, STableMetaInfo *pTableMet SQueryInfo *pNewQueryInfo = NULL; tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo); - pNew->cmd.createOnDemand = pSql->cmd.createOnDemand; // create table if not exists + pNew->cmd.autoCreated = pSql->cmd.autoCreated; // create table if not exists if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { tscError("%p malloc failed for payload to get meter meta", pSql); free(pNew); @@ -3126,7 +2445,7 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { } int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) { - pSql->cmd.createOnDemand = createIfNotExists; + pSql->cmd.autoCreated = createIfNotExists; return tscGetTableMeta(pSql, pTableMetaInfo); } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 206415c834..aa24247ced 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -15,21 +15,21 @@ #include "hash.h" #include "os.h" +#include "qast.h" #include "tcache.h" #include "tlog.h" #include "tnote.h" #include "trpc.h" -#include "tscJoinProcess.h" #include "tscProfile.h" #include "tscSecondaryMerge.h" +#include "tscSubquery.h" #include 
"tscUtil.h" #include "tsclient.h" #include "tscompression.h" #include "tsocket.h" #include "ttimer.h" -#include "tutil.h" #include "ttokendef.h" -#include "qast.h" +#include "tutil.h" static bool validImpl(const char* str, size_t maxsize) { if (str == NULL) { @@ -994,7 +994,7 @@ void taos_stop_query(TAOS_RES *res) { pSql->res.code = TSDB_CODE_QUERY_CANCELLED; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) { + if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { tscKillMetricQuery(pSql); return; } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c new file mode 100644 index 0000000000..1df977d996 --- /dev/null +++ b/src/client/src/tscSubquery.c @@ -0,0 +1,1572 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +#include "tscSubquery.h" +#include "os.h" +#include "qtsbuf.h" +#include "tsclient.h" + +typedef struct SInsertSupporter { + SSubqueryState* pState; + SSqlObj* pSql; +} SInsertSupporter; + +static void freeSubqueryObj(SSqlObj* pSql); + +static bool doCompare(int32_t order, int64_t left, int64_t right) { + if (order == TSQL_SO_ASC) { + return left < right; + } else { + return left > right; + } +} + +static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSupporter1, + SJoinSubquerySupporter* pSupporter2, TSKEY* st, TSKEY* et) { + STSBuf* output1 = tsBufCreate(true); + STSBuf* output2 = tsBufCreate(true); + + *st = INT64_MAX; + *et = INT64_MIN; + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + + SLimitVal* pLimit = &pQueryInfo->limit; + int32_t order = pQueryInfo->order.order; + + SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0); + SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0); + + pSubQueryInfo1->tsBuf = output1; + pSubQueryInfo2->tsBuf = output2; + + tsBufResetPos(pSupporter1->pTSBuf); + tsBufResetPos(pSupporter2->pTSBuf); + + // TODO add more details information + if (!tsBufNextPos(pSupporter1->pTSBuf)) { + tsBufFlush(output1); + tsBufFlush(output2); + + tscTrace("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql); + return 0; + } + + if (!tsBufNextPos(pSupporter2->pTSBuf)) { + tsBufFlush(output1); + tsBufFlush(output2); + + tscTrace("%p input2 is empty, 0 for secondary query after ts blocks intersecting", pSql); + return 0; + } + + int64_t numOfInput1 = 1; + int64_t numOfInput2 = 1; + + while (1) { + STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf); + STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf); + +#ifdef _DEBUG_VIEW + // for debug purpose + tscPrint("%" PRId64 ", tags:%d \t %" PRId64 ", tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag); +#endif + + if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && 
doCompare(order, elem1.ts, elem2.ts))) { + if (!tsBufNextPos(pSupporter1->pTSBuf)) { + break; + } + + numOfInput1++; + } else if (elem1.tag > elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem2.ts, elem1.ts))) { + if (!tsBufNextPos(pSupporter2->pTSBuf)) { + break; + } + + numOfInput2++; + } else { + /* + * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the + * final results which is acquired after the secondry merge of in the client. + */ + if (pLimit->offset == 0 || pQueryInfo->intervalTime > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { + if (*st > elem1.ts) { + *st = elem1.ts; + } + + if (*et < elem1.ts) { + *et = elem1.ts; + } + + tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); + tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); + } else { + pLimit->offset -= 1; + } + + if (!tsBufNextPos(pSupporter1->pTSBuf)) { + break; + } + + numOfInput1++; + + if (!tsBufNextPos(pSupporter2->pTSBuf)) { + break; + } + + numOfInput2++; + } + } + + /* + * failed to set the correct ts order yet in two cases: + * 1. only one element + * 2. only one element for each tag. 
+ */ + if (output1->tsOrder == -1) { + output1->tsOrder = TSQL_SO_ASC; + output2->tsOrder = TSQL_SO_ASC; + } + + tsBufFlush(output1); + tsBufFlush(output2); + + tsBufDestory(pSupporter1->pTSBuf); + tsBufDestory(pSupporter2->pTSBuf); + + tscTrace("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks " + "intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, + numOfInput1, numOfInput2, output1->numOfTotal, *st, *et); + + return output1->numOfTotal; +} + +// todo handle failed to create sub query +SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index) { + SJoinSubquerySupporter* pSupporter = calloc(1, sizeof(SJoinSubquerySupporter)); + if (pSupporter == NULL) { + return NULL; + } + + pSupporter->pObj = pSql; + pSupporter->pState = pState; + + pSupporter->subqueryIndex = index; + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + + pSupporter->interval = pQueryInfo->intervalTime; + pSupporter->limit = pQueryInfo->limit; + + STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index); + pSupporter->uid = pTableMetaInfo->pTableMeta->uid; + + assert (pSupporter->uid != 0); + + getTmpfilePath("join-", pSupporter->path); + pSupporter->f = fopen(pSupporter->path, "w"); + + if (pSupporter->f == NULL) { + tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(errno)); + } + + return pSupporter; +} + +void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) { + if (pSupporter == NULL) { + return; + } + + tscSqlExprInfoDestroy(&pSupporter->exprsInfo); + tscColumnBaseInfoDestroy(&pSupporter->colList); + + tscClearFieldInfo(&pSupporter->fieldsInfo); + + if (pSupporter->f != NULL) { + fclose(pSupporter->f); + unlink(pSupporter->path); + } + + tscTagCondRelease(&pSupporter->tagCond); + free(pSupporter); +} + +/* + * need the secondary query process + * In case of 
count(ts)/count(*)/spread(ts) query, that are only applied to + * primary timestamp column , the secondary query is not necessary + * + */ +bool needSecondaryQuery(SQueryInfo* pQueryInfo) { + for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) { + SColumnBase* pBase = tscColumnBaseInfoGet(&pQueryInfo->colList, i); + if (pBase->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { + return true; + } + } + + return false; +} + +/* + * launch secondary stage query to fetch the result that contains timestamp in set + */ +int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { + int32_t numOfSub = 0; + SJoinSubquerySupporter* pSupporter = NULL; + + /* + * If the columns are not involved in the final select clause, the secondary query will not be launched + * for the subquery. + */ + SSubqueryState* pState = NULL; + + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + pSupporter = pSql->pSubs[i]->param; + if (pSupporter->exprsInfo.numOfExprs > 0) { + ++numOfSub; + } + } + + assert(numOfSub > 0); + + // scan all subquery, if one sub query has only ts, ignore it + tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in " + "select clause", pSql, pSql->numOfSubs, numOfSub); + + /* + * the subqueries that do not actually launch the secondary query to virtual node is set as completed. 
+ */ + pState = pSupporter->pState; + pState->numOfTotal = pSql->numOfSubs; + pState->numOfCompleted = (pSql->numOfSubs - numOfSub); + + bool success = true; + + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlObj *pPrevSub = pSql->pSubs[i]; + pSql->pSubs[i] = NULL; + + pSupporter = pPrevSub->param; + + if (pSupporter->exprsInfo.numOfExprs == 0) { + tscTrace("%p subIndex: %d, not need to launch query, ignore it", pSql, i); + + tscDestroyJoinSupporter(pSupporter); + tscFreeSqlObj(pPrevSub); + + pSql->pSubs[i] = NULL; + continue; + } + + SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0); + STSBuf *pTSBuf = pSubQueryInfo->tsBuf; + pSubQueryInfo->tsBuf = NULL; + + // free result for async object will also free sqlObj + assert(pSubQueryInfo->exprsInfo.numOfExprs == 1); // ts_comp query only requires one resutl columns + taos_free_result(pPrevSub); + + SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL); + if (pNew == NULL) { + tscDestroyJoinSupporter(pSupporter); + success = false; + break; + } + + tscClearSubqueryInfo(&pNew->cmd); + pSql->pSubs[i] = pNew; + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + pQueryInfo->tsBuf = pTSBuf; // transfer the ownership of timestamp comp-z data to the new created object + + // set the second stage sub query for join process + pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE; + + pQueryInfo->intervalTime = pSupporter->interval; + pQueryInfo->groupbyExpr = pSupporter->groupbyExpr; + + tscColumnBaseInfoCopy(&pQueryInfo->colList, &pSupporter->colList, 0); + tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); + + tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid, false); + tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); + + pSupporter->exprsInfo.numOfExprs = 0; + pSupporter->fieldsInfo.numOfOutputCols = 0; + + /* + * if the first column of the secondary query is not ts function, add this 
function. + * Because this column is required to filter with timestamp after intersecting. + */ + if (pSupporter->exprsInfo.pExprs[0]->functionId != TSDB_FUNC_TS) { + tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); + } + + SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); + + tscFieldInfoCalOffset(pNewQueryInfo); + + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); + + /* + * When handling the projection query, the offset value will be modified for table-table join, which is changed + * during the timestamp intersection. + */ + pSupporter->limit = pQueryInfo->limit; + pNewQueryInfo->limit = pSupporter->limit; + + // fetch the join tag column + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); + assert(pQueryInfo->tagCond.joinInfo.hasJoin); + + int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid); + pExpr->param[0].i64Key = tagColIndex; + pExpr->numOfParams = 1; + } + + tscPrintSelectClause(pNew, 0); + + tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + pSql, pNew, 0, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, + pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, + pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); + } + + //prepare the subqueries object failed, abort + if (!success) { + pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; + tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql, + pSql->numOfSubs, pSql->res.code); + freeSubqueryObj(pSql); + + return pSql->res.code; + } + + for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub == NULL) { + continue; + } + + tscProcessSql(pSub); + } + + return TSDB_CODE_SUCCESS; +} + 
+void freeSubqueryObj(SSqlObj* pSql) { + SSubqueryState* pState = NULL; + + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] != NULL) { + SJoinSubquerySupporter* p = pSql->pSubs[i]->param; + pState = p->pState; + + tscDestroyJoinSupporter(p); + + if (pSql->pSubs[i]->res.code == TSDB_CODE_SUCCESS) { + taos_free_result(pSql->pSubs[i]); + } + } + } + + tfree(pState); + pSql->numOfSubs = 0; +} + +static void doQuitSubquery(SSqlObj* pParentSql) { + freeSubqueryObj(pParentSql); + + tsem_wait(&pParentSql->emptyRspSem); + tsem_wait(&pParentSql->emptyRspSem); + + tsem_post(&pParentSql->rspSem); +} + +static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) { + int32_t numOfTotal = pSupporter->pState->numOfTotal; + int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); + + if (finished >= numOfTotal) { + pSqlObj->res.code = abs(pSupporter->pState->code); + tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); + + doQuitSubquery(pSqlObj); + } +} + +// update the query time range according to the join results on timestamp +static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et) { + assert(pQueryInfo->stime <= st && pQueryInfo->etime >= et); + + pQueryInfo->stime = st; + pQueryInfo->etime = et; +} + +static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { + SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param; + SSqlObj* pParentSql = pSupporter->pObj; + + SSqlObj* pSql = (SSqlObj*)tres; + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == 0) { + if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { + tscError("%p abort query due to other subquery failure. 
code:%d, global code:%d", pSql, numOfRows, + pSupporter->pState->code); + + quitAllSubquery(pParentSql, pSupporter); + return; + } + + if (numOfRows > 0) { // write the data into disk + fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f); + fclose(pSupporter->f); + + STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true); + if (pBuf == NULL) { + tscError("%p invalid ts comp file from vnode, abort sub query, file size:%d", pSql, numOfRows); + + pSupporter->pState->code = TSDB_CODE_APP_ERROR; // todo set the informative code + quitAllSubquery(pParentSql, pSupporter); + return; + } + + if (pSupporter->pTSBuf == NULL) { + tscTrace("%p create tmp file for ts block:%s", pSql, pBuf->path); + pSupporter->pTSBuf = pBuf; + } else { + assert(pQueryInfo->numOfTables == 1); // for subquery, only one metermetaInfo + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + + tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vnodeIndex); + tsBufDestory(pBuf); + } + + // open new file to save the result + getTmpfilePath("ts-join", pSupporter->path); + pSupporter->f = fopen(pSupporter->path, "w"); + pSql->res.row = pSql->res.numOfRows; + + taos_fetch_rows_a(tres, joinRetrieveCallback, param); + } else if (numOfRows == 0) { // no data from this vnode anymore + SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex); + + //todo refactor + if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + assert(pQueryInfo->numOfTables == 1); + + // for projection query, need to try next vnode + int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; + if ((++pTableMetaInfo->vnodeIndex) < totalVnode) { + tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. 
current numOfRes:%d", pSql, + pTableMetaInfo->vnodeIndex - 1, pTableMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotal); + + pSql->cmd.command = TSDB_SQL_SELECT; + pSql->fp = tscJoinQueryCallback; + tscProcessSql(pSql); + + return; + } + } + + int32_t numOfTotal = pSupporter->pState->numOfTotal; + int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); + + if (finished >= numOfTotal) { + assert(finished == numOfTotal); + + if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { + tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres, + pSupporter->subqueryIndex); + doQuitSubquery(pParentSql); + return; + } + + tscTrace("%p all subqueries retrieve ts complete, do ts block intersect", pParentSql); + + SJoinSubquerySupporter* p1 = pParentSql->pSubs[0]->param; + SJoinSubquerySupporter* p2 = pParentSql->pSubs[1]->param; + + TSKEY st, et; + + int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &st, &et); + if (num <= 0) { // no result during ts intersect + tscTrace("%p free all sub SqlObj and quit", pParentSql); + doQuitSubquery(pParentSql); + } else { + updateQueryTimeRange(pParentQueryInfo, st, et); + tscLaunchSecondPhaseSubqueries(pParentSql); + } + } + } else { // failure of sub query + tscError("%p sub query failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); + pSupporter->pState->code = numOfRows; + + quitAllSubquery(pParentSql, pSupporter); + return; + } + + } else { // secondary stage retrieve, driven by taos_fetch_row or other functions + if (numOfRows < 0) { + pSupporter->pState->code = numOfRows; + tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); + } + + if (numOfRows >= 0) { + pSql->res.numOfTotal += pSql->res.numOfRows; + } + + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + assert(pQueryInfo->numOfTables == 1); + + // 
for projection query, need to try next vnode if current vnode is exhausted + if ((++pTableMetaInfo->vnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) { + pSupporter->pState->numOfCompleted = 0; + pSupporter->pState->numOfTotal = 1; + + pSql->cmd.command = TSDB_SQL_SELECT; + pSql->fp = tscJoinQueryCallback; + tscProcessSql(pSql); + + return; + } + } + + int32_t numOfTotal = pSupporter->pState->numOfTotal; + int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); + + if (finished >= numOfTotal) { + assert(finished == numOfTotal); + tscTrace("%p all %d secondary subquery retrieves completed, global code:%d", tres, numOfTotal, + pParentSql->res.code); + + if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { + pParentSql->res.code = abs(pSupporter->pState->code); + freeSubqueryObj(pParentSql); + } + + tsem_post(&pParentSql->rspSem); + } else { + tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal); + } + } +} + +static SJoinSubquerySupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) { + int32_t notInvolved = 0; + SJoinSubquerySupporter* pSupporter = NULL; + SSubqueryState* pState = NULL; + + for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] == NULL) { + notInvolved++; + } else { + pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[i]->param; + pState = pSupporter->pState; + } + } + + pState->numOfTotal = pSql->numOfSubs; + pState->numOfCompleted = pSql->numOfSubs - numOfFetch; + + return pSupporter; +} + +void tscFetchDatablockFromSubquery(SSqlObj* pSql) { + int32_t numOfFetch = 0; + assert(pSql->numOfSubs >= 1); + + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] == NULL) { // this subquery does not need to involve in secondary query + continue; + } + + SSqlRes *pRes = &pSql->pSubs[i]->res; + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); + + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + + if 
(tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes && + (!tscHasReachLimitation(pQueryInfo, pRes))) { + numOfFetch++; + } + } else { + if (pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pQueryInfo, pRes))) { + numOfFetch++; + } + } + } + + if (numOfFetch <= 0) { + return; + } + + // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled + tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch); + + SJoinSubquerySupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch); + + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlObj* pSql1 = pSql->pSubs[i]; + if (pSql1 == NULL) { + continue; + } + + SSqlRes* pRes1 = &pSql1->res; + SSqlCmd* pCmd1 = &pSql1->cmd; + + pSupporter = (SJoinSubquerySupporter*)pSql1->param; + + // wait for all subqueries completed + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd1, 0); + assert(pRes1->numOfRows >= 0 && pQueryInfo->numOfTables == 1); + + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + + if (pRes1->row >= pRes1->numOfRows) { + tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d", pSql, pSql1, + pSupporter->subqueryIndex, pTableMetaInfo->vnodeIndex); + + tscResetForNextRetrieve(pRes1); + pSql1->fp = joinRetrieveCallback; + + if (pCmd1->command < TSDB_SQL_LOCAL) { + pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? 
TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; + } + + tscProcessSql(pSql1); + } + } + + // wait for all subquery completed + tsem_wait(&pSql->rspSem); + + // update the records for each subquery + for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] == NULL) { + continue; + } + + SSqlRes* pRes1 = &pSql->pSubs[i]->res; + pRes1->numOfTotalInCurrentClause += pRes1->numOfRows; + } +} + +// all subqueries return, set the result output index +void tscSetupOutputColumnIndex(SSqlObj* pSql) { + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + + tscTrace("%p all subquery response, retrieve data", pSql); + + if (pRes->pColumnIndex != NULL) { + return; // the column transfer support struct has been built + } + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pQueryInfo->fieldsInfo.numOfOutputCols); + + for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); + + int32_t tableIndexOfSub = -1; + for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, j); + if (pTableMetaInfo->pTableMeta->uid == pExpr->uid) { + tableIndexOfSub = j; + break; + } + } + + assert(tableIndexOfSub >= 0 && tableIndexOfSub < pQueryInfo->numOfTables); + + SSqlCmd* pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd; + SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0); + + for (int32_t k = 0; k < pSubQueryInfo->exprsInfo.numOfExprs; ++k) { + SSqlExpr* pSubExpr = tscSqlExprGet(pSubQueryInfo, k); + if (pExpr->functionId == pSubExpr->functionId && pExpr->colInfo.colId == pSubExpr->colInfo.colId) { + pRes->pColumnIndex[i] = (SColumnIndex){.tableIndex = tableIndexOfSub, .columnIndex = k}; + break; + } + } + } +} + +void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { + SSqlObj* pSql = (SSqlObj*)tres; + // STableMetaInfo *pTableMetaInfo = 
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); + + // int32_t idx = pSql->cmd.vnodeIdx; + + // SVnodeSidList *vnodeInfo = NULL; + // if (pTableMetaInfo->pMetricMeta != NULL) { + // vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx - 1); + // } + + SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param; + + // if (atomic_add_fetch_32(pSupporter->numOfComplete, 1) >= + // pSupporter->numOfTotal) { + // SSqlObj *pParentObj = pSupporter->pObj; + // + // if ((pSql->cmd.type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != 1) { + // int32_t num = 0; + // tscFetchDatablockFromSubquery(pParentObj); + // TSKEY* ts = tscGetQualifiedTSList(pParentObj, &num); + // + // if (num <= 0) { + // // no qualified result + // } + // + // tscLaunchSecondPhaseSubqueries(pSql, ts, num); + // } else { + + // } + // } else { + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { + if (code != TSDB_CODE_SUCCESS) { // direct call joinRetrieveCallback and set the error code + joinRetrieveCallback(param, pSql, code); + } else { // first stage query, continue to retrieve data + pSql->fp = joinRetrieveCallback; + pSql->cmd.command = TSDB_SQL_FETCH; + tscProcessSql(pSql); + } + + } else { // second stage join subquery + SSqlObj* pParentSql = pSupporter->pObj; + + if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { + tscError("%p abort query due to other subquery failure. 
code:%d, global code:%d", pSql, code, + pSupporter->pState->code); + quitAllSubquery(pParentSql, pSupporter); + + return; + } + + if (code != TSDB_CODE_SUCCESS) { + tscError("%p sub query failed, code:%d, set global code:%d, index:%d", pSql, code, code, + pSupporter->subqueryIndex); + pSupporter->pState->code = code; // todo set the informative code + + quitAllSubquery(pParentSql, pSupporter); + } else { + int32_t numOfTotal = pSupporter->pState->numOfTotal; + int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); + + if (finished >= numOfTotal) { + assert(finished == numOfTotal); + + tscSetupOutputColumnIndex(pParentSql); + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + + /** + * if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of + * data instead of returning to its invoker + */ + if (pTableMetaInfo->vnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + assert(pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes); + pSupporter->pState->numOfCompleted = 0; // reset the record value + + pSql->fp = joinRetrieveCallback; // continue retrieve data + pSql->cmd.command = TSDB_SQL_FETCH; + tscProcessSql(pSql); + } else { // first retrieve from vnode during the secondary stage sub-query + if (pParentSql->fp == NULL) { + tsem_wait(&pParentSql->emptyRspSem); + tsem_wait(&pParentSql->emptyRspSem); + + tsem_post(&pParentSql->rspSem); + } else { + // set the command flag must be after the semaphore been correctly set. 
+ // pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; + // if (pPObj->res.code == TSDB_CODE_SUCCESS) { + // (*pPObj->fp)(pPObj->param, pPObj, 0); + // } else { + // tscQueueAsyncRes(pPObj); + // } + assert(0); + } + } + } + } + } +} + +///////////////////////////////////////////////////////////////////////////////////////// +static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); + +static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); + +// todo merge with callback +int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) { + SSqlCmd * pCmd = &pSql->cmd; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + pSql->res.qhandle = 0x1; + pSql->res.numOfRows = 0; + + if (pSql->pSubs == NULL) { + pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES); + if (pSql->pSubs == NULL) { + return TSDB_CODE_CLI_OUT_OF_MEMORY; + } + } + + SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL); + if (pNew == NULL) { + return TSDB_CODE_CLI_OUT_OF_MEMORY; + } + + pSql->pSubs[pSql->numOfSubs++] = pNew; + assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal); + + if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { + addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); + + // refactor as one method + SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + assert(pNewQueryInfo != NULL); + + tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0); + tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0); + + tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid, false); + tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo); + + tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond); + + pNew->cmd.numOfCols = 0; + pNewQueryInfo->intervalTime = 0; + memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal)); + + // backup the 
data and clear it in the sqlcmd object + pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr; + memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr)); + + // this data needs to be transfer to support struct + pNewQueryInfo->fieldsInfo.numOfOutputCols = 0; + pNewQueryInfo->exprsInfo.numOfExprs = 0; + + // set the ts,tags that involved in join, as the output column of intermediate result + tscClearSubqueryInfo(&pNew->cmd); + + SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; + SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; + + tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL); + + // set the tags value for ts_comp function + SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); + + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); + int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); + + pExpr->param->i64Key = tagColIndex; + pExpr->numOfParams = 1; + + // add the filter tag column + for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) { + SColumnBase *pColBase = &pSupporter->colList.pColList[i]; + if (pColBase->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. 
+ tscColumnBaseCopy(&pNewQueryInfo->colList.pColList[pNewQueryInfo->colList.numOfCols], pColBase); + pNewQueryInfo->colList.numOfCols++; + } + } + + tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " + "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, + pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, + pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); + tscPrintSelectClause(pNew, 0); + + tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " + "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, + pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, + pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); + tscPrintSelectClause(pNew, 0); + } else { + SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; + } + +#ifdef _DEBUG_VIEW + tscPrintSelectClause(pNew, 0); +#endif + + return tscProcessSql(pNew); +} + +// todo support async join query +int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { + SSqlCmd* pCmd = &pSql->cmd; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0); + + SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); + + pState->numOfTotal = pQueryInfo->numOfTables; + + for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { + SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); + + if (pSupporter == NULL) { // failed to create support struct, abort current query + tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i); + pState->numOfCompleted = 
pQueryInfo->numOfTables - i - 1; + pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; + + return pSql->res.code; + } + + int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter); + if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query + tscDestroyJoinSupporter(pSupporter); + pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; + + break; + } + } + + tsem_post(&pSql->emptyRspSem); + tsem_wait(&pSql->rspSem); + + tsem_post(&pSql->emptyRspSem); + + if (pSql->numOfSubs <= 0) { + pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; + } else { + pSql->cmd.command = TSDB_SQL_METRIC_JOIN_RETRIEVE; + } + + return TSDB_CODE_SUCCESS; +} + +static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) { + assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL); + + for(int32_t i = 0; i < numOfSubs; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + assert(pSub != NULL); + + SRetrieveSupport* pSupport = pSub->param; + + tfree(pSupport->localBuffer); + + pthread_mutex_unlock(&pSupport->queryMutex); + pthread_mutex_destroy(&pSupport->queryMutex); + + tfree(pSupport); + + tscFreeSqlObj(pSub); + } + + free(pState); +} + +int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { + SSqlRes *pRes = &pSql->res; + SSqlCmd *pCmd = &pSql->cmd; + + // pRes->code check only serves in launching metric sub-queries + if (pRes->code == TSDB_CODE_QUERY_CANCELLED) { + pCmd->command = TSDB_SQL_RETRIEVE_METRIC; // enable the abort of kill metric function. 
+ return pRes->code; + } + + tExtMemBuffer ** pMemoryBuf = NULL; + tOrderDescriptor *pDesc = NULL; + SColumnModel * pModel = NULL; + + pRes->qhandle = 1; // hack the qhandle check + + const uint32_t nBufferSize = (1 << 16); // 64KB + + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + int32_t numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes; + assert(numOfSubQueries > 0); + + int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); + if (ret != 0) { + pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; + if (pSql->fp) { + tscQueueAsyncRes(pSql); + } + return pRes->code; + } + + pSql->pSubs = calloc(numOfSubQueries, POINTER_BYTES); + pSql->numOfSubs = numOfSubQueries; + + tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfSubQueries); + SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); + pState->numOfTotal = numOfSubQueries; + pRes->code = TSDB_CODE_SUCCESS; + + int32_t i = 0; + for (; i < numOfSubQueries; ++i) { + SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport)); + if (trs == NULL) { + tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); + break; + } + + trs->pExtMemBuffer = pMemoryBuf; + trs->pOrderDescriptor = pDesc; + trs->pState = pState; + + trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage)); + if (trs->localBuffer == NULL) { + tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); + tfree(trs); + break; + } + + trs->subqueryIndex = i; + trs->pParentSqlObj = pSql; + trs->pFinalColModel = pModel; + + pthread_mutexattr_t mutexattr = {0}; + pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP); + pthread_mutex_init(&trs->queryMutex, &mutexattr); + pthread_mutexattr_destroy(&mutexattr); + + SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, 
trs, NULL); + if (pNew == NULL) { + tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); + tfree(trs->localBuffer); + tfree(trs); + break; + } + + // todo handle multi-vnode situation + if (pQueryInfo->tsBuf) { + SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf); + } + + tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex); + } + + if (i < numOfSubQueries) { + tscError("%p failed to prepare subquery structure and launch subqueries", pSql); + pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; + + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries); + doCleanupSubqueries(pSql, i, pState); + return pRes->code; // free all allocated resource + } + + if (pRes->code == TSDB_CODE_QUERY_CANCELLED) { + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries); + doCleanupSubqueries(pSql, i, pState); + return pRes->code; + } + + for(int32_t j = 0; j < numOfSubQueries; ++j) { + SSqlObj* pSub = pSql->pSubs[j]; + SRetrieveSupport* pSupport = pSub->param; + + tscTrace("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex); + tscProcessSql(pSub); + } + + return TSDB_CODE_SUCCESS; +} + +static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { + tscTrace("%p start to free subquery result", pSql); + + if (pSql->res.code == TSDB_CODE_SUCCESS) { + taos_free_result(pSql); + } + + tfree(trsupport->localBuffer); + + pthread_mutex_unlock(&trsupport->queryMutex); + pthread_mutex_destroy(&trsupport->queryMutex); + + tfree(trsupport); +} + +static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows); + +static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) { +// set no disk space error info +#ifdef WINDOWS + LPVOID lpMsgBuf; + FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | 
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, + GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language + (LPTSTR)&lpMsgBuf, 0, NULL); + tscError("sub:%p failed to flush data to disk:reason:%s", tres, lpMsgBuf); + LocalFree(lpMsgBuf); +#else + char buf[256] = {0}; + strerror_r(errno, buf, 256); + tscError("sub:%p failed to flush data to disk:reason:%s", tres, buf); +#endif + + trsupport->pState->code = -errCode; + trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + + pthread_mutex_unlock(&trsupport->queryMutex); + + tscRetrieveFromVnodeCallBack(trsupport, tres, trsupport->pState->code); +} + +static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { + SSqlObj *pPObj = trsupport->pParentSqlObj; + int32_t subqueryIndex = trsupport->subqueryIndex; + + assert(pSql != NULL); + SSubqueryState* pState = trsupport->pState; + assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && + pPObj->numOfSubs == pState->numOfTotal); + + /* retrieved in subquery failed. OR query cancelled in retrieve phase. */ + if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) { + pState->code = -(int)pPObj->res.code; + + /* + * kill current sub-query connection, which may retrieve data from vnodes; + * Here we get: pPObj->res.code == TSDB_CODE_QUERY_CANCELLED + */ + pSql->res.numOfRows = 0; + trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts + tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql, + subqueryIndex, pState->code); + } + + if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query. 
+ tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex); + tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql, + subqueryIndex, pState->code); + } else { + if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) { + /* + * current query failed, and the retry count is less than the available + * count, retry query clear previous retrieved data, then launch a new sub query + */ + tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]); + + // clear local saved number of results + trsupport->localBuffer->numOfElems = 0; + pthread_mutex_unlock(&trsupport->queryMutex); + + tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows, + subqueryIndex, trsupport->numOfRetry); + + SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql); + if (pNew == NULL) { + tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry", + trsupport->pParentSqlObj, pSql); + + pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; + trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + return; + } + + tscProcessSql(pNew); + return; + } else { // reach the maximum retry count, abort + atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows); + tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql, + numOfRows, subqueryIndex, pState->code); + } + } + + int32_t numOfTotal = pState->numOfTotal; + + int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1); + if (finished < numOfTotal) { + tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished); + return tscFreeSubSqlObj(trsupport, pSql); + } + + // all subqueries are failed + tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, 
pState->numOfTotal, pState->code); + pPObj->res.code = -(pState->code); + + // release allocated resource + tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, + pState->numOfTotal); + + tfree(trsupport->pState); + tscFreeSubSqlObj(trsupport, pSql); + + // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); + + if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { + (*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code); + } else { // regular super table query + if (pPObj->res.code != TSDB_CODE_SUCCESS) { + tscQueueAsyncRes(pPObj); + } + } +} + +static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { + SRetrieveSupport *trsupport = (SRetrieveSupport *)param; + int32_t idx = trsupport->subqueryIndex; + SSqlObj * pPObj = trsupport->pParentSqlObj; + tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; + + SSqlObj *pSql = (SSqlObj *)tres; + if (pSql == NULL) { // sql object has been released in error process, return immediately + tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx); + return; + } + + SSubqueryState* pState = trsupport->pState; + assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && + pPObj->numOfSubs == pState->numOfTotal); + + // query process and cancel query process may execute at the same time + pthread_mutex_lock(&trsupport->queryMutex); + + if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) { + return tscHandleSubRetrievalError(trsupport, pSql, numOfRows); + } + + SSqlRes * pRes = &pSql->res; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + + SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); + SVnodeDesc * pSvd = 
&vnodeInfo->vpeerDesc[vnodeInfo->index]; + + if (numOfRows > 0) { + assert(pRes->numOfRows == numOfRows); + int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows); + + tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql, + pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx); + + if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { + tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64, + pPObj, pSql, tsMaxNumOfOrderedResults, num); + tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_SORTED_RES_TOO_MANY); + return; + } + + +#ifdef _DEBUG_VIEW + printf("received data from vnode: %d rows\n", pRes->numOfRows); + SSrcColumnInfo colInfo[256] = {0}; + + tscGetSrcColumnInfo(colInfo, pQueryInfo); + tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo); +#endif + if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { + tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, + tsAvailTmpDirGB, tsMinimalTmpDirGB); + tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); + return; + } + + int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data, + pRes->numOfRows, pQueryInfo->groupbyExpr.orderType); + if (ret < 0) { + // set no disk space error info, and abort retry + tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); + } else { + pthread_mutex_unlock(&trsupport->queryMutex); + taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param); + } + + } else { // all data has been retrieved to client + /* data in from current vnode is stored in cache and disk */ + uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems; + tscTrace("%p sub:%p all data retrieved from 
ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip, + pSvd->vnode, numOfRowsFromVnode, idx); + + tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); + +#ifdef _DEBUG_VIEW + printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems); + SSrcColumnInfo colInfo[256] = {0}; + tscGetSrcColumnInfo(colInfo, pQueryInfo); + tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems, + trsupport->localBuffer->numOfElems, colInfo); +#endif + + if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { + tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, + tsAvailTmpDirGB, tsMinimalTmpDirGB); + tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); + return; + } + + // each result for a vnode is ordered as an independent list, + // then used as an input of loser tree for disk-based merge routine + int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, + pQueryInfo->groupbyExpr.orderType); + if (ret != 0) { + /* set no disk space error info, and abort retry */ + return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); + } + + // keep this value in a local variable, since the pState variable may be released by other threads, if atomic_add operation + // increases the finished value up to pState->numOfTotal value, which means all subqueries are completed. + // In this case, the comparison between finished value and released pState->numOfTotal is not safe. 
+ int32_t numOfTotal = pState->numOfTotal; + + int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1); + if (finished < numOfTotal) { + tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished); + return tscFreeSubSqlObj(trsupport, pSql); + } + + // all sub-queries are returned, start to local merge process + pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; + + tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj, + pState->numOfTotal, pState->numOfRetrievedRows); + + SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); + tscClearInterpInfo(pPQueryInfo); + + tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, + &pPObj->cmd, &pPObj->res); + tscTrace("%p build loser tree completed", pPObj); + + pPObj->res.precision = pSql->res.precision; + pPObj->res.numOfRows = 0; + pPObj->res.row = 0; + + // only free once + tfree(trsupport->pState); + tscFreeSubSqlObj(trsupport, pSql); + + // set the command flag must be after the semaphore been correctly set. + pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; + if (pPObj->res.code == TSDB_CODE_SUCCESS) { + (*pPObj->fp)(pPObj->param, pPObj, 0); + } else { + tscQueueAsyncRes(pPObj); + } + } +} + +static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { + const int32_t table_index = 0; + + SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, prevSqlObj); + if (pNew != NULL) { // the sub query of two-stage super table query + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; + + assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1); + + // launch subquery for each vnode, so the subquery index equals to the vnodeIndex. 
+ STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); + pTableMetaInfo->vnodeIndex = trsupport->subqueryIndex; + + pSql->pSubs[trsupport->subqueryIndex] = pNew; + } + + return pNew; +} + +void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { + SRetrieveSupport *trsupport = (SRetrieveSupport *)param; + + SSqlObj* pParentSql = trsupport->pParentSqlObj; + SSqlObj* pSql = (SSqlObj *)tres; + + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); + assert(pSql->cmd.numOfClause == 1 && pSql->cmd.pQueryInfo[0]->numOfTables == 1); + + int32_t idx = pTableMetaInfo->vnodeIndex; + + SVnodeSidList *vnodeInfo = NULL; + SVnodeDesc * pSvd = NULL; + if (pTableMetaInfo->pMetricMeta != NULL) { + vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); + pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; + } + + SSubqueryState* pState = trsupport->pState; + assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && + pParentSql->numOfSubs == pState->numOfTotal); + + if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) { + // metric query is killed, Note: code must be less than 0 + trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { + code = -(int)(pParentSql->res.code); + } else { + code = pState->code; + } + tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", pParentSql, pSql, + trsupport->subqueryIndex, code); + } + + /* + * if a query on a vnode is failed, all retrieve operations from vnode that occurs later + * than this one are actually not necessary, we simply call the tscRetrieveFromVnodeCallBack + * function to abort current and remain retrieve process. + * + * NOTE: threadsafe is required. 
+ */ + if (code != TSDB_CODE_SUCCESS) { + if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) { + tscTrace("%p sub:%p reach the max retry count,set global code:%d", pParentSql, pSql, code); + atomic_val_compare_exchange_32(&pState->code, 0, code); + } else { // does not reach the maximum retry count, go on + tscTrace("%p sub:%p failed code:%d, retry:%d", pParentSql, pSql, code, trsupport->numOfRetry); + + SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql); + if (pNew == NULL) { + tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d", + trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex); + + pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY; + trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + } else { + SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL && pNewQueryInfo->pTableMetaInfo[0]->pMetricMeta != NULL); + tscProcessSql(pNew); + return; + } + } + } + + if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort + if (vnodeInfo != NULL) { + tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, + vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, + trsupport->subqueryIndex, pState->code); + } else { + tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql, + trsupport->subqueryIndex, pState->code); + } + + tscRetrieveFromVnodeCallBack(param, tres, pState->code); + } else { // success, proceed to retrieve data from dnode + if (vnodeInfo != NULL) { + tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, + vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, + trsupport->subqueryIndex); + } else { + tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", 
trsupport->pParentSqlObj, pSql, + trsupport->subqueryIndex); + } + + taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param); + } +} + +static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) { + SInsertSupporter *pSupporter = (SInsertSupporter *)param; + SSqlObj* pParentObj = pSupporter->pSql; + SSqlCmd* pParentCmd = &pParentObj->cmd; + + SSubqueryState* pState = pSupporter->pState; + int32_t total = pState->numOfTotal; + + // increase the total inserted rows + if (numOfRows > 0) { + pParentObj->res.numOfRows += numOfRows; + } + + int32_t completed = atomic_add_fetch_32(&pState->numOfCompleted, 1); + if (completed < total) { + return; + } + + tscTrace("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); + + // release data block data + pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks); + + // restore user defined fp + pParentObj->fp = pParentObj->fetchFp; + + // all data has been sent to vnode, call user function + (*pParentObj->fp)(pParentObj->param, tres, numOfRows); +} + +int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { + SSqlRes *pRes = &pSql->res; + SSqlCmd *pCmd = &pSql->cmd; + + pRes->qhandle = 1; // hack the qhandle check + SDataBlockList *pDataBlocks = pCmd->pDataBlocks; + + pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES); + pSql->numOfSubs = pDataBlocks->nSize; + assert(pDataBlocks->nSize > 0); + + tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize); + SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); + pState->numOfTotal = pSql->numOfSubs; + + pRes->code = TSDB_CODE_SUCCESS; + + int32_t i = 0; + for (; i < pSql->numOfSubs; ++i) { + SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); + pSupporter->pSql = pSql; + pSupporter->pState = pState; + + SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, NULL); + if (pNew == NULL) { + tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, 
reason:%s", pSql, i, strerror(errno)); + break; + } + + pSql->pSubs[i] = pNew; + tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i); + } + + if (i < pSql->numOfSubs) { + tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql); + pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; + return pRes->code; // free all allocated resource + } + + for (int32_t j = 0; j < pSql->numOfSubs; ++j) { + SSqlObj *pSub = pSql->pSubs[j]; + pSub->cmd.command = TSDB_SQL_INSERT; + int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]); + + if (code != TSDB_CODE_SUCCESS) { + tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, j, + pDataBlocks->nSize, code); + } + + tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j); + tscProcessSql(pSub); + } + + return TSDB_CODE_SUCCESS; +} diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7cc92088c6..3fbbc7e183 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -21,9 +21,9 @@ #include "tcache.h" #include "tkey.h" #include "tmd5.h" -#include "tscJoinProcess.h" #include "tscProfile.h" #include "tscSecondaryMerge.h" +#include "tscSubquery.h" #include "tschemautil.h" #include "tsclient.h" #include "ttimer.h" @@ -209,7 +209,7 @@ STableSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) { return (STableSidExtInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList); } -bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { +bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { if (pQueryInfo == NULL) { return false; } @@ -2321,94 +2321,3 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) { tscProcessSql(pSql); } } - -typedef struct SinsertSupporter { - SSubqueryState* pState; - SSqlObj* pSql; -} SinsertSupporter; - -void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) { - SinsertSupporter *pSupporter 
= (SinsertSupporter *)param; - SSqlObj* pParentObj = pSupporter->pSql; - SSqlCmd* pParentCmd = &pParentObj->cmd; - - SSubqueryState* pState = pSupporter->pState; - int32_t total = pState->numOfTotal; - - // increase the total inserted rows - if (numOfRows > 0) { - pParentObj->res.numOfRows += numOfRows; - } - - int32_t completed = atomic_add_fetch_32(&pState->numOfCompleted, 1); - if (completed < total) { - return; - } - - tscTrace("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); - - // release data block data - pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks); - - // restore user defined fp - pParentObj->fp = pParentObj->fetchFp; - - // all data has been sent to vnode, call user function - (*pParentObj->fp)(pParentObj->param, tres, numOfRows); -} - -int32_t launchMultivnodeInsert(SSqlObj *pSql) { - SSqlRes *pRes = &pSql->res; - SSqlCmd *pCmd = &pSql->cmd; - - pRes->qhandle = 1; // hack the qhandle check - SDataBlockList *pDataBlocks = pCmd->pDataBlocks; - - pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES); - pSql->numOfSubs = pDataBlocks->nSize; - assert(pDataBlocks->nSize > 0); - - tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize); - SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); - pState->numOfTotal = pSql->numOfSubs; - - pRes->code = TSDB_CODE_SUCCESS; - - int32_t i = 0; - for (; i < pSql->numOfSubs; ++i) { - SinsertSupporter* pSupporter = calloc(1, sizeof(SinsertSupporter)); - pSupporter->pSql = pSql; - pSupporter->pState = pState; - - SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, NULL); - if (pNew == NULL) { - tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); - break; - } - - pSql->pSubs[i] = pNew; - tscTrace("%p sub:%p create subObj success. 
orderOfSub:%d", pSql, pNew, i); - } - - if (i < pSql->numOfSubs) { - tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql); - pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; - return pRes->code; // free all allocated resource - } - - for (int32_t j = 0; j < pSql->numOfSubs; ++j) { - SSqlObj *pSub = pSql->pSubs[j]; - pSub->cmd.command = TSDB_SQL_INSERT; - int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]); - - if (code != TSDB_CODE_SUCCESS) { - tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, j, - pDataBlocks->nSize, code); - } - - tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j); - tscProcessSql(pSub); - } - - return TSDB_CODE_SUCCESS; -} -- GitLab