diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 422d07ccde76b7358513d06aa942191d71478410..75307087b19a163af4a51a7d58bdfbd80bf7c2ca 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -15,8 +15,9 @@ #define _GNU_SOURCE #include "os.h" - #include "texpr.h" + +#include "tsched.h" #include "qTsbuf.h" #include "tcompare.h" #include "tscLog.h" @@ -2423,6 +2424,26 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { return terrno; } +typedef struct SPair { + int32_t first; + int32_t second; +} SPair; + +static void doSendQueryReqs(SSchedMsg* pSchedMsg) { + SSqlObj* pSql = pSchedMsg->ahandle; + SPair* p = pSchedMsg->msg; + + for(int32_t i = p->first; i < p->second; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + SRetrieveSupport* pSupport = pSub->param; + + tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex); + tscProcessSql(pSub); + } + + tfree(p); +} + int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; @@ -2546,13 +2567,33 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { doCleanupSubqueries(pSql, i); return pRes->code; } - - for(int32_t j = 0; j < pState->numOfSub; ++j) { - SSqlObj* pSub = pSql->pSubs[j]; - SRetrieveSupport* pSupport = pSub->param; - - tscDebug("0x%"PRIx64" sub:%p launch subquery, orderOfSub:%d.", pSql->self, pSub, pSupport->subqueryIndex); - tscProcessSql(pSub); + + // concurrently sent the query requests. + const int32_t MAX_REQUEST_PER_TASK = 8; + + int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK; + assert(numOfTasks >= 1); + + int32_t num = (pState->numOfSub/numOfTasks) + 1; + tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks); + + for(int32_t j = 0; j < numOfTasks; ++j) { + SSchedMsg schedMsg = {0}; + schedMsg.fp = doSendQueryReqs; + schedMsg.ahandle = (void*)pSql; + + schedMsg.thandle = NULL; + SPair* p = calloc(1, sizeof(SPair)); + p->first = j * num; + + if (j == numOfTasks - 1) { + p->second = pState->numOfSub; + } else { + p->second = (j + 1) * num; + } + + schedMsg.msg = p; + taosScheduleTask(tscQhandle, &schedMsg); } return TSDB_CODE_SUCCESS;