diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 0ca079bc6e0b6a460c140e154e36be886b2eecad..1d4ed51f32a68cb39d511f83c5ea1f47a3aeb4eb 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -636,10 +636,11 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity); -// for (int32_t i = 0; i < pTableMetaInfo->pMetricMeta->numOfVnodes; ++i) { -// (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel); -// (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL; -// } + size_t numOfSubs = taosArrayGetSize(pTableMetaInfo->vgroupIdList); + for (int32_t i = 0; i < numOfSubs; ++i) { + (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel); + (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL; + } if (createOrderDescriptor(pOrderDesc, pCmd, pModel) != TSDB_CODE_SUCCESS) { pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 498a33838966bd4de795f0ab9474cd71bc83988c..111ebc4e8c690cef9c52ea214dacde0cf5d58dfd 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -509,31 +509,31 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - SSubmitMsg *pShellMsg; - char * pMsg, *pStart; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; - pStart = pSql->cmd.payload + tsRpcHeadSize; - pMsg = pStart; + char* pMsg = pSql->cmd.payload + tsRpcHeadSize; + + // NOTE: shell message size should not include SMsgDesc + int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc); SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg; - pMsgDesc->numOfVnodes = htonl(1); //set the number of vnodes + + pMsgDesc->numOfVnodes = htonl(1); //todo set the right number of vnodes pMsg += 
sizeof(SMsgDesc); - pShellMsg = (SSubmitMsg *)pMsg; + SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg; + pShellMsg->header.vgId = htonl(pTableMeta->vgId); - pShellMsg->header.contLen = htonl(pSql->cmd.payloadLen); + pShellMsg->header.contLen = htonl(size); pShellMsg->length = pShellMsg->header.contLen; pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted - // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here + // pSql->cmd.payloadLen is set while copying data into the payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; + tscTrace("%p build submit msg, vgId:%d numOfVnodes:%d", pSql, pTableMeta->vgId, ntohl(pMsgDesc->numOfVnodes)); // stored with htonl, so decode with ntohl -// tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pTableMeta->index].ip), -// htons(pShellMsg->vnode)); return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index ccf187ac649578f026eb952e87ad45715279edd3..b0c7b68ab4cfc263f6728d4ff8e95c224ec988cf 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1006,14 +1006,12 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { pRes->qhandle = 1; // hack the qhandle check - const uint32_t nBufferSize = (1 << 16); // 64KB + const uint32_t nBufferSize = (1u << 16); // 64KB SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - int32_t numOfSubQueries = 1; -// int32_t numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes; - + int32_t numOfSubQueries = taosArrayGetSize(pTableMetaInfo->vgroupIdList); assert(numOfSubQueries > 0); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); @@ -1119,7 +1117,7 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { tfree(trsupport); } -static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows); +static void
tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows); static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) { // set no disk space error info @@ -1141,7 +1139,7 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES pthread_mutex_unlock(&trsupport->queryMutex); - tscRetrieveFromVnodeCallBack(trsupport, tres, trsupport->pState->code); + tscRetrieveFromDnodeCallBack(trsupport, tres, trsupport->pState->code); } static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { @@ -1236,7 +1234,86 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq } } -static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { +static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) { + int32_t idx = trsupport->subqueryIndex; + SSqlObj * pPObj = trsupport->pParentSqlObj; + tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; + + SSubqueryState* pState = trsupport->pState; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + + // data in from current vnode is stored in cache and disk + uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems; +// tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip, +// pSvd->vnode, numOfRowsFromSubquery, idx); + + tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); + +#ifdef _DEBUG_VIEW + printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems); + SSrcColumnInfo colInfo[256] = {0}; + tscGetSrcColumnInfo(colInfo, pQueryInfo); + tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems, + trsupport->localBuffer->numOfElems, colInfo); +#endif + + if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < 
tsMinimalTmpDirGB) { + tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, + tsAvailTmpDirGB, tsMinimalTmpDirGB); + tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_CLI_NO_DISKSPACE); + return; + } + + // each result for a vnode is ordered as an independent list, + // then used as an input of loser tree for disk-based merge routine + int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, + pQueryInfo->groupbyExpr.orderType); + if (ret != 0) { // set no disk space error info, and abort retry + return tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_CLI_NO_DISKSPACE); + } + + // keep this value in a local variable, since pState may be released by other threads once the atomic_add operation + // increases the finished value up to pState->numOfTotal value, which means all subqueries are completed. + // In this case, the comparison between finished value and released pState->numOfTotal is not safe.
+ int32_t numOfTotal = pState->numOfTotal; + + int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1); + if (finished < numOfTotal) { + tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished); + return tscFreeSubSqlObj(trsupport, pSql); + } + + // all sub-queries are returned, start to local merge process + pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; + + tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pPObj, + pState->numOfTotal, pState->numOfRetrievedRows); + + SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); + tscClearInterpInfo(pPQueryInfo); + + tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, + &pPObj->cmd, &pPObj->res); + tscTrace("%p build loser tree completed", pPObj); + + pPObj->res.precision = pSql->res.precision; + pPObj->res.numOfRows = 0; + pPObj->res.row = 0; + + // only free once + tfree(trsupport->pState); + tscFreeSubSqlObj(trsupport, pSql); + + // set the command flag must be after the semaphore been correctly set.
+ pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; + if (pPObj->res.code == TSDB_CODE_SUCCESS) { + (*pPObj->fp)(pPObj->param, pPObj, 0); + } else { + tscQueueAsyncRes(pPObj); + } +} + +static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { SRetrieveSupport *trsupport = (SRetrieveSupport *)param; int32_t idx = trsupport->subqueryIndex; SSqlObj * pPObj = trsupport->pParentSqlObj; @@ -1265,15 +1342,15 @@ static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfR STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); // SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); - SVnodeSidList *vnodeInfo = 0; - SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; +// SVnodeSidList *vnodeInfo = 0; +// SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; if (numOfRows > 0) { assert(pRes->numOfRows == numOfRows); int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows); - tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql, - pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx); +// tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql, +// pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx); if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64, @@ -1299,85 +1376,16 @@ static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfR int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data, pRes->numOfRows, pQueryInfo->groupbyExpr.orderType); - if (ret < 0) { - // set no disk space error info, and abort retry + if (ret < 0) { // set no disk space error info, and abort retry tscAbortFurtherRetryRetrieval(trsupport, tres, 
TSDB_CODE_CLI_NO_DISKSPACE); - } else { + } else if (pRes->completed) { + tscAllDataRetrievedFromDnode(trsupport, pSql); + } else { // continue fetch data from dnode pthread_mutex_unlock(&trsupport->queryMutex); - taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param); - } - - } else { // all data has been retrieved to client - /* data in from current vnode is stored in cache and disk */ - uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems; - tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip, - pSvd->vnode, numOfRowsFromVnode, idx); - - tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); - -#ifdef _DEBUG_VIEW - printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems); - SSrcColumnInfo colInfo[256] = {0}; - tscGetSrcColumnInfo(colInfo, pQueryInfo); - tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems, - trsupport->localBuffer->numOfElems, colInfo); -#endif - - if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { - tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, - tsAvailTmpDirGB, tsMinimalTmpDirGB); - tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); - return; - } - - // each result for a vnode is ordered as an independant list, - // then used as an input of loser tree for disk-based merge routine - int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, - pQueryInfo->groupbyExpr.orderType); - if (ret != 0) { - /* set no disk space error info, and abort retry */ - return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); - } - - // keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion - // increases 
the finished value up to pState->numOfTotal value, which means all subqueries are completed. - // In this case, the comparsion between finished value and released pState->numOfTotal is not safe. - int32_t numOfTotal = pState->numOfTotal; - - int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1); - if (finished < numOfTotal) { - tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished); - return tscFreeSubSqlObj(trsupport, pSql); - } - - // all sub-queries are returned, start to local merge process - pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; - - tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj, - pState->numOfTotal, pState->numOfRetrievedRows); - - SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); - tscClearInterpInfo(pPQueryInfo); - - tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, - &pPObj->cmd, &pPObj->res); - tscTrace("%p build loser tree completed", pPObj); - - pPObj->res.precision = pSql->res.precision; - pPObj->res.numOfRows = 0; - pPObj->res.row = 0; - - // only free once - tfree(trsupport->pState); - tscFreeSubSqlObj(trsupport, pSql); - - // set the command flag must be after the semaphore been correctly set. 
- pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; - if (pPObj->res.code == TSDB_CODE_SUCCESS) { - (*pPObj->fp)(pPObj->param, pPObj, 0); - } else { - tscQueueAsyncRes(pPObj); + taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); } + } else { // all data has been retrieved to client + tscAllDataRetrievedFromDnode(trsupport, pSql); } } @@ -1437,7 +1445,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { /* * if a query on a vnode is failed, all retrieve operations from vnode that occurs later - * than this one are actually not necessary, we simply call the tscRetrieveFromVnodeCallBack + * than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack * function to abort current and remain retrieve process. * * NOTE: threadsafe is required. @@ -1475,7 +1483,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { trsupport->subqueryIndex, pState->code); } - tscRetrieveFromVnodeCallBack(param, tres, pState->code); + tscRetrieveFromDnodeCallBack(param, tres, pState->code); } else { // success, proceed to retrieve data from dnode if (vnodeInfo != NULL) { tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, @@ -1486,7 +1494,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { trsupport->subqueryIndex); } - taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param); + taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); } } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 18f491aa9edaaa60efbc6cbca6bfc54e532f8b9c..ec6881db3f5ae4f1e32dbf790bfb662ae3df84c5 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -608,7 +608,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { * the payloadLen should be actual message body size * the old value of payloadLen is the allocated payload size */ - pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize - 
sizeof(SMsgDesc); + pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize; assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100 && pCmd->payloadLen > 0); return TSDB_CODE_SUCCESS; diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 224512f0ae8f409bd6dae69dcfa07d8b0eebc28a..83cbae42664407d1245adfc9740fa31be5af1de0 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -2204,7 +2204,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) { size_t s = taosArrayGetSize(pQInfo->pTableIdList); num = MAX(s, INITIAL_RESULT_ROWS_VALUE); } else { // for super table query, one page for each subset - // num = pQInfo->pSidSet->numOfSubSet; + num = 1;//pQInfo->pSidSet->numOfSubSet; } assert(num > 0); @@ -3873,7 +3873,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) { totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo); } else { - // totalSubset = pQInfo->pSidSet->numOfSubSet; + totalSubset = 1;//pQInfo->pSidSet->numOfSubSet; } return totalSubset; @@ -3909,19 +3909,18 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde int32_t numOfRowsToCopy = result[i].numOfRows - pQInfo->offset; int32_t oldOffset = pQInfo->offset; - assert(0); /* * current output space is not enough to keep all the result data of this group, only copy partial results * to SQuery object's result buffer */ - // if (numOfRowsToCopy > pQuery->pointsToRead - numOfResult) { - // numOfRowsToCopy = pQuery->pointsToRead - numOfResult; - // pQInfo->offset += numOfRowsToCopy; - // } else { - // pQInfo->offset = 0; - // pQInfo->subgroupIdx += 1; - // } + if (numOfRowsToCopy > pQuery->rec.capacity - numOfResult) { + numOfRowsToCopy = pQuery->rec.capacity - numOfResult; + pQInfo->offset += numOfRowsToCopy; + } else { + pQInfo->offset = 0; + pQInfo->subgroupIdx += 1; + } for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { 
int32_t size = pRuntimeEnv->pCtx[j].outputBytes; @@ -3932,13 +3931,12 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde } numOfResult += numOfRowsToCopy; - assert(0); - // if (numOfResult == pQuery->rec.pointsToRead) { - // break; - // } + if (numOfResult == pQuery->rec.capacity) { + break; + } } - dTrace("QInfo:%p copy data to SQuery buf completed", GET_QINFO_ADDR(pQuery)); + dTrace("QInfo:%p copy data to SQuery buf completed", pQInfo); #ifdef _DEBUG_VIEW displayInterResult(pQuery->sdata, pQuery, numOfResult); @@ -4135,10 +4133,11 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage } void vnodePrintQueryStatistics(SQInfo *pQInfo) { +#if 0 SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; -#if 0 + SQueryCostSummary *pSummary = &pRuntimeEnv->summary; if (pRuntimeEnv->pResultBuf == NULL) { pSummary->tmpBufferInDisk = 0; @@ -4179,41 +4178,30 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) { #endif } -int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) { +int32_t doInitializeQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) { + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; int32_t code = TSDB_CODE_SUCCESS; - // only the successful complete requries the sem_post/over = 1 operations. 
- if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || - (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { - dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, - pQuery->window.ekey, pQuery->order.order); - - sem_post(&pQInfo->dataReady); - setQueryStatus(pQuery, QUERY_COMPLETED); - return TSDB_CODE_SUCCESS; - } - setScanLimitationByResultBuffer(pQuery); changeExecuteScanOrder(pQuery, false); // dataInCache requires lastKey value pQuery->lastKey = pQuery->window.skey; - STsdbQueryCond cond = {0}; - - cond.twindow = (STimeWindow){.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; - cond.order = pQuery->order.order; - cond.colList = *pQuery->colList; + STsdbQueryCond cond = { + .twindow = pQuery->window, + .order = pQuery->order.order, + .colList = *pQuery->colList, + }; SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); for (int32_t i = 0; i < pQuery->numOfCols; ++i) { taosArrayPush(cols, &pQuery->colList[i]); } - - pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols); - - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + + pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols); pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pTSBuf = param; @@ -4224,15 +4212,34 @@ int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) { } // create runtime environment - code = setupQueryRuntimeEnv(&pQInfo->runtimeEnv, NULL, pQuery->order.order, false); + code = setupQueryRuntimeEnv(pRuntimeEnv, NULL, pQuery->order.order, isSTableQuery); if (code != TSDB_CODE_SUCCESS) { return code; } - pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false); - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { + pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, isSTableQuery); + + if (isSTableQuery) { + int32_t rows = 
getInitialPageNum(pQInfo); + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (pQuery->intervalTime == 0) { + int16_t type = TSDB_DATA_TYPE_NULL; + + if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; + type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); + } else { + type = TSDB_DATA_TYPE_INT; // group id + } + + initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); + } + + } else if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { int32_t rows = getInitialPageNum(pQInfo); - code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize); if (code != TSDB_CODE_SUCCESS) { return code; @@ -4349,6 +4356,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { for(int32_t i = 0; i < numOfTables; ++i) { if (pQInfo->pTableDataInfo[i].pTableQInfo->tid == blockInfo.sid) { pTableDataInfo = &pQInfo->pTableDataInfo[i]; + break; } } @@ -4361,7 +4369,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); TSKEY nextKey = blockInfo.window.ekey; - if (pQuery->intervalTime == 0) { + if (!isIntervalQuery(pQuery)) { setExecutionContext(pQInfo, pTableQueryInfo, pTableDataInfo->tableIndex, pTableDataInfo->groupIdx, nextKey); } else { // interval query setIntervalQueryRange(pTableQueryInfo, pQInfo, nextKey); @@ -4487,9 +4495,10 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start */ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - -#if 0 SQuery* pQuery = pRuntimeEnv->pQuery; + setQueryStatus(pQuery, QUERY_COMPLETED); + +#if 0 // tSidSet *pTableIdList = pSupporter->pSidSet; int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode; @@ -4876,9 +4885,10 @@ static void multiTableQueryProcess(SQInfo 
*pQInfo) { doRestoreContext(pQInfo); } else { dTrace("QInfo:%p no need to do reversed scan, query completed", pQInfo); - return; } + setQueryStatus(pQuery, QUERY_COMPLETED); + if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { dTrace("QInfo:%p query killed or error occurred, code:%d, abort", pQInfo, pQInfo->code); return; @@ -5245,14 +5255,16 @@ static int32_t validateQueryMsg(SQueryTableMsg *pQueryMsg) { static char* createTableIdList(SQueryTableMsg* pQueryMsg, char* pMsg, SArray** pTableIdList) { assert(pQueryMsg->numOfTables > 0); - *pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableIdInfo)); + *pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableId)); STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->sid = htonl(pTableIdInfo->sid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->key = htobe64(pTableIdInfo->key); - taosArrayPush(*pTableIdList, pTableIdInfo); + STableId id = {.uid = pTableIdInfo->uid, .tid = pTableIdInfo->sid}; + taosArrayPush(*pTableIdList, &id); + pMsg += sizeof(STableIdInfo); for (int32_t j = 1; j < pQueryMsg->numOfTables; ++j) { @@ -5820,7 +5832,18 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQuery->pos = -1; - dTrace("QInfo %p is allocated", pQInfo); + pQuery->window.skey = pQueryMsg->window.skey; + pQuery->window.ekey = pQueryMsg->window.ekey; + pQuery->lastKey = pQuery->window.skey; + + if (sem_init(&pQInfo->dataReady, 0, 0) != 0) { + dError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno)); + goto _clean_memory; + } + + vnodeParametersSafetyCheck(pQuery); + + dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo); return pQInfo; _clean_memory: @@ -5859,57 +5882,42 @@ static bool isValidQInfo(void *param) { } static void freeQInfo(SQInfo *pQInfo); -static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, - SArray *pTableIdList, void* 
tsdb, SQInfo **pQInfo) { +static int32_t initializeQInfo(SQueryTableMsg *pQueryMsg, void* tsdb, SQInfo *pQInfo, bool isSTable) { int32_t code = TSDB_CODE_SUCCESS; + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList); - if ((*pQInfo) == NULL) { - code = TSDB_CODE_SERV_OUT_OF_MEMORY; - goto _error; - } - - SQuery *pQuery = (*pQInfo)->runtimeEnv.pQuery; - dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo); - - pQuery->window.skey = pQueryMsg->window.skey; - pQuery->window.ekey = pQueryMsg->window.ekey; - pQuery->lastKey = pQuery->window.skey; - - if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) { - dError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno)); - code = TSDB_CODE_APP_ERROR; - goto _error; - } - - vnodeParametersSafetyCheck(pQuery); - STSBuf *pTSBuf = NULL; if (pQueryMsg->tsLen > 0) { // open new file to save the result char *tsBlock = (char *)pQueryMsg + pQueryMsg->tsOffset; pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder); - + tsBufResetPos(pTSBuf); tsBufNextPos(pTSBuf); } + + // only the successful complete requries the sem_post/over = 1 operations. 
+ if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || + (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { + dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, + pQuery->window.ekey, pQuery->order.order); + + sem_post(&pQInfo->dataReady); + setQueryStatus(pQuery, QUERY_COMPLETED); + return TSDB_CODE_SUCCESS; + } // filter the qualified - if ((code = initQInfo(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) { + if ((code = doInitializeQInfo(pQInfo, pTSBuf, tsdb, isSTable)) != TSDB_CODE_SUCCESS) { goto _error; } - // if (pQInfo->over == 1) { - // vnodeAddRefCount(pQInfo); // for retrieve procedure - // return pQInfo; - // } - // dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo, // pQInfo->refCount); return code; _error: // table query ref will be decrease during error handling - freeQInfo(*pQInfo); + freeQInfo(pQInfo); return code; } @@ -6068,7 +6076,10 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) // super table query SArray* res = NULL; + bool isSTableQuery = false; if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) { + isSTableQuery = true; + STableId* id = taosArrayGet(pTableIdList, 0); id->uid = -1; @@ -6081,8 +6092,14 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) res = pTableIdList; } - code = createQInfo(pQueryMsg, pGroupbyExpr, pExprs, res, tsdb, pQInfo); - + (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, res); + if ((*pQInfo) == NULL) { + code = TSDB_CODE_SERV_OUT_OF_MEMORY; + goto _query_over; // never hand a NULL QInfo to initializeQInfo, and keep the OOM code + } + + code = initializeQInfo(pQueryMsg, tsdb, *pQInfo, isSTableQuery); + _query_over: if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pTableIdList); diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 3119fdc7ccc9287fea28b4364cad75e7faa08784..7b996c8f7a170fdf278c1cb844ca85686f65fa6a
100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -73,7 +73,7 @@ typedef struct SQueryFilesInfo { char dbFilePathPrefix[PATH_MAX]; } SQueryFilesInfo; -typedef struct STableQueryInfo { +typedef struct STableCheckInfo { STableId tableId; TSKEY lastKey; STable * pTableObj; @@ -81,7 +81,8 @@ typedef struct STableQueryInfo { int32_t numOfBlocks; int32_t start; SCompBlock *pBlock; -} STableQueryInfo; + SSkipListIterator* iter; +} STableCheckInfo; typedef struct { SCompBlock *compBlock; @@ -90,14 +91,19 @@ typedef struct { typedef struct STableDataBlockInfoEx { SCompBlockFields pBlock; - STableQueryInfo* pMeterDataInfo; + STableCheckInfo* pMeterDataInfo; int32_t blockIndex; int32_t groupIdx; /* number of group is less than the total number of meters */ } STableDataBlockInfoEx; +enum { + SINGLE_TABLE_MODEL = 1, + MULTI_TABLE_MODEL = 2, +}; + typedef struct STsdbQueryHandle { struct STsdbRepo* pTsdb; - + int8_t model; // access model, single table model or multi-table model SQueryHandlePos cur; // current position SQueryHandlePos start; // the start position, used for secondary/third iteration int32_t unzipBufSize; @@ -120,14 +126,13 @@ typedef struct STsdbQueryHandle { bool locateStart; int32_t realNumOfRows; bool loadDataAfterSeek; // load data after seek. 
- SArray* pTableQueryInfo; + SArray* pTableCheckInfo; int32_t activeIndex; int32_t tableIndex; bool isFirstSlot; void * qinfo; // query info handle, for debug purpose - SSkipListIterator* memIter; STableDataBlockInfoEx *pDataBlockInfoEx; } STsdbQueryHandle; @@ -271,19 +276,21 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond size_t size = taosArrayGetSize(idList); assert(size >= 1); - pQueryHandle->pTableQueryInfo = taosArrayInit(size, sizeof(STableQueryInfo)); + pQueryHandle->pTableCheckInfo = taosArrayInit(size, sizeof(STableCheckInfo)); for(int32_t i = 0; i < size; ++i) { STableId id = *(STableId*) taosArrayGet(idList, i); - STableQueryInfo info = { + STableCheckInfo info = { .lastKey = pQueryHandle->window.skey, .tableId = id, .pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid), //todo this may be failed }; - taosArrayPush(pQueryHandle->pTableQueryInfo, &info); + taosArrayPush(pQueryHandle->pTableCheckInfo, &info); } + pQueryHandle->model = (size > 1)? 
MULTI_TABLE_MODEL:SINGLE_TABLE_MODEL; + pQueryHandle->activeIndex = 0; // malloc buffer in order to load data from file @@ -314,11 +321,13 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond return (tsdb_query_handle_t)pQueryHandle; } -bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { - STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; - STableQueryInfo* pTableQInfo = taosArrayGet(pHandle->pTableQueryInfo, pHandle->activeIndex); +static bool hasMoreDataInCacheForSingleModel(STsdbQueryHandle* pHandle) { + assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1); - STable *pTable = pTableQInfo->pTableObj; + STableCheckInfo* pTableCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); + + STable *pTable = pTableCheckInfo->pTableObj; + assert(pTable != NULL); // no data in cache, abort if (pTable->mem == NULL && pTable->imem == NULL) { @@ -326,13 +335,49 @@ bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { } // all data in mem are checked already. - if (pTableQInfo->lastKey > pTable->mem->keyLast) { + if (pTable->mem == NULL || pTableCheckInfo->lastKey > pTable->mem->keyLast) { // mem may be NULL while imem is not return false; } return true; } +static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) { + size_t numOfTables = taosArrayGetSize(pHandle->pTableCheckInfo); + assert(numOfTables > 0); + + while(pHandle->activeIndex < numOfTables) { + STableCheckInfo* pTableCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); + + STable *pTable = pTableCheckInfo->pTableObj; + if (pTable->mem == NULL && pTable->imem == NULL) { + pHandle->activeIndex += 1; // try next table if exists + continue; + } + + // all data in mem are checked already.
+ if (pTable->mem == NULL || pTableCheckInfo->lastKey > pTable->mem->keyLast) { // mem may be NULL while imem is not + pHandle->activeIndex += 1; // try next table if exists + continue; + } + + return true; + } + + // all tables have been checked already + return false; +} + +// handle data in cache situation +bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { + STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; + if (pHandle->model == SINGLE_TABLE_MODEL) { + return hasMoreDataInCacheForSingleModel(pHandle); + } else { + return hasMoreDataInCacheForMultiModel(pHandle); + } +} + static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey, STsdbQueryHandle* pHandle) { int numOfRows = 0; @@ -370,7 +415,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; - STableQueryInfo* pTableQInfo = taosArrayGet(pHandle->pTableQueryInfo, pHandle->activeIndex); + STableCheckInfo* pTableQInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STable *pTable = pTableQInfo->pTableObj; TSKEY skey = 0, ekey = 0; @@ -379,11 +424,11 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { if (pTable->mem != NULL) { // create mem table iterator if it is not created yet - if (pHandle->memIter == NULL) { - pHandle->memIter = tSkipListCreateIter(pTable->mem->pData); + if (pTableQInfo->iter == NULL) { + pTableQInfo->iter = tSkipListCreateIter(pTable->mem->pData); } - rows = tsdbReadRowsFromCache(pHandle->memIter, INT64_MAX, 2, &skey, &ekey, pHandle); + rows = tsdbReadRowsFromCache(pTableQInfo->iter, INT64_MAX, 2, &skey, &ekey, pHandle); } SDataBlockInfo blockInfo = {