diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index ccd734ec7b784d290c334628e043bcd757fa3fc2..5651e5aa389c20bdf1c5a72554de1eabc6364f23 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1238,22 +1238,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { goto _clean; } - // submit to more than one vnode if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgId if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { goto _error_clean; } - - STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0]; - if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) { - goto _error_clean; - } - - pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - - // set the next sent data vnode index in data block arraylist - pTableMetaInfo->vnodeIndex = 1; } else { pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index e0850a7139f72a7569560c8131d5c8c2ff69fa14..6427578e82fd73909e04ab8cd9329efafd014e15 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1489,11 +1489,9 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema, SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, resColIdx, functionID, pColIndex, type, bytes, bytes); strncpy(pExpr->aliasName, columnName, tListLen(pExpr->aliasName)); - // for point interpolation/last_row query, we need the timestamp column to be loaded + // for all querie, the timestamp column meeds to be loaded SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; - if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) { - tscColumnBaseInfoInsert(pQueryInfo, &index); - } + tscColumnBaseInfoInsert(pQueryInfo, &index); SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex); insertResultField(pQueryInfo, resColIdx, &ids, bytes, type, columnName, pExpr); @@ -1581,7 +1579,10 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i])); } } - + + SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscColumnBaseInfoInsert(pQueryInfo, &tsCol); + return TSDB_CODE_SUCCESS; } case TK_SUM: @@ -1689,7 +1690,10 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i])); } } - + + SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscColumnBaseInfoInsert(pQueryInfo, &tsCol); + return TSDB_CODE_SUCCESS; } case TK_FIRST: @@ -1708,7 +1712,6 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt } /* in first/last function, multiple columns can be add to resultset */ - for (int32_t i = 0; i < pItem->pNode->pParam->nExpr; ++i) { tSQLExprItem* pParamElem = &(pItem->pNode->pParam->a[i]); if (pParamElem->pNode->nSQLOptr != TK_ALL && pParamElem->pNode->nSQLOptr != TK_ID) { @@ -1753,7 +1756,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt } } } - + return TSDB_CODE_SUCCESS; } else { // select * from xxx int32_t numOfFields = 0; @@ -1773,6 +1776,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt numOfFields += tscGetNumOfColumns(pTableMetaInfo->pTableMeta); } + return TSDB_CODE_SUCCESS; } } @@ -1891,6 +1895,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt default: return TSDB_CODE_INVALID_SQL; } + + } // todo refactor diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 7856856606e1ce550307a856b27bf69945ea2a42..decf9875d28e3483ab4b0f064c5d523a9f1e92a0 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -341,14 +341,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { (*pSql->fp)(pSql->param, taosres, rpcMsg->code); if (shouldFree) { - // If it is failed, all objects allocated during execution taos_connect_a should be released - if (command == TSDB_SQL_CONNECT) { - taos_close(pObj); - tscTrace("%p Async sql close failed connection", pSql); - } else { - tscFreeSqlObj(pSql); - tscTrace("%p Async sql is automatically freed", pSql); - } + tscFreeSqlObj(pSql); + tscTrace("%p Async sql is automatically freed", pSql); } } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 4885cf7cc3cc064f05672be40d3c8303b85de67d..d62dac088ba73d3bd96720b25a778c1ec273a97c 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -594,11 +594,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { } if (numOfTableHasRes >= 2) { // do merge result - success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL); - // TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; - // TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; - // printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2); } else { // only one subquery SSqlObj *pSub = pSql->pSubs[0]; if (pSub == NULL) { @@ -674,14 +670,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlRes *pRes = &pSql->res; if (pRes->qhandle == 0 || - pRes->completed || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { return NULL; } // current data are exhausted, fetch more data - if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && + if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true && (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) { taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 46e3ac2e60b9e28fa21251c0fbe4a2db3fdd0351..0b464c362b03b4f890e36827b46dd72c9a2ea491 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -504,7 +504,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p } tsem_init(&pSql->rspSem, 0, 0); - tsem_init(&pSql->emptyRspSem, 0, 1); SSqlInfo SQLInfo = {0}; tSQLParse(&SQLInfo, pSql->sqlstr); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 2a9673b192796a87481fc7f04f57ceb51eab3e41..e33fdc70a49869ac3ca6c3a2b1f0f451588d43c1 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -423,9 +423,6 @@ void tscFreeResData(SSqlObj* pSql) { } void tscFreeSqlResult(SSqlObj* pSql) { - //TODO not free - return; - tfree(pSql->res.pRsp); pSql->res.row = 0; pSql->res.numOfRows = 0; @@ -469,8 +466,6 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) { tscFreeSqlCmdData(pCmd); tscTrace("%p free sqlObj partial completed", pSql); - - tscFreeSqlCmdData(pCmd); } void tscFreeSqlObj(SSqlObj* pSql) { @@ -489,10 +484,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { pCmd->allocSize = 0; - if (pSql->fp == NULL) { - tsem_destroy(&pSql->rspSem); - tsem_destroy(&pSql->emptyRspSem); - } + tsem_destroy(&pSql->rspSem); free(pSql); } @@ -721,7 +713,7 @@ static void trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); - SSubmitBlk* pBlock = pTableDataBlock->pData; + SSubmitBlk* pBlock = (SSubmitBlk*) pTableDataBlock->pData; int32_t rows = htons(pBlock->numOfRows); for(int32_t i = 0; i < rows; ++i) { @@ -1751,16 +1743,8 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { } int32_t command = pSql->cmd.command; - if (pTscObj->pSql == pSql) { - /* - * in case of taos_connect_a query, the object should all be released, even it is the - * master sql object. Otherwise, the master sql should not be released - */ - if (command == TSDB_SQL_CONNECT && pSql->res.code != TSDB_CODE_SUCCESS) { - return true; - } - - return false; + if (command == TSDB_SQL_CONNECT) { + return true; } if (command == TSDB_SQL_INSERT) { diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index d6ecf2fa66b0cf4cbaaa4cbcd061d6ff13d6c4a9..a23336630e5ef6314db72c4173029e2853566095 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -176,10 +176,10 @@ static void *dnodeProcessReadQueue(void *param) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; } - dnodeProcessReadResult(pVnode, pReadMsg); +// dnodeProcessReadResult(pVnode, pReadMsg); taosFreeQitem(pReadMsg); - dnodeReleaseVnode(pVnode); + dnodeReleaseVnode(pVnode); } return NULL; @@ -220,7 +220,7 @@ static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) { code = terrno; } - //TODO: query handle is returned by dnodeProcessQueryMsg + //TODO: query handle is returned by dnodeProcessQueryMsg if (0) { SRpcMsg rsp; rsp.handle = pRead->rpcMsg.handle; @@ -232,47 +232,67 @@ static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) { rpcFreeCont(pRead->rpcMsg.pCont); // free the received message } +static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { + + SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); + pRead->rpcMsg = pMsg->rpcMsg; + pRead->pCont = qhandle; + pRead->contLen = 0; + pRead->pRpcContext = pMsg->pRpcContext; + pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; + + taos_queue queue = dnodeGetVnodeRworker(pVnode); + taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); + +// SReadMsg readMsg = { +// .rpcMsg = {0}, +// .pCont = qhandle, +// .contLen = 0, +// .pRpcContext = pMsg->pRpcContext, +// }; +// +// taos_queue queue = dnodeGetVnodeRworker(pVnode); +// taosWriteQitem(queue, TSDB_MSG_TYPE_QUERY, &readMsg); +} + static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) { SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont; SQInfo* pQInfo = NULL; - void* tsdb = dnodeGetVnodeTsdb(pVnode); - int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); + if (pMsg->contLen != 0) { + void* tsdb = dnodeGetVnodeTsdb(pVnode); + int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, NULL, &pQInfo); - SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); - pRsp->code = code; - pRsp->qhandle = htobe64((uint64_t) (pQInfo)); - - SRpcMsg rpcRsp = { - .handle = pMsg->rpcMsg.handle, - .pCont = pRsp, - .contLen = sizeof(SQueryTableRsp), - .code = code, - .msgType = 0 - }; + SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); + pRsp->code = code; + pRsp->qhandle = htobe64((uint64_t) (pQInfo)); - rpcSendResponse(&rpcRsp); + SRpcMsg rpcRsp = { + .handle = pMsg->rpcMsg.handle, + .pCont = pRsp, + .contLen = sizeof(SQueryTableRsp), + .code = code, + .msgType = 0 + }; - // do execute query - qTableQuery(pQInfo); + rpcSendResponse(&rpcRsp); + } else { + pQInfo = pMsg->pCont; + } + + qTableQuery(pQInfo); // do execute query } -static int32_t c = 0; static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { SRetrieveTableMsg *pRetrieve = pMsg->pCont; void *pQInfo = (void*) htobe64(pRetrieve->qhandle); dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); - if ((++c)%2 == 0) { - int32_t k = 1; - } - int32_t rowSize = 0; - int32_t numOfRows = 0; int32_t contLen = 0; SRetrieveTableRsp *pRsp = NULL; - int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize); + int32_t code = qRetrieveQueryResultInfo(pQInfo); if (code != TSDB_CODE_SUCCESS) { contLen = sizeof(SRetrieveTableRsp); @@ -281,6 +301,12 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { } else { // todo check code and handle error in build result set code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); + + if (qHasMoreResultsToRetrieve(pQInfo)) { + dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg); + } else { // no further execution invoked, release the ref to vnode + dnodeProcessReadResult(pVnode, pMsg); + } } SRpcMsg rpcRsp = (SRpcMsg) { @@ -292,7 +318,4 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { }; rpcSendResponse(&rpcRsp); - - //todo merge result should be done here - //dnodeProcessReadResult(&readMsg); } diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 4ce606f599b8e04f8db50ab30999ebc1d837c274..16f2006d6f54d1a4a9ad94c04b838a7e8a42ca63 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -68,8 +68,10 @@ typedef struct SWindowResult { } SWindowResult; typedef struct SResultRec { - int64_t pointsTotal; - int64_t pointsRead; + int64_t total; + int64_t size; + int64_t capacity; + int32_t threshold; // the threshold size, when the number of rows in result buffer, return to client } SResultRec; typedef struct SWindowResInfo { @@ -112,7 +114,7 @@ typedef struct STableQueryInfo { typedef struct STableDataInfo { int32_t numOfBlocks; - int32_t start; // start block index + int32_t start; // start block index int32_t tableIndex; void* pMeterObj; int32_t groupIdx; // group id in table list @@ -143,7 +145,6 @@ typedef struct SQuery { int32_t pos; int64_t pointsOffset; // the number of points offset to save read data SData** sdata; - int32_t capacity; SSingleColumnFilterInfo* pFilterInfo; } SQuery; @@ -171,15 +172,13 @@ typedef struct SQueryRuntimeEnv { typedef struct SQInfo { void* signature; - void* pVnode; +// void* param; // pointer to the RpcReadMsg TSKEY startTime; TSKEY elapsedTime; - SResultRec rec; int32_t pointsInterpo; - int32_t code; // error code to returned to client -// int32_t killed; // denotes if current query is killed + int32_t code; // error code to returned to client sem_t dataReady; - SArray* pTableIdList; // table list + SArray* pTableIdList; // table id list SQueryRuntimeEnv runtimeEnv; int32_t subgroupIdx; int32_t offset; /* offset in group result set of subgroup */ @@ -204,7 +203,7 @@ typedef struct SQInfo { * @param pQInfo * @return */ -int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); +int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, void* param, SQInfo** pQInfo); /** * query on single table @@ -222,7 +221,7 @@ void qSuperTableQuery(void* pReadMsg); * wait for the query completed, and retrieve final results to client * @param pQInfo */ -int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize); +int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo); /** * @@ -232,4 +231,11 @@ int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* ro */ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen); +/** + * + * @param pQInfo + * @return + */ +bool qHasMoreResultsToRetrieve(SQInfo* pQInfo); + #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index cbce5097ec77f538f16d2d24e4f6817a4dd06746..e9923c6fd8101fe810f40c14f3a87133856c6c9a 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -53,9 +53,9 @@ /* get the qinfo struct address from the query struct address */ #define GET_COLUMN_BYTES(query, colidx) \ - ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.bytes) + ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdx].info.bytes) #define GET_COLUMN_TYPE(query, colidx) \ - ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.type) + ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdx].info.type) typedef struct SPointInterpoSupporter { int32_t numOfCols; @@ -364,8 +364,8 @@ bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functio bool doRevisedResultsByLimit(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - if ((pQuery->limit.limit > 0) && (pQuery->rec.pointsRead + pQInfo->rec.pointsRead > pQuery->limit.limit)) { - pQuery->rec.pointsRead = pQuery->limit.limit - pQInfo->rec.pointsRead; + if ((pQuery->limit.limit > 0) && (pQuery->rec.size + pQuery->rec.size > pQuery->limit.limit)) { + pQuery->rec.size = pQuery->limit.limit - pQuery->rec.size; // query completed setQueryStatus(pQuery, QUERY_COMPLETED); @@ -1344,17 +1344,16 @@ static int32_t reviseForwardSteps(SQueryRuntimeEnv *pRuntimeEnv, int32_t forward static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, __block_search_fn_t searchFn, int32_t *numOfRes, SWindowResInfo *pWindowResInfo, SArray *pDataBlock) { - SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + SQuery *pQuery = pRuntimeEnv->pQuery; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { *numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); } else { *numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); } - + TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey; - pQuery->lastKey = lastKey + step; + pQuery->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); @@ -1368,12 +1367,8 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl assert(*numOfRes >= 0); // check if buffer is large enough for accommodating all qualified points - if (*numOfRes > 0 && pQuery->checkBufferInLoop == 1) { - pQuery->pointsOffset -= *numOfRes; - if (pQuery->pointsOffset <= 0) { // todo return correct numOfRes for ts_comp function - pQuery->pointsOffset = 0; - setQueryStatus(pQuery, QUERY_RESBUF_FULL); - } + if (*numOfRes > 0 && pQuery->checkBufferInLoop == 1 && ((*numOfRes) >= pQuery->rec.threshold)) { + setQueryStatus(pQuery, QUERY_RESBUF_FULL); } return 0; @@ -1498,16 +1493,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel SColIndexEx * pColIndexEx = &pSqlFuncMsg->colInfo; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; - - if (TSDB_COL_IS_TAG(pSqlFuncMsg->colInfo.flag)) { // process tag column info - SSchema *pSchema = getColumnModelSchema(pTagsSchema, pColIndexEx->colIdx); - - pCtx->inputType = pSchema->type; - pCtx->inputBytes = pSchema->bytes; - } else { - pCtx->inputType = GET_COLUMN_TYPE(pQuery, i); - pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i); - } + pCtx->inputType = GET_COLUMN_TYPE(pQuery, i); + pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i); pCtx->ptsOutputBuf = NULL; @@ -1607,7 +1594,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); } -static bool isQueryKilled(SQuery *pQuery) { +static bool isQueryKilled(SQInfo *pQInfo) { + return (pQInfo->code == TSDB_CODE_QUERY_CANCELLED); #if 0 /* * check if the queried meter is going to be deleted. @@ -1621,8 +1609,6 @@ static bool isQueryKilled(SQuery *pQuery) { return (pQInfo->killed == 1); #endif - - return 0; } static bool setQueryKilled(SQInfo* pQInfo) { @@ -1891,8 +1877,6 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { pQuery->checkBufferInLoop = hasMultioutput ? 1 : 0; } - - // pQuery->pointsOffset = pQuery->pointsToRead; } /* @@ -2313,7 +2297,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { pQuery->status = 0; - pQInfo->rec = (SResultRec){0}; + pQuery->rec = (SResultRec){0}; pQuery->rec = (SResultRec){0}; changeExecuteScanOrder(pQuery, true); @@ -2552,7 +2536,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl // return DISK_DATA_LOAD_FAILED; } - if (pStatis == NULL) { + if (*pStatis == NULL) { pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); } } else { @@ -2651,7 +2635,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { while (tsdbNextDataBlock(pQueryHandle)) { // check if query is killed or not set the status of query to pass the status check - if (isQueryKilled(pQuery)) { + if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { return cnt; } @@ -2679,9 +2663,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } int32_t numOfRes = 0; - SDataStatis *pStatis = NULL; - SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); + + SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes, &pRuntimeEnv->windowResInfo, pDataBlock); @@ -3046,9 +3030,9 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { offset += pData->numOfElems; } - assert(pQuery->rec.pointsRead == 0); + assert(pQuery->rec.size == 0); - pQuery->rec.pointsRead += rows; + pQuery->rec.size += rows; pQInfo->offset += 1; } @@ -3378,7 +3362,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } - memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->capacity); + memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->rec.capacity); } initCtxOutputBuf(pRuntimeEnv); @@ -3425,14 +3409,14 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - if (pQuery->rec.pointsRead == 0 || pQuery->limit.offset == 0) { + if (pQuery->rec.size == 0 || pQuery->limit.offset == 0) { return; } - if (pQuery->rec.pointsRead <= pQuery->limit.offset) { - pQuery->limit.offset -= pQuery->rec.pointsRead; + if (pQuery->rec.size <= pQuery->limit.offset) { + pQuery->limit.offset -= pQuery->rec.size; - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; // pQuery->pointsOffset = pQuery->rec.pointsToRead; // clear all data in result buffer resetCtxOutputBuf(pRuntimeEnv); @@ -3441,13 +3425,13 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->status &= (~QUERY_RESBUF_FULL); } else { int32_t numOfSkip = (int32_t)pQuery->limit.offset; - pQuery->rec.pointsRead -= numOfSkip; + pQuery->rec.size -= numOfSkip; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; assert(0); - // memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes); + // memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->size * bytes); pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip; if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { @@ -3617,7 +3601,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->scanFlag = REPEAT_SCAN; /* check if query is killed or not */ - if (isQueryKilled(pQuery)) { + if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { // setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -4010,8 +3994,9 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) { int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSQL_SO_ASC; int32_t numOfResult = doCopyToSData(pQInfo, result, orderType); - pQuery->rec.pointsRead += numOfResult; - // assert(pQuery->rec.pointsRead <= pQuery->pointsToRead); + pQuery->rec.size += numOfResult; + + assert(pQuery->rec.size <= pQuery->rec.capacity); } static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *pTableDataInfo) { @@ -4049,31 +4034,6 @@ void stableApplyFunctionsOnBlock_(SQInfo *pQInfo, STableDataInfo *pTableDataInfo updatelastkey(pQuery, pTableQueryInfo); } -// we need to split the refstatsult into different packages. -int32_t vnodeGetResultSize(void *thandle, int32_t *numOfRows) { - SQInfo *pQInfo = (SQInfo *)thandle; - SQuery *pQuery = &pQInfo->runtimeEnv.pQuery; - - /* - * get the file size and set the numOfRows to be the file size, since for tsComp query, - * the returned row size is equalled to 1 - * - * TODO handle the case that the file is too large to send back one time - */ - if (isTSCompQuery(pQuery) && (*numOfRows) > 0) { - struct stat fstat; - if (stat(pQuery->sdata[0]->data, &fstat) == 0) { - *numOfRows = fstat.st_size; - return fstat.st_size; - } else { - dError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno)); - return 0; - } - } else { - return pQuery->rowSize * (*numOfRows); - } -} - bool vnodeHasRemainResults(void *handle) { SQInfo *pQInfo = (SQInfo *)handle; @@ -4085,7 +4045,7 @@ bool vnodeHasRemainResults(void *handle) { SQuery * pQuery = pRuntimeEnv->pQuery; SInterpolationInfo *pInterpoInfo = &pRuntimeEnv->interpoInfo; - if (pQuery->limit.limit > 0 && pQInfo->rec.pointsRead >= pQuery->limit.limit) { + if (pQuery->limit.limit > 0 && pQuery->rec.size >= pQuery->limit.limit) { return false; } @@ -4158,6 +4118,11 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data memmove(data, pQuery->sdata[col]->data, bytes * numOfRows); data += bytes * numOfRows; } + + // all data returned, set query over + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + setQueryStatus(pQuery, QUERY_OVER); + } } int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage **pDataSrc, int32_t numOfRows, @@ -4266,8 +4231,6 @@ int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) { setScanLimitationByResultBuffer(pQuery); changeExecuteScanOrder(pQuery, false); - pQInfo->rec = (SResultRec){0}; - // dataInCache requires lastKey value pQuery->lastKey = pQuery->window.skey; @@ -4404,14 +4367,10 @@ static void queryOnDataBlocks(SQInfo *pQInfo, STableDataInfo *pMeterDataInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - // SMeterObj * pTempMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pMeterSidExtInfo[0]->sid); - // __block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeter->searchAlgorithm]; - // dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pVnodeFileInfo->numOfFiles); - tsdb_query_handle_t *pQueryHandle = pRuntimeEnv->pQueryHandle; while (tsdbNextDataBlock(pQueryHandle)) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { break; } @@ -4550,7 +4509,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start // accumulate the point interpolation result if (numOfRes > 0) { - pQuery->rec.pointsRead += numOfRes; + pQuery->rec.size += numOfRes; forwardCtxOutputBuf(pRuntimeEnv, numOfRes); } @@ -4593,7 +4552,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pSupporter->meterIdx = start; for (int32_t k = start; k <= end; ++k, pSupporter->meterIdx++) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -4620,7 +4579,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pSupporter->subgroupIdx); for (int32_t k = start; k <= end; ++k) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -4638,7 +4597,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pSupporter->subgroupIdx++; // output buffer is full, return to client - if (pQuery->pointsRead >= pQuery->pointsToRead) { + if (pQuery->size >= pQuery->pointsToRead) { break; } } @@ -4654,9 +4613,9 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { */ if (pSupporter->subgroupIdx > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); - pQInfo->pointsRead += pQuery->pointsRead; + pQInfo->size += pQuery->size; - if (pQuery->pointsRead > 0) { + if (pQuery->size > 0) { return; } } @@ -4671,7 +4630,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { while (pSupporter->meterIdx < pSupporter->numOfMeters) { int32_t k = pSupporter->meterIdx; - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -4722,7 +4681,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { vnodeScanAllData(pRuntimeEnv); - pQuery->pointsRead = getNumOfResult(pRuntimeEnv); + pQuery->size = getNumOfResult(pRuntimeEnv); doSkipResults(pRuntimeEnv); // the limitation of output result is reached, set the query completed @@ -4757,7 +4716,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pQuery->skey = pQuery->lastKey; // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter - if (pQuery->pointsRead == 0) { + if (pQuery->size == 0) { assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); continue; } else { @@ -4804,17 +4763,17 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { } pQInfo->pTableQuerySupporter->subgroupIdx = 0; - pQuery->pointsRead = 0; + pQuery->size = 0; copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult); } - pQInfo->pointsRead += pQuery->pointsRead; + pQInfo->size += pQuery->size; pQuery->pointsOffset = pQuery->pointsToRead; dTrace( "QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d," "next skey:%" PRId64 ", offset:%" PRId64, - pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead, + pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->size, pQInfo->size, pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset); #endif } @@ -4926,13 +4885,13 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); } - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; + pQuery->rec.size += pQuery->rec.size; - if (pQuery->rec.pointsRead == 0) { + if (pQuery->rec.size == 0) { // vnodePrintQueryStatistics(pSupporter); } - dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.pointsRead, pQInfo->rec.pointsTotal); + dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.size, pQuery->rec.total); return; } #if 0 @@ -4965,7 +4924,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { doMultiMeterSupplementaryScan(pQInfo); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query killed, abort", pQInfo); return; } @@ -4985,8 +4944,8 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { } // handle the limitation of output buffer - pQInfo->pointsRead += pQuery->pointsRead; - dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead, + pQInfo->size += pQuery->size; + dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->size, pQInfo->size, pQInfo->pointsReturned); #endif } @@ -5004,13 +4963,13 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { vnodeScanAllData(pRuntimeEnv); doFinalizeResult(pRuntimeEnv); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { return; } // since the numOfOutputElems must be identical for all sql functions that are allowed to be executed simutanelously. - pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv); - // assert(pQuery->pointsRead <= pQuery->pointsToRead && + pQuery->rec.size = getNumOfResult(pRuntimeEnv); + // assert(pQuery->size <= pQuery->pointsToRead && // Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED)); // must be top/bottom query if offset > 0 @@ -5021,15 +4980,12 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { doSkipResults(pRuntimeEnv); doRevisedResultsByLimit(pQInfo); - pQInfo->rec.pointsRead = pQuery->rec.pointsRead; + pQuery->rec.size = pQuery->rec.size; } static void tableMultiOutputProcessor(SQInfo *pQInfo) { -#if 0 - SQuery * pQuery = &pQInfo->query; - SMeterObj *pMeterObj = pQInfo->pObj; - - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->pTableQuerySupporter->runtimeEnv; + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery * pQuery = pRuntimeEnv->pQuery; // for ts_comp query, re-initialized is not allowed if (!isTSCompQuery(pQuery)) { @@ -5040,63 +4996,52 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) { vnodeScanAllData(pRuntimeEnv); doFinalizeResult(pRuntimeEnv); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { return; } - pQuery->pointsRead = getNumOfResult(pRuntimeEnv); - if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->pointsRead > 0) { + pQuery->rec.size = getNumOfResult(pRuntimeEnv); + if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.size > 0) { doSkipResults(pRuntimeEnv); } /* - * 1. if pQuery->pointsRead == 0, pQuery->limit.offset >= 0, still need to check data - * 2. if pQuery->pointsRead > 0, pQuery->limit.offset must be 0 + * 1. if pQuery->size == 0, pQuery->limit.offset >= 0, still need to check data + * 2. if pQuery->size > 0, pQuery->limit.offset must be 0 */ - if (pQuery->pointsRead > 0 || Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { + if (pQuery->rec.size > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { break; } - TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); - assert(nextTimestamp > 0 || ((nextTimestamp < 0) && Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK))); - dTrace("QInfo:%p vid:%d sid:%d id:%s, skip current result, offset:%" PRId64 ", next qrange:%" PRId64 "-%" PRId64, - pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->limit.offset, pQuery->lastKey, - pQuery->ekey); + pQInfo, pQuery->limit.offset, pQuery->lastKey); resetCtxOutputBuf(pRuntimeEnv); } doRevisedResultsByLimit(pQInfo); - pQInfo->pointsRead += pQuery->pointsRead; - - if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { - TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); - assert(nextTimestamp > 0 || ((nextTimestamp < 0) && Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK))); - - dTrace("QInfo:%p vid:%d sid:%d id:%s, query abort due to buffer limitation, next qrange:%" PRId64 "-%" PRId64, - pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->lastKey, pQuery->ekey); + if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { + dTrace("QInfo:%p query paused due to buffer limitation, next qrange:%" PRId64 "-%" PRId64, + pQInfo, pQuery->lastKey, pQuery->window.ekey); } - dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode, - pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); +// dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode, +// pMeterObj->sid, pMeterObj->meterId, pQuery->size, pQInfo->size, pQInfo->pointsReturned); - pQuery->pointsOffset = pQuery->pointsToRead; // restore the available buffer - if (!isTSCompQuery(pQuery)) { - assert(pQuery->pointsRead <= pQuery->pointsToRead); - } - -#endif +// pQuery->pointsOffset = pQuery->pointsToRead; //restore the available buffer +// if (!isTSCompQuery(pQuery)) { +// assert(pQuery->size <= pQuery->pointsToRead); +// } } -static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { +static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; while (1) { initCtxOutputBuf(pRuntimeEnv); vnodeScanAllData(pRuntimeEnv); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { return; } @@ -5115,34 +5060,25 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->limit.offset -= c; } - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - break; - } - - // load the data block for the next retrieve - // loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); - if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED|QUERY_RESBUF_FULL)) { break; } } } -/* handle time interval query on single table */ +// handle time interval query on table static void tableIntervalProcessor(SQInfo *pQInfo) { - // STable *pMeterObj = pQInfo->pObj; - SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv); SQuery * pQuery = pRuntimeEnv->pQuery; int32_t numOfInterpo = 0; while (1) { - resetCtxOutputBuf(pRuntimeEnv); - vnodeSingleMeterIntervalMainLooper(pRuntimeEnv); + tableIntervalProcessImpl(pRuntimeEnv); if (pQuery->intervalTime > 0) { pQInfo->subgroupIdx = 0; // always start from 0 - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); @@ -5153,43 +5089,43 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { doRevisedResultsByLimit(pQInfo); break; } else { - taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.pointsRead, pQuery->interpoType); + taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.size, pQuery->interpoType); SData **pInterpoBuf = pRuntimeEnv->pInterpoBuf; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.pointsRead * pQuery->pSelectExpr[i].resBytes); + memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.size * pQuery->pSelectExpr[i].resBytes); } numOfInterpo = 0; - pQuery->rec.pointsRead = vnodeQueryResultInterpolate( - pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.pointsRead, &numOfInterpo); + pQuery->rec.size = vnodeQueryResultInterpolate( + pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.size, &numOfInterpo); - dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.pointsRead); - if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.size); + if (pQuery->rec.size > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { doRevisedResultsByLimit(pQInfo); break; } // no result generated yet, continue retrieve data - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; } } // all data scanned, the group by normal column can return if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result pQInfo->subgroupIdx = 0; - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); } - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; + pQuery->rec.size += pQuery->rec.size; pQInfo->pointsInterpo += numOfInterpo; // dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d // totalReturn:%d", - // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo, - // pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned); + // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->size, numOfInterpo, + // pQInfo->size - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned); } void qTableQuery(SQInfo *pQInfo) { @@ -5201,7 +5137,7 @@ void qTableQuery(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p it is already killed, abort", pQInfo); return; } @@ -5216,16 +5152,16 @@ void qTableQuery(SQInfo *pQInfo) { int32_t numOfInterpo = 0; int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); - pQuery->rec.pointsRead = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, + pQuery->rec.size = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo); doRevisedResultsByLimit(pQInfo); pQInfo->pointsInterpo += numOfInterpo; - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; + pQuery->rec.size += pQuery->rec.size; // dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d", - // pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, + // pQInfo, pQuery->size, numOfInterpo, pQInfo->size, pQInfo->pointsInterpo, // pQInfo->pointsReturned); sem_post(&pQInfo->dataReady); return; @@ -5235,22 +5171,22 @@ void qTableQuery(SQInfo *pQInfo) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { // continue to get push data from the group result if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || - (pQuery->intervalTime > 0 && pQInfo->rec.pointsTotal < pQuery->limit.limit)) { + (pQuery->intervalTime > 0 && pQuery->rec.total < pQuery->limit.limit)) { // todo limit the output for interval query? - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; pQInfo->subgroupIdx = 0; // always start from 0 if (pRuntimeEnv->windowResInfo.size > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; + pQuery->rec.size += pQuery->rec.size; clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); - if (pQuery->rec.pointsRead > 0) { + if (pQuery->rec.size > 0) { // dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d // totalReturn:%d", - // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, - // pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); + // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->size, + // pQInfo->size, pQInfo->pointsInterpo, pQInfo->pointsReturned); sem_post(&pQInfo->dataReady); return; @@ -5260,7 +5196,7 @@ void qTableQuery(SQInfo *pQInfo) { // dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, // pMeterObj->sid, - // pMeterObj->meterId, pQInfo->pointsRead); + // pMeterObj->meterId, pQInfo->size); // vnodePrintQueryStatistics(pSupporter); sem_post(&pQInfo->dataReady); @@ -5268,7 +5204,7 @@ void qTableQuery(SQInfo *pQInfo) { } // number of points returned during this query - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; int64_t st = taosGetTimestampUs(); @@ -5291,10 +5227,10 @@ void qTableQuery(SQInfo *pQInfo) { pQInfo->elapsedTime += (taosGetTimestampUs() - st); /* check if query is killed or not */ - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query is killed", pQInfo); } else { - dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.pointsRead); + dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.size); } sem_post(&pQInfo->dataReady); @@ -5317,7 +5253,7 @@ void qSuperTableQuery(void *pReadMsg) { // assert(pQInfo->refCount >= 1); #if 0 SQuery *pQuery = &pQInfo->runtimeEnv.pQuery; - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; int64_t st = taosGetTimestampUs(); if (pQuery->intervalTime > 0 || @@ -5333,15 +5269,15 @@ void qSuperTableQuery(void *pReadMsg) { /* record the total elapsed time */ pQInfo->elapsedTime += (taosGetTimestampUs() - st); - pQuery->status = isQueryKilled(pQuery) ? 1 : 0; + pQuery->status = isQueryKilled(pQInfo) ? 1 : 0; -// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->pointsRead, +// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, // pQInfo->query.interpoType); - if (pQuery->rec.pointsRead == 0) { + if (pQuery->rec.size == 0) { // pQInfo->over = 1; // dTrace("QInfo:%p over, %d meters queried, %d points are returned", pQInfo, pSupporter->numOfMeters, -// pQInfo->pointsRead); +// pQInfo->size); // vnodePrintQueryStatistics(pSupporter); } @@ -5839,6 +5775,39 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) { return TSDB_CODE_SUCCESS; } +static void doUpdateExprColumnIndex(SQuery* pQuery) { + assert(pQuery->pSelectExpr != NULL && pQuery != NULL); +// int32_t i = 0, j = 0; +// while (i < pQuery->numOfCols && j < pMeterObj->numOfColumns) { +// if (pQuery->colList[i].data.colId == pMeterObj->schema[j].colId) { +// pQuery->colList[i++].colIdx = (int16_t)j++; +// } else if (pQuery->colList[i].data.colId < pMeterObj->schema[j].colId) { +// pQuery->colList[i++].colIdx = -1; +// } else if (pQuery->colList[i].data.colId > pMeterObj->schema[j].colId) { +// j++; +// } +// } + +// while (i < pQuery->numOfCols) { +// pQuery->colList[i++].colIdx = -1; // not such column in current meter +// } + + for(int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + SSqlFuncExprMsg* pSqlExprMsg = &pQuery->pSelectExpr[k].pBase; + if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) { + continue; + } + + SColIndexEx* pColIndexEx = &pSqlExprMsg->colInfo; + for(int32_t f = 0; f < pQuery->numOfCols; ++f) { + if (pColIndexEx->colId == pQuery->colList[f].info.colId) { + pColIndexEx->colIdx = f; + break; + } + } + } +} + static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, SArray *pTableIdList) { SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); @@ -5897,6 +5866,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou assert(pExprs[col].resBytes > 0); pQuery->rowSize += pExprs[col].resBytes; } + + doUpdateExprColumnIndex(pQuery); int32_t ret = vnodeCreateFilterInfo(pQInfo, pQuery); if (ret != TSDB_CODE_SUCCESS) { @@ -5910,12 +5881,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou } // set the output buffer capacity - pQuery->capacity = 4096; + pQuery->rec.capacity = 4096; + pQuery->rec.threshold = 2; + for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { assert(pExprs[col].interResBytes >= pExprs[col].resBytes); // allocate additional memory for interResults that are usually larger then final results - size_t size = (pQuery->capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData); + size_t size = (pQuery->rec.capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData); pQuery->sdata[col] = (SData *)calloc(1, size); if (pQuery->sdata[col] == NULL) { goto _clean_memory; @@ -5933,13 +5906,12 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou } // to make sure third party won't overwrite this structure - pQInfo->signature = (uint64_t)pQInfo; + pQInfo->signature = pQInfo; pQInfo->pTableIdList = pTableIdList; pQuery->pos = -1; - // dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, - // pQInfo); - + + dTrace("QInfo %p is allocated", pQInfo); return pQInfo; _clean_memory: @@ -6092,7 +6064,7 @@ _error: return code; } -int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) { +int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, void* param, SQInfo **pQInfo) { assert(pQueryTableMsg != NULL); int32_t code = TSDB_CODE_SUCCESS; @@ -6130,6 +6102,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQ // pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code); } else { code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo); +// (*pQInfo)->param = param; } _query_over: @@ -6155,13 +6128,13 @@ _query_over: return TSDB_CODE_SUCCESS; } -int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *rowsize) { +int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) { if (pQInfo == NULL || !isQInfoValid(pQInfo)) { return TSDB_CODE_INVALID_QHANDLE; } SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); if (pQInfo->code == TSDB_CODE_SUCCESS) { return TSDB_CODE_QUERY_CANCELLED; @@ -6171,11 +6144,8 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro } sem_wait(&pQInfo->dataReady); - - *numOfRows = pQInfo->rec.pointsRead; - *rowsize = pQuery->rowSize; - - dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code); + dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size, + pQInfo->code); return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code); } @@ -6202,7 +6172,7 @@ static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { } } -static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) { +static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { // the remained number of retrieved rows, not the interpolated result SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -6225,28 +6195,31 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) { pQuery->sdata[0]->data, strerror(errno)); } } else { - doCopyQueryResultToMsg(pQInfo, pQInfo->rec.pointsRead, data); + doCopyQueryResultToMsg(pQInfo, pQuery->rec.size, data); } - pQInfo->rec.pointsTotal += pQInfo->rec.pointsRead; - dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQInfo->rec.pointsRead, pQInfo->rec.pointsTotal); + pQuery->rec.total += pQuery->rec.size; + dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total); - setQueryStatus(pQuery, QUERY_COMPLETED); return TSDB_CODE_SUCCESS; // todo if interpolation exists, the result may be dump to client by several rounds } -static void addToTaskQueue(SQInfo* pQInfo) { - // no error occurred, continue retrieving data - if (pQInfo->code == TSDB_CODE_SUCCESS) { -#ifdef _TD_ARM_ - dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:doDumpQueryResult", pQInfo, pQInfo->signature); -#else - dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:%s", pQInfo, pQInfo->signature, __FUNCTION__); -#endif - - // todo add to task queue +bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) { + if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) { + return false; + } + + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; + if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { + return false; + } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { + return true; + } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + return true; + } else { + assert(0); } } @@ -6256,13 +6229,12 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c } SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - size_t size = getResultSize(pQInfo, &pQInfo->rec.pointsRead); + size_t size = getResultSize(pQInfo, &pQuery->rec.size); *contLen = size + sizeof(SRetrieveTableRsp); // todo handle failed to allocate memory *pRsp = (SRetrieveTableRsp *)rpcMallocCont(*contLen); - - (*pRsp)->numOfRows = htonl(pQInfo->rec.pointsRead); + (*pRsp)->numOfRows = htonl(pQuery->rec.size); int32_t code = pQInfo->code; if (code == TSDB_CODE_SUCCESS) { @@ -6273,16 +6245,13 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c (*pRsp)->useconds = 0; } - if (pQInfo->rec.pointsRead > 0 && code == TSDB_CODE_SUCCESS) { - code = doDumpQueryResult(pQInfo, (*pRsp)->data, NULL); - - // has more data to return or need next round to execute - addToTaskQueue(pQInfo); - } else if (isQueryKilled(pQuery)) { - code = TSDB_CODE_QUERY_CANCELLED; + if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) { + code = doDumpQueryResult(pQInfo, (*pRsp)->data); + } else { + code = pQInfo->code; } - if (isQueryKilled(pQuery) || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { (*pRsp)->completed = 1; // notify no more result to client vnodeFreeQInfo(pQInfo); } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 93334333efd47766646a89de69c54304f483d893..1847475a97988a035958eb3b33d9c0aff9d860f8 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -32,6 +32,7 @@ #include "rpcServer.h" #include "rpcHead.h" #include "trpc.h" +#include "hash.h" #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) #define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead))) @@ -258,7 +259,8 @@ void *rpcOpen(SRpcInit *pInit) { } if (pRpc->connType == TAOS_CONN_SERVER) { - pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); +// pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); + pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true); if (pRpc->hash == NULL) { tError("%s failed to init string hash", pRpc->label); rpcClose(pRpc); @@ -292,7 +294,8 @@ void rpcClose(void *param) { } } - taosCleanUpStrHash(pRpc->hash); +// taosCleanUpStrHash(pRpc->hash); + taosHashCleanup(pRpc->hash); taosTmrCleanUp(pRpc->tmrCtrl); taosIdPoolCleanUp(pRpc->idPool); rpcCloseConnCache(pRpc->pCache); @@ -507,8 +510,10 @@ static void rpcCloseConn(void *thandle) { if ( pRpc->connType == TAOS_CONN_SERVER) { char hashstr[40] = {0}; - sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); - taosDeleteStrHash(pRpc->hash, hashstr); + size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); +// taosDeleteStrHash(pRpc->hash, hashstr); +// taosHashRemove(pRpc->hash, hashstr, size); + rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg pConn->pRspMsg = NULL; pConn->inType = 0; @@ -556,10 +561,11 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { char hashstr[40] = {0}; SRpcHead *pHead = (SRpcHead *)pRecv->msg; - sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); + size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); // check if it is already allocated - SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); +// SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); + SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size)); if (ppConn) pConn = *ppConn; if (pConn) return pConn; @@ -591,7 +597,9 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->localPort = (pRpc->localPort + pRpc->index); } - taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); +// taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); + taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); + tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", pRpc->label, pConn, sid, pConn->user, pConn->localPort); } diff --git a/src/vnode/detail/src/vnodeQueryImpl.c b/src/vnode/detail/src/vnodeQueryImpl.c index 9eb3fb8b65f8f6299729b77edf4c14776f0d99f9..f3e5cc27b3105a6e5df1b43b934c1f9eed3afa74 100644 --- a/src/vnode/detail/src/vnodeQueryImpl.c +++ b/src/vnode/detail/src/vnodeQueryImpl.c @@ -5312,7 +5312,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { while (1) { // check if query is killed or not set the status of query to pass the status check - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return cnt; } @@ -6375,7 +6375,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->scanFlag = REPEAT_SCAN; /* check if query is killed or not */ - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } diff --git a/src/vnode/detail/src/vnodeQueryProcess.c b/src/vnode/detail/src/vnodeQueryProcess.c index cedb76b4accda46e934694cdbb6de416dcc8f75b..23520f35a168dbf9f80c8c5ff0714ab4557ade61 100644 --- a/src/vnode/detail/src/vnodeQueryProcess.c +++ b/src/vnode/detail/src/vnodeQueryProcess.c @@ -105,7 +105,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo int32_t start = pSupporter->pSidSet->starterPos[groupIdx]; int32_t end = pSupporter->pSidSet->starterPos[groupIdx + 1] - 1; - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { return; } @@ -276,7 +276,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo int64_t st = taosGetTimestampUs(); while (1) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { break; } @@ -363,7 +363,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo int32_t j = QUERY_IS_ASC_QUERY(pQuery) ? 0 : numOfBlocks - 1; for (; j < numOfBlocks && j >= 0; j += step) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { break; } @@ -603,7 +603,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pSupporter->meterIdx = start; for (int32_t k = start; k <= end; ++k, pSupporter->meterIdx++) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -630,7 +630,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pSupporter->subgroupIdx); for (int32_t k = start; k <= end; ++k) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -681,7 +681,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { while (pSupporter->meterIdx < pSupporter->numOfMeters) { int32_t k = pSupporter->meterIdx; - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -958,7 +958,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { doMultiMeterSupplementaryScan(pQInfo); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query killed, abort", pQInfo); return; } @@ -998,7 +998,7 @@ static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) { vnodeScanAllData(pRuntimeEnv); doFinalizeResult(pRuntimeEnv); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { return; } @@ -1033,7 +1033,7 @@ static void vnodeSingleTableMultiOutputProcessor(SQInfo *pQInfo) { vnodeScanAllData(pRuntimeEnv); doFinalizeResult(pRuntimeEnv); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { return; } @@ -1087,7 +1087,7 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter initCtxOutputBuf(pRuntimeEnv); vnodeScanAllData(pRuntimeEnv); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { return; } @@ -1301,7 +1301,7 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) { pQInfo->useconds += (taosGetTimestampUs() - st); /* check if query is killed or not */ - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query is killed", pQInfo); pQInfo->over = 1; } else { @@ -1345,7 +1345,7 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { /* record the total elapsed time */ pQInfo->useconds += (taosGetTimestampUs() - st); - pQInfo->over = isQueryKilled(pQuery) ? 1 : 0; + pQInfo->over = isQueryKilled(pQInfo) ? 1 : 0; taosInterpoSetStartInfo(&pQInfo->pTableQuerySupporter->runtimeEnv.interpoInfo, pQuery->pointsRead, pQInfo->query.interpoType); diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 31cdd70f36bcfd556881b2eeab9c9d105982335f..2dc61f51072fac850d697cfad57a6446c7a3eace 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -124,6 +124,7 @@ typedef struct STsdbQueryHandle { int32_t tableIndex; bool isFirstSlot; void * qinfo; // query info handle, for debug purpose + SSkipListIterator* memIter; } STsdbQueryHandle; int32_t doAllocateBuf(STsdbQueryHandle *pQueryHandle, int32_t rowsPerFileBlock) { @@ -335,7 +336,6 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max SDataRow row = SL_GET_NODE_DATA(node); if (dataRowKey(row) > maxKey) break; - // Convert row data to column data if (*skey == INT64_MIN) { *skey = dataRowKey(row); @@ -345,13 +345,13 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max int32_t offset = 0; for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, 0); + SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, i); memcpy(pColInfo->pData + numOfRows*pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes); offset += pColInfo->info.bytes; } numOfRows++; - if (numOfRows > maxRowsToRead) break; + if (numOfRows >= maxRowsToRead) break; }; return numOfRows; @@ -368,8 +368,13 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { int32_t rows = 0; if (pTable->mem != NULL) { - SSkipListIterator* iter = tSkipListCreateIter(pTable->mem->pData); - rows = tsdbReadRowsFromCache(iter, INT64_MAX, 4000, &skey, &ekey, pHandle); + + // create mem table iterator if it is not created yet + if (pHandle->memIter == NULL) { + pHandle->memIter = tSkipListCreateIter(pTable->mem->pData); + } + + rows = tsdbReadRowsFromCache(pHandle->memIter, INT64_MAX, 2, &skey, &ekey, pHandle); } SDataBlockInfo blockInfo = { @@ -392,7 +397,9 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SData } SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) { - + // in case of data in cache, all data has been kept in column info object. + STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; + return pHandle->pColumns; } int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow *window, tsdbpos_t position, int16_t order) {} diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index dc16185d9a1b74cad4492c09f48dc3437772e16e..6fcedb8123cb2ea8e6bb6b89a3b7d45c96bcd3f6 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -64,6 +64,9 @@ int main(int argc, char *argv[]) { memset(buf, 0, 512); } + taos_close(taos); + + getchar(); return 0; taos_query(taos, "drop database demo");