diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index f3af685cebfc8daea679777362d55fc09cc3c3a1..45bcf4095ac2b8c4b74fbfed45f8e3bf8f891091 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2433,6 +2433,26 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { return terrno; } +typedef struct SPair { + int32_t first; + int32_t second; +} SPair; + +static void doSendQueryReqs(SSchedMsg* pSchedMsg) { + SSqlObj* pSql = pSchedMsg->ahandle; + SPair* p = pSchedMsg->msg; + + for(int32_t i = p->first; i < p->second; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + SRetrieveSupport* pSupport = pSub->param; + + tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex); + tscBuildAndSendRequest(pSub, NULL); + } + + tfree(p); +} + int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; @@ -2555,13 +2575,33 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { doCleanupSubqueries(pSql, i); return pRes->code; } - - for(int32_t j = 0; j < pState->numOfSub; ++j) { - SSqlObj* pSub = pSql->pSubs[j]; - SRetrieveSupport* pSupport = pSub->param; - - tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex); - tscBuildAndSendRequest(pSub, NULL); + + // concurrently sent the query requests. + const int32_t MAX_REQUEST_PER_TASK = 8; + + int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK; + assert(numOfTasks >= 1); + + int32_t num = (pState->numOfSub/numOfTasks) + 1; + tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks); + + for(int32_t j = 0; j < numOfTasks; ++j) { + SSchedMsg schedMsg = {0}; + schedMsg.fp = doSendQueryReqs; + schedMsg.ahandle = (void*)pSql; + + schedMsg.thandle = NULL; + SPair* p = calloc(1, sizeof(SPair)); + p->first = j * num; + + if (j == numOfTasks - 1) { + p->second = pState->numOfSub; + } else { + p->second = (j + 1) * num; + } + + schedMsg.msg = p; + taosScheduleTask(tscQhandle, &schedMsg); } return TSDB_CODE_SUCCESS; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 303612fc8e663e0a304ee1556500456be7720c10..0e2aba1d247657f6d4199817614fc08f282d7333 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4265,6 +4265,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } qDebug("QInfo:0x%"PRIx64" set %d subscribe info", pQInfo->qId, total); + // Check if query is completed or not for stable query or normal table query respectively. if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && pRuntimeEnv->proot->status == OP_EXEC_DONE) { setQueryStatus(pRuntimeEnv, QUERY_OVER); @@ -7079,6 +7080,10 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_QID(pRuntimeEnv), count); } + if (pOperator->status == OP_EXEC_DONE) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + } + pRes->info.rows = count; return (pRes->info.rows == 0)? NULL:pInfo->pRes; }