diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 3419128c72647c5a88dd62ab1ff51f80f407e26c..0ae093728ff1274e6f103755efc6d26cf87849e5 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -226,47 +226,59 @@ static void dnodeProcessReadResult(SReadMsg *pRead) { rpcFreeCont(pRead->rpcMsg.pCont); // free the received message } +static void dnodeContinueExecuteQuery(void* qhandle, SReadMsg *pMsg) { + SReadMsg readMsg = { + .rpcMsg = {.msgType = TSDB_MSG_TYPE_QUERY}, + .pCont = qhandle, + .contLen = 0, + .pRpcContext = pMsg->pRpcContext, + .pVnode = pMsg->pVnode, + }; + + taos_queue queue = dnodeGetVnodeRworker(pMsg->pVnode); + taosWriteQitem(queue, &readMsg); +} + static void dnodeProcessQueryMsg(SReadMsg *pMsg) { SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont; SQInfo* pQInfo = NULL; - void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode); - int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); + if (pMsg->rpcMsg.contLen != 0) { + void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, pMsg, &pQInfo); - SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); - pRsp->code = code; - pRsp->qhandle = htobe64((uint64_t) (pQInfo)); - - SRpcMsg rpcRsp = { - .handle = pMsg->rpcMsg.handle, - .pCont = pRsp, - .contLen = sizeof(SQueryTableRsp), - .code = code, - .msgType = 0 - }; + SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); + pRsp->code = code; + pRsp->qhandle = htobe64((uint64_t) (pQInfo)); - rpcSendResponse(&rpcRsp); + SRpcMsg rpcRsp = { + .handle = pMsg->rpcMsg.handle, + .pCont = pRsp, + .contLen = sizeof(SQueryTableRsp), + .code = code, + .msgType = 0 + }; + + rpcSendResponse(&rpcRsp); + } else { + pQInfo = pMsg->pCont; + } // do execute query qTableQuery(pQInfo); } -static int32_t c = 0; static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { SRetrieveTableMsg *pRetrieve = pMsg->pCont; void *pQInfo = (void*) htobe64(pRetrieve->qhandle); dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); - if ((++c)%2 == 0) { - int32_t k = 1; - } - int32_t rowSize = 0; - int32_t numOfRows = 0; + int32_t contLen = 0; SRetrieveTableRsp *pRsp = NULL; - int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize); + int32_t code = qRetrieveQueryResultInfo(pQInfo); if (code != TSDB_CODE_SUCCESS) { contLen = sizeof(SRetrieveTableRsp); @@ -275,6 +287,10 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { } else { // todo check code and handle error in build result set code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); + + if (qNeedFurtherExec(pQInfo)) { + dnodeContinueExecuteQuery(pQInfo, pMsg); + } } SRpcMsg rpcRsp = (SRpcMsg) { diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 4ce606f599b8e04f8db50ab30999ebc1d837c274..c30d47e261dca42539f1279ca9b98f755c645a20 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -68,8 +68,10 @@ typedef struct SWindowResult { } SWindowResult; typedef struct SResultRec { - int64_t pointsTotal; - int64_t pointsRead; + int64_t total; + int64_t size; + int64_t capacity; + int32_t threshold; // the threshold size, when the number of rows in result buffer, return to client } SResultRec; typedef struct SWindowResInfo { @@ -112,7 +114,7 @@ typedef struct STableQueryInfo { typedef struct STableDataInfo { int32_t numOfBlocks; - int32_t start; // start block index + int32_t start; // start block index int32_t tableIndex; void* pMeterObj; int32_t groupIdx; // group id in table list @@ -143,7 +145,6 @@ typedef struct SQuery { int32_t pos; int64_t pointsOffset; // the number of points offset to save read data SData** sdata; - int32_t capacity; SSingleColumnFilterInfo* pFilterInfo; } SQuery; @@ -171,15 +172,13 @@ typedef struct SQueryRuntimeEnv { typedef struct SQInfo { void* signature; - void* pVnode; + void* param; // pointer to the RpcReadMsg TSKEY startTime; TSKEY elapsedTime; - SResultRec rec; int32_t pointsInterpo; - int32_t code; // error code to returned to client -// int32_t killed; // denotes if current query is killed + int32_t code; // error code to returned to client sem_t dataReady; - SArray* pTableIdList; // table list + SArray* pTableIdList; // table id list SQueryRuntimeEnv runtimeEnv; int32_t subgroupIdx; int32_t offset; /* offset in group result set of subgroup */ @@ -204,7 +203,7 @@ typedef struct SQInfo { * @param pQInfo * @return */ -int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); +int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, void* param, SQInfo** pQInfo); /** * query on single table @@ -222,7 +221,7 @@ void qSuperTableQuery(void* pReadMsg); * wait for the query completed, and retrieve final results to client * @param pQInfo */ -int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize); +int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo); /** * @@ -232,4 +231,11 @@ int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* ro */ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen); +/** + * + * @param pQInfo + * @return + */ +bool qNeedFurtherExec(SQInfo* pQInfo); + #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index a996c8467f648f8a4724ca6a62797ebcd181f555..a4da2073ea13c2144ac02ba3694829344df44e64 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -364,8 +364,8 @@ bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functio bool doRevisedResultsByLimit(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - if ((pQuery->limit.limit > 0) && (pQuery->rec.pointsRead + pQInfo->rec.pointsRead > pQuery->limit.limit)) { - pQuery->rec.pointsRead = pQuery->limit.limit - pQInfo->rec.pointsRead; + if ((pQuery->limit.limit > 0) && (pQuery->rec.size + pQuery->rec.size > pQuery->limit.limit)) { + pQuery->rec.size = pQuery->limit.limit - pQuery->rec.size; // query completed setQueryStatus(pQuery, QUERY_COMPLETED); @@ -1344,17 +1344,16 @@ static int32_t reviseForwardSteps(SQueryRuntimeEnv *pRuntimeEnv, int32_t forward static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, __block_search_fn_t searchFn, int32_t *numOfRes, SWindowResInfo *pWindowResInfo, SArray *pDataBlock) { - SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + SQuery *pQuery = pRuntimeEnv->pQuery; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { *numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); } else { *numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); } - + TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey; - pQuery->lastKey = lastKey + step; + pQuery->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); @@ -1368,12 +1367,8 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl assert(*numOfRes >= 0); // check if buffer is large enough for accommodating all qualified points - if (*numOfRes > 0 && pQuery->checkBufferInLoop == 1) { - pQuery->pointsOffset -= *numOfRes; - if (pQuery->pointsOffset <= 0) { // todo return correct numOfRes for ts_comp function - pQuery->pointsOffset = 0; - setQueryStatus(pQuery, QUERY_RESBUF_FULL); - } + if (*numOfRes > 0 && pQuery->checkBufferInLoop == 1 && ((*numOfRes) >= pQuery->rec.threshold)) { + setQueryStatus(pQuery, QUERY_RESBUF_FULL); } return 0; @@ -2302,7 +2297,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { pQuery->status = 0; - pQInfo->rec = (SResultRec){0}; + pQuery->rec = (SResultRec){0}; pQuery->rec = (SResultRec){0}; changeExecuteScanOrder(pQuery, true); @@ -2668,9 +2663,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } int32_t numOfRes = 0; - SDataStatis *pStatis = NULL; - SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); + + SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes, &pRuntimeEnv->windowResInfo, pDataBlock); @@ -3035,9 +3030,9 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { offset += pData->numOfElems; } - assert(pQuery->rec.pointsRead == 0); + assert(pQuery->rec.size == 0); - pQuery->rec.pointsRead += rows; + pQuery->rec.size += rows; pQInfo->offset += 1; } @@ -3367,7 +3362,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } - memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->capacity); + memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->rec.capacity); } initCtxOutputBuf(pRuntimeEnv); @@ -3414,14 +3409,14 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - if (pQuery->rec.pointsRead == 0 || pQuery->limit.offset == 0) { + if (pQuery->rec.size == 0 || pQuery->limit.offset == 0) { return; } - if (pQuery->rec.pointsRead <= pQuery->limit.offset) { - pQuery->limit.offset -= pQuery->rec.pointsRead; + if (pQuery->rec.size <= pQuery->limit.offset) { + pQuery->limit.offset -= pQuery->rec.size; - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; // pQuery->pointsOffset = pQuery->rec.pointsToRead; // clear all data in result buffer resetCtxOutputBuf(pRuntimeEnv); @@ -3430,13 +3425,13 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->status &= (~QUERY_RESBUF_FULL); } else { int32_t numOfSkip = (int32_t)pQuery->limit.offset; - pQuery->rec.pointsRead -= numOfSkip; + pQuery->rec.size -= numOfSkip; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; assert(0); - // memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes); + // memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->size * bytes); pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip; if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { @@ -3999,8 +3994,9 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) { int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSQL_SO_ASC; int32_t numOfResult = doCopyToSData(pQInfo, result, orderType); - pQuery->rec.pointsRead += numOfResult; - // assert(pQuery->rec.pointsRead <= pQuery->pointsToRead); + pQuery->rec.size += numOfResult; + + assert(pQuery->rec.size <= pQuery->rec.capacity); } static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *pTableDataInfo) { @@ -4038,31 +4034,6 @@ void stableApplyFunctionsOnBlock_(SQInfo *pQInfo, STableDataInfo *pTableDataInfo updatelastkey(pQuery, pTableQueryInfo); } -// we need to split the refstatsult into different packages. -int32_t vnodeGetResultSize(void *thandle, int32_t *numOfRows) { - SQInfo *pQInfo = (SQInfo *)thandle; - SQuery *pQuery = &pQInfo->runtimeEnv.pQuery; - - /* - * get the file size and set the numOfRows to be the file size, since for tsComp query, - * the returned row size is equalled to 1 - * - * TODO handle the case that the file is too large to send back one time - */ - if (isTSCompQuery(pQuery) && (*numOfRows) > 0) { - struct stat fstat; - if (stat(pQuery->sdata[0]->data, &fstat) == 0) { - *numOfRows = fstat.st_size; - return fstat.st_size; - } else { - dError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno)); - return 0; - } - } else { - return pQuery->rowSize * (*numOfRows); - } -} - bool vnodeHasRemainResults(void *handle) { SQInfo *pQInfo = (SQInfo *)handle; @@ -4074,7 +4045,7 @@ bool vnodeHasRemainResults(void *handle) { SQuery * pQuery = pRuntimeEnv->pQuery; SInterpolationInfo *pInterpoInfo = &pRuntimeEnv->interpoInfo; - if (pQuery->limit.limit > 0 && pQInfo->rec.pointsRead >= pQuery->limit.limit) { + if (pQuery->limit.limit > 0 && pQuery->rec.size >= pQuery->limit.limit) { return false; } @@ -4147,6 +4118,11 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data memmove(data, pQuery->sdata[col]->data, bytes * numOfRows); data += bytes * numOfRows; } + + // all data returned, set query over + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + setQueryStatus(pQuery, QUERY_OVER); + } } int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage **pDataSrc, int32_t numOfRows, @@ -4255,8 +4231,6 @@ int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) { setScanLimitationByResultBuffer(pQuery); changeExecuteScanOrder(pQuery, false); - pQInfo->rec = (SResultRec){0}; - // dataInCache requires lastKey value pQuery->lastKey = pQuery->window.skey; @@ -4535,7 +4509,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start // accumulate the point interpolation result if (numOfRes > 0) { - pQuery->rec.pointsRead += numOfRes; + pQuery->rec.size += numOfRes; forwardCtxOutputBuf(pRuntimeEnv, numOfRes); } @@ -4623,7 +4597,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pSupporter->subgroupIdx++; // output buffer is full, return to client - if (pQuery->pointsRead >= pQuery->pointsToRead) { + if (pQuery->size >= pQuery->pointsToRead) { break; } } @@ -4639,9 +4613,9 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { */ if (pSupporter->subgroupIdx > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); - pQInfo->pointsRead += pQuery->pointsRead; + pQInfo->size += pQuery->size; - if (pQuery->pointsRead > 0) { + if (pQuery->size > 0) { return; } } @@ -4707,7 +4681,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { vnodeScanAllData(pRuntimeEnv); - pQuery->pointsRead = getNumOfResult(pRuntimeEnv); + pQuery->size = getNumOfResult(pRuntimeEnv); doSkipResults(pRuntimeEnv); // the limitation of output result is reached, set the query completed @@ -4742,7 +4716,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pQuery->skey = pQuery->lastKey; // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter - if (pQuery->pointsRead == 0) { + if (pQuery->size == 0) { assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); continue; } else { @@ -4789,17 +4763,17 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { } pQInfo->pTableQuerySupporter->subgroupIdx = 0; - pQuery->pointsRead = 0; + pQuery->size = 0; copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult); } - pQInfo->pointsRead += pQuery->pointsRead; + pQInfo->size += pQuery->size; pQuery->pointsOffset = pQuery->pointsToRead; dTrace( "QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d," "next skey:%" PRId64 ", offset:%" PRId64, - pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead, + pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->size, pQInfo->size, pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset); #endif } @@ -4911,13 +4885,13 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); } - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; + pQuery->rec.size += pQuery->rec.size; - if (pQuery->rec.pointsRead == 0) { + if (pQuery->rec.size == 0) { // vnodePrintQueryStatistics(pSupporter); } - dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.pointsRead, pQInfo->rec.pointsTotal); + dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.size, pQuery->rec.total); return; } #if 0 @@ -4970,8 +4944,8 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { } // handle the limitation of output buffer - pQInfo->pointsRead += pQuery->pointsRead; - dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead, + pQInfo->size += pQuery->size; + dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->size, pQInfo->size, pQInfo->pointsReturned); #endif } @@ -4994,8 +4968,8 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { } // since the numOfOutputElems must be identical for all sql functions that are allowed to be executed simutanelously. - pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv); - // assert(pQuery->pointsRead <= pQuery->pointsToRead && + pQuery->rec.size = getNumOfResult(pRuntimeEnv); + // assert(pQuery->size <= pQuery->pointsToRead && // Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED)); // must be top/bottom query if offset > 0 @@ -5006,7 +4980,7 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { doSkipResults(pRuntimeEnv); doRevisedResultsByLimit(pQInfo); - pQInfo->rec.pointsRead = pQuery->rec.pointsRead; + pQuery->rec.size = pQuery->rec.size; } static void tableMultiOutputProcessor(SQInfo *pQInfo) { @@ -5026,16 +5000,16 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) { return; } - pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv); - if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.pointsRead > 0) { + pQuery->rec.size = getNumOfResult(pRuntimeEnv); + if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.size > 0) { doSkipResults(pRuntimeEnv); } /* - * 1. if pQuery->pointsRead == 0, pQuery->limit.offset >= 0, still need to check data - * 2. if pQuery->pointsRead > 0, pQuery->limit.offset must be 0 + * 1. if pQuery->size == 0, pQuery->limit.offset >= 0, still need to check data + * 2. if pQuery->size > 0, pQuery->limit.offset must be 0 */ - if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + if (pQuery->rec.size > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { break; } @@ -5046,23 +5020,21 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) { } doRevisedResultsByLimit(pQInfo); - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; - if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { -// dTrace("QInfo:%p vid:%d sid:%d id:%s, query abort due to buffer limitation, next qrange:%" PRId64 "-%" PRId64, -// pQInfo, pQuery->lastKey, pQuery->ekey); + dTrace("QInfo:%p query paused due to buffer limitation, next qrange:%" PRId64 "-%" PRId64, + pQInfo, pQuery->lastKey, pQuery->window.ekey); } // dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode, -// pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); +// pMeterObj->sid, pMeterObj->meterId, pQuery->size, pQInfo->size, pQInfo->pointsReturned); // pQuery->pointsOffset = pQuery->pointsToRead; //restore the available buffer // if (!isTSCompQuery(pQuery)) { -// assert(pQuery->pointsRead <= pQuery->pointsToRead); +// assert(pQuery->size <= pQuery->pointsToRead); // } } -static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { +static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; while (1) { @@ -5088,13 +5060,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->limit.offset -= c; } - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - break; - } - - // load the data block for the next retrieve - // loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); - if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED|QUERY_RESBUF_FULL)) { break; } } @@ -5108,12 +5074,11 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { int32_t numOfInterpo = 0; while (1) { - resetCtxOutputBuf(pRuntimeEnv); - vnodeSingleMeterIntervalMainLooper(pRuntimeEnv); + tableIntervalProcessImpl(pRuntimeEnv); if (pQuery->intervalTime > 0) { pQInfo->subgroupIdx = 0; // always start from 0 - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); @@ -5124,43 +5089,43 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { doRevisedResultsByLimit(pQInfo); break; } else { - taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.pointsRead, pQuery->interpoType); + taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.size, pQuery->interpoType); SData **pInterpoBuf = pRuntimeEnv->pInterpoBuf; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.pointsRead * pQuery->pSelectExpr[i].resBytes); + memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.size * pQuery->pSelectExpr[i].resBytes); } numOfInterpo = 0; - pQuery->rec.pointsRead = vnodeQueryResultInterpolate( - pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.pointsRead, &numOfInterpo); + pQuery->rec.size = vnodeQueryResultInterpolate( + pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.size, &numOfInterpo); - dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.pointsRead); - if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.size); + if (pQuery->rec.size > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { doRevisedResultsByLimit(pQInfo); break; } // no result generated yet, continue retrieve data - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; } } // all data scanned, the group by normal column can return if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result pQInfo->subgroupIdx = 0; - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); } - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; + pQuery->rec.size += pQuery->rec.size; pQInfo->pointsInterpo += numOfInterpo; // dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d // totalReturn:%d", - // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo, - // pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned); + // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->size, numOfInterpo, + // pQInfo->size - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned); } void qTableQuery(SQInfo *pQInfo) { @@ -5187,16 +5152,16 @@ void qTableQuery(SQInfo *pQInfo) { int32_t numOfInterpo = 0; int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); - pQuery->rec.pointsRead = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, + pQuery->rec.size = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo); doRevisedResultsByLimit(pQInfo); pQInfo->pointsInterpo += numOfInterpo; - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; + pQuery->rec.size += pQuery->rec.size; // dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d", - // pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, + // pQInfo, pQuery->size, numOfInterpo, pQInfo->size, pQInfo->pointsInterpo, // pQInfo->pointsReturned); sem_post(&pQInfo->dataReady); return; @@ -5206,22 +5171,22 @@ void qTableQuery(SQInfo *pQInfo) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { // continue to get push data from the group result if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || - (pQuery->intervalTime > 0 && pQInfo->rec.pointsTotal < pQuery->limit.limit)) { + (pQuery->intervalTime > 0 && pQuery->rec.total < pQuery->limit.limit)) { // todo limit the output for interval query? - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; pQInfo->subgroupIdx = 0; // always start from 0 if (pRuntimeEnv->windowResInfo.size > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; + pQuery->rec.size += pQuery->rec.size; clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); - if (pQuery->rec.pointsRead > 0) { + if (pQuery->rec.size > 0) { // dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d // totalReturn:%d", - // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, - // pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); + // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->size, + // pQInfo->size, pQInfo->pointsInterpo, pQInfo->pointsReturned); sem_post(&pQInfo->dataReady); return; @@ -5231,7 +5196,7 @@ void qTableQuery(SQInfo *pQInfo) { // dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, // pMeterObj->sid, - // pMeterObj->meterId, pQInfo->pointsRead); + // pMeterObj->meterId, pQInfo->size); // vnodePrintQueryStatistics(pSupporter); sem_post(&pQInfo->dataReady); @@ -5239,7 +5204,7 @@ void qTableQuery(SQInfo *pQInfo) { } // number of points returned during this query - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; int64_t st = taosGetTimestampUs(); @@ -5265,7 +5230,7 @@ void qTableQuery(SQInfo *pQInfo) { if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query is killed", pQInfo); } else { - dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.pointsRead); + dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.size); } sem_post(&pQInfo->dataReady); @@ -5288,7 +5253,7 @@ void qSuperTableQuery(void *pReadMsg) { // assert(pQInfo->refCount >= 1); #if 0 SQuery *pQuery = &pQInfo->runtimeEnv.pQuery; - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; int64_t st = taosGetTimestampUs(); if (pQuery->intervalTime > 0 || @@ -5306,13 +5271,13 @@ void qSuperTableQuery(void *pReadMsg) { pQInfo->elapsedTime += (taosGetTimestampUs() - st); pQuery->status = isQueryKilled(pQInfo) ? 1 : 0; -// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->pointsRead, +// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, // pQInfo->query.interpoType); - if (pQuery->rec.pointsRead == 0) { + if (pQuery->rec.size == 0) { // pQInfo->over = 1; // dTrace("QInfo:%p over, %d meters queried, %d points are returned", pQInfo, pSupporter->numOfMeters, -// pQInfo->pointsRead); +// pQInfo->size); // vnodePrintQueryStatistics(pSupporter); } @@ -5916,12 +5881,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou } // set the output buffer capacity - pQuery->capacity = 4096; + pQuery->rec.capacity = 4096; + pQuery->rec.threshold = 2; + for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { assert(pExprs[col].interResBytes >= pExprs[col].resBytes); // allocate additional memory for interResults that are usually larger then final results - size_t size = (pQuery->capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData); + size_t size = (pQuery->rec.capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData); pQuery->sdata[col] = (SData *)calloc(1, size); if (pQuery->sdata[col] == NULL) { goto _clean_memory; @@ -5943,9 +5910,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->pTableIdList = pTableIdList; pQuery->pos = -1; - // dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, - // pQInfo); - + + dTrace("QInfo %p is allocated", pQInfo); return pQInfo; _clean_memory: @@ -6098,7 +6064,7 @@ _error: return code; } -int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) { +int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, void* param, SQInfo **pQInfo) { assert(pQueryTableMsg != NULL); int32_t code = TSDB_CODE_SUCCESS; @@ -6136,6 +6102,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQ // pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code); } else { code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo); + (*pQInfo)->param = param; } _query_over: @@ -6161,7 +6128,7 @@ _query_over: return TSDB_CODE_SUCCESS; } -int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *rowsize) { +int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) { if (pQInfo == NULL || !isQInfoValid(pQInfo)) { return TSDB_CODE_INVALID_QHANDLE; } @@ -6177,11 +6144,8 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro } sem_wait(&pQInfo->dataReady); - - *numOfRows = pQInfo->rec.pointsRead; - *rowsize = pQuery->rowSize; - - dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code); + dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size, + pQInfo->code); return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code); } @@ -6208,7 +6172,7 @@ static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { } } -static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) { +static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { // the remained number of retrieved rows, not the interpolated result SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -6231,28 +6195,31 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) { pQuery->sdata[0]->data, strerror(errno)); } } else { - doCopyQueryResultToMsg(pQInfo, pQInfo->rec.pointsRead, data); + doCopyQueryResultToMsg(pQInfo, pQuery->rec.size, data); } - pQInfo->rec.pointsTotal += pQInfo->rec.pointsRead; - dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQInfo->rec.pointsRead, pQInfo->rec.pointsTotal); + pQuery->rec.total += pQuery->rec.size; + dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total); - setQueryStatus(pQuery, QUERY_COMPLETED); return TSDB_CODE_SUCCESS; // todo if interpolation exists, the result may be dump to client by several rounds } -static void addToTaskQueue(SQInfo* pQInfo) { - // no error occurred, continue retrieving data - if (pQInfo->code == TSDB_CODE_SUCCESS) { -#ifdef _TD_ARM_ - dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:doDumpQueryResult", pQInfo, pQInfo->signature); -#else - dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:%s", pQInfo, pQInfo->signature, __FUNCTION__); -#endif - - // todo add to task queue +bool qNeedFurtherExec(SQInfo* pQInfo) { + if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) { + return false; + } + + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; + if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { + return false; + } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { + return true; + } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + return true; + } else { + assert(0); } } @@ -6262,13 +6229,12 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c } SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - size_t size = getResultSize(pQInfo, &pQInfo->rec.pointsRead); + size_t size = getResultSize(pQInfo, &pQuery->rec.size); *contLen = size + sizeof(SRetrieveTableRsp); // todo handle failed to allocate memory *pRsp = (SRetrieveTableRsp *)rpcMallocCont(*contLen); - - (*pRsp)->numOfRows = htonl(pQInfo->rec.pointsRead); + (*pRsp)->numOfRows = htonl(pQuery->rec.size); int32_t code = pQInfo->code; if (code == TSDB_CODE_SUCCESS) { @@ -6279,16 +6245,13 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c (*pRsp)->useconds = 0; } - if (pQInfo->rec.pointsRead > 0 && code == TSDB_CODE_SUCCESS) { - code = doDumpQueryResult(pQInfo, (*pRsp)->data, NULL); - - // has more data to return or need next round to execute - addToTaskQueue(pQInfo); + if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) { + code = doDumpQueryResult(pQInfo, (*pRsp)->data); } else { code = pQInfo->code; } - if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { (*pRsp)->completed = 1; // notify no more result to client vnodeFreeQInfo(pQInfo); } diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 36472857fe4a7b95b855e81dce5d255b8ab0bace..2dc61f51072fac850d697cfad57a6446c7a3eace 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -124,6 +124,7 @@ typedef struct STsdbQueryHandle { int32_t tableIndex; bool isFirstSlot; void * qinfo; // query info handle, for debug purpose + SSkipListIterator* memIter; } STsdbQueryHandle; int32_t doAllocateBuf(STsdbQueryHandle *pQueryHandle, int32_t rowsPerFileBlock) { @@ -367,8 +368,13 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { int32_t rows = 0; if (pTable->mem != NULL) { - SSkipListIterator* iter = tSkipListCreateIter(pTable->mem->pData); - rows = tsdbReadRowsFromCache(iter, INT64_MAX, 4000, &skey, &ekey, pHandle); + + // create mem table iterator if it is not created yet + if (pHandle->memIter == NULL) { + pHandle->memIter = tSkipListCreateIter(pTable->mem->pData); + } + + rows = tsdbReadRowsFromCache(pHandle->memIter, INT64_MAX, 2, &skey, &ekey, pHandle); } SDataBlockInfo blockInfo = {