diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index c59ec3e624ec84a76ae805d969a6b319667fb1af..ebd5de1ab3a7faa85badd81165168347aa65f7b5 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -318,6 +318,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlC int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid); int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId); +int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries); void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index e3bec2c2eaa50bb8e753ad499889b94dc23c2f40..edc3dbfc82aa6c6c7dcbb9fa6548c9f49864e324 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2459,11 +2459,48 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) { tfree(p); } +static void doConcurrentlySendSubQueries(SSqlObj* pSql) { + SSubqueryState *pState = &pSql->subState; + + // concurrently sent the query requests. + const int32_t MAX_REQUEST_PER_TASK = 8; + + int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK; + assert(numOfTasks >= 1); + + int32_t num; + if (pState->numOfSub / numOfTasks == MAX_REQUEST_PER_TASK) { + num = MAX_REQUEST_PER_TASK; + } else { + num = pState->numOfSub / numOfTasks + 1; + } + tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks); + + for(int32_t j = 0; j < numOfTasks; ++j) { + SSchedMsg schedMsg = {0}; + schedMsg.fp = doSendQueryReqs; + schedMsg.ahandle = (void*)pSql; + + schedMsg.thandle = NULL; + SPair* p = calloc(1, sizeof(SPair)); + p->first = j * num; + + if (j == numOfTasks - 1) { + p->second = pState->numOfSub; + } else { + p->second = (j + 1) * num; + } + + schedMsg.msg = p; + taosScheduleTask(tscQhandle, &schedMsg); + } +} + int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - // pRes->code check only serves in launching metric sub-queries + // pRes->code check only serves in launching super table sub-queries if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE; // enable the abort of kill super table function. return pRes->code; @@ -2474,22 +2511,23 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { pRes->qId = 0x1; // hack the qhandle check - const uint32_t nBufferSize = (1u << 18u); // 256KB + const uint32_t nBufferSize = (1u << 18u); // 256KB, default buffer size SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + SSubqueryState *pState = &pSql->subState; - pState->numOfSub = 0; - if (pTableMetaInfo->pVgroupTables == NULL) { - pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; - } else { - pState->numOfSub = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); + int32_t numOfSub = (pTableMetaInfo->pVgroupTables == NULL) ? pTableMetaInfo->vgroupList->numOfVgroups + : (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); + + int32_t ret = doInitSubState(pSql, numOfSub); + if (ret != 0) { + tscAsyncResultOnError(pSql); + return ret; } - assert(pState->numOfSub > 0); - - int32_t ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, nBufferSize, pSql->self); + ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, nBufferSize, pSql->self); if (ret != 0) { pRes->code = ret; tscAsyncResultOnError(pSql); @@ -2499,32 +2537,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { } tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub); - pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES); - if (pSql->pSubs == NULL) { - tfree(pSql->pSubs); - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc,pState->numOfSub); - - tscAsyncResultOnError(pSql); - return ret; - } - - if (pState->states == NULL) { - pState->states = calloc(pState->numOfSub, sizeof(*pState->states)); - if (pState->states == NULL) { - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc,pState->numOfSub); - - tscAsyncResultOnError(pSql); - return ret; - } - - pthread_mutex_init(&pState->mutex, NULL); - } - - memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub); - tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self); - pRes->code = TSDB_CODE_SUCCESS; int32_t i = 0; @@ -2545,8 +2557,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { break; } - trs->subqueryIndex = i; - trs->pParentSql = pSql; + trs->subqueryIndex = i; + trs->pParentSql = pSql; SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL); if (pNew == NULL) { @@ -2582,39 +2594,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return pRes->code; } - // concurrently sent the query requests. - const int32_t MAX_REQUEST_PER_TASK = 8; - - int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK; - assert(numOfTasks >= 1); - - int32_t num; - if (pState->numOfSub / numOfTasks == MAX_REQUEST_PER_TASK) { - num = MAX_REQUEST_PER_TASK; - } else { - num = pState->numOfSub / numOfTasks + 1; - } - tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks); - - for(int32_t j = 0; j < numOfTasks; ++j) { - SSchedMsg schedMsg = {0}; - schedMsg.fp = doSendQueryReqs; - schedMsg.ahandle = (void*)pSql; - - schedMsg.thandle = NULL; - SPair* p = calloc(1, sizeof(SPair)); - p->first = j * num; - - if (j == numOfTasks - 1) { - p->second = pState->numOfSub; - } else { - p->second = (j + 1) * num; - } - - schedMsg.msg = p; - taosScheduleTask(tscQhandle, &schedMsg); - } - + doConcurrentlySendSubQueries(pSql); return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 31631560af12a01e27d5a71acaecb724cc822b5d..fe3e330aa97bb217a596df6fe428c115f29103b5 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3944,6 +3944,21 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param); } +int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { + assert(pSql->subState.numOfSub == 0 && pSql->pSubs == NULL && pSql->subState.states == NULL); + pSql->subState.numOfSub = numOfSubqueries; + + pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); + pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t)); + + int32_t code = pthread_mutex_init(&pSql->subState.mutex, NULL); + if (pSql->pSubs == NULL || pSql->subState.states == NULL || code != 0) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} + // do execute the query according to the query execution plan void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -3959,16 +3974,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { } if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly - assert(pSql->subState.numOfSub == 0); - pSql->subState.numOfSub = (int32_t) taosArrayGetSize(pQueryInfo->pUpstream); - assert(pSql->pSubs == NULL); - pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); - assert(pSql->subState.states == NULL); - pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t)); - code = pthread_mutex_init(&pSql->subState.mutex, NULL); - - if (pSql->pSubs == NULL || pSql->subState.states == NULL || code != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_TSC_OUT_OF_MEMORY; + code = doInitSubState(pSql, (int32_t) taosArrayGetSize(pQueryInfo->pUpstream)); + if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4315,7 +4322,9 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { } tfree(pSql->pSubs); + tfree(pSql->subState.states); pSql->subState.numOfSub = 0; + pthread_mutex_destroy(&pSql->subState.mutex); pSql->fp = fp; diff --git a/src/util/src/tarray.c b/src/util/src/tarray.c index d0d126c1e4d7f2e7c0913585df6031b556291fc3..2d6c513cb57ce1d524a1fb69df68702e624ede7b 100644 --- a/src/util/src/tarray.c +++ b/src/util/src/tarray.c @@ -112,14 +112,15 @@ void taosArrayRemoveBatch(SArray *pArray, const int32_t* pData, int32_t numOfEle i += 1; } - assert(i == pData[numOfElems - 1] + 1); + assert(i == pData[numOfElems - 1] + 1 && i <= size); - int32_t dstIndex = pData[numOfElems - 1] - numOfElems + 1; int32_t srcIndex = pData[numOfElems - 1] + 1; - - char* dst = TARRAY_GET_ELEM(pArray, dstIndex); - char* src = TARRAY_GET_ELEM(pArray, srcIndex); - memmove(dst, src, pArray->elemSize * (pArray->size - numOfElems)); + int32_t dstIndex = pData[numOfElems - 1] - numOfElems + 1; + if (pArray->size - srcIndex > 0) { + char* dst = TARRAY_GET_ELEM(pArray, dstIndex); + char* src = TARRAY_GET_ELEM(pArray, srcIndex); + memmove(dst, src, pArray->elemSize * (pArray->size - srcIndex)); + } pArray->size -= numOfElems; }