From e99699519321e6635d38b6963b0074ae561fbbd2 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sat, 21 Mar 2020 17:38:32 +0800 Subject: [PATCH] [td-32] add retrieve support --- src/client/src/tscServer.c | 37 +-- src/dnode/src/dnodeRead.c | 74 +++-- src/inc/taosmsg.h | 6 +- src/query/CMakeLists.txt | 2 +- src/query/inc/qextbuffer.h | 3 +- src/query/inc/queryExecutor.h | 30 +- src/query/src/queryExecutor.c | 557 +++++++++++++++++++++------------- src/vnode/tsdb/src/tsdbRead.c | 48 +++ 8 files changed, 491 insertions(+), 266 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index fa41ff8a30..8058d7f054 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -609,21 +609,21 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { return size; } -static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vnodeId, char *pMsg) { +static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vgId, char *pMsg) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; - tscTrace("%p vid:%d, query on %d meters", pSql, vnodeId, numOfTables); + tscTrace("%p vgId:%d, query on %d tables", pSql, vgId, numOfTables); if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { #ifdef _DEBUG_VIEW tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pTableMetaInfo->pTableMeta->sid, pTableMetaInfo->pTableMeta->uid); #endif - STableIdInfo *pTableMetaInfo = (STableIdInfo *)pMsg; - pTableMetaInfo->sid = htonl(pTableMeta->sid); - pTableMetaInfo->uid = htobe64(pTableMeta->uid); - pTableMetaInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); + STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; + pTableIdInfo->sid = htonl(pTableMeta->sid); + pTableIdInfo->uid = htobe64(pTableMeta->uid); + pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); pMsg += sizeof(STableIdInfo); } else { SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex); @@ -676,6 +676,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->uid = pTableMeta->uid; pQueryMsg->numOfTagsCols = 0; + pQueryMsg->vgId = htonl(pTableMeta->vgid); tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); } else { // query on super table if (pTableMetaInfo->vnodeIndex < 0) { @@ -693,7 +694,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables); - pQueryMsg->vnode = htons(vnodeId); + pQueryMsg->vgId = htons(vnodeId); } pQueryMsg->numOfTables = htonl(numOfTables); @@ -761,14 +762,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i); SSchema * pColSchema = &pSchema[pCol->colIndex.columnIndex]; - if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL || - pColSchema->type > TSDB_DATA_TYPE_NCHAR) { - tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql, - htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex, - pColSchema->name); - - return -1; // 0 means build msg failed - } +// if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL || +// pColSchema->type > TSDB_DATA_TYPE_NCHAR) { +// tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql, +// htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex, +// pColSchema->name); +// +// return -1; // 0 means build msg failed +// } pQueryMsg->colList[i].colId = htons(pColSchema->colId); pQueryMsg->colList[i].bytes = htons(pColSchema->bytes); @@ -862,7 +863,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->colNameLen = htonl(len); // serialize the table info (sid, uid, tags) - pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vnode), pMsg); + pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vgId), pMsg); // only include the required tag column schema. If a tag is not required, it won't be sent to vnode if (pTableMetaInfo->numOfTags > 0) { @@ -943,7 +944,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscTrace("%p msg built success,len:%d bytes", pSql, msgLen); pCmd->payloadLen = msgLen; pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; - + + pQueryMsg->contLen = htonl(msgLen); + assert(msgLen + minMsgSize() <= size); return TSDB_CODE_SUCCESS; diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 8e6d20dba6..b511a6bf08 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -229,21 +229,23 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) { SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont; SQInfo* pQInfo = NULL; - int32_t ret = qCreateQueryInfo(pQueryTableMsg, &pQInfo); + int32_t code = qCreateQueryInfo(pQueryTableMsg, &pQInfo); - dTrace("query msg is disposed, qInfo:%p", pQueryTableMsg); - SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); - pRsp->code = 0; + pRsp->code = code; pRsp->qhandle = htobe64((uint64_t) (pQInfo)); SRpcMsg rpcRsp = { .handle = pMsg->rpcMsg.handle, .pCont = pRsp, .contLen = sizeof(SQueryTableRsp), - .code = 0, + .code = code, .msgType = 0 }; + + // do execute query + qTableQuery(pQInfo); + rpcSendResponse(&rpcRsp); } @@ -252,21 +254,51 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { void *pQInfo = htobe64(pRetrieve->qhandle); dTrace("retrieve msg is disposed, qInfo:%p", pQInfo); - - assert(pQInfo != NULL); - int32_t contLen = 100; - SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *) rpcMallocCont(contLen); - pRsp->numOfRows = 0; - pRsp->precision = 0; - pRsp->offset = 0; - pRsp->useconds = 0; - - SRpcMsg rpcRsp = { - .handle = pMsg->rpcMsg.handle, - .pCont = pRsp, - .contLen = contLen, - .code = 0, - .msgType = 0 - }; + + int32_t rowSize = 0; + int32_t numOfRows = 0; + int32_t contLen = 0; + + SRpcMsg rpcRsp = {0}; + + int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize); + if (code != TSDB_CODE_SUCCESS) { + contLen = sizeof(SRetrieveTableRsp); + + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen); + pRsp->numOfRows = 0; + pRsp->precision = 0; + pRsp->offset = 0; + pRsp->useconds = 0; + + rpcRsp = (SRpcMsg) { + .handle = pMsg->rpcMsg.handle, + .pCont = pRsp, + .contLen = contLen, + .code = code, + .msgType = 0 + }; + + //todo free qinfo + } else { + contLen = 100; + + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen); + pRsp->numOfRows = 0; + pRsp->precision = 0; + pRsp->offset = 0; + pRsp->useconds = 0; + + *(int64_t*) pRsp->data = 1000; + + rpcRsp = (SRpcMsg) { + .handle = pMsg->rpcMsg.handle, + .pCont = pRsp, + .contLen = contLen, + .code = code, + .msgType = 0 + }; + } + rpcSendResponse(&rpcRsp); } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 330e303f60..d8a052c161 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -451,10 +451,10 @@ typedef struct STimeWindow { * the outputCols will be 3 while the numOfCols is 1. */ typedef struct { - int16_t vnode; + int32_t contLen; // msg header + int16_t vgId; + int32_t numOfTables; - uint64_t pSidExtInfo; // table id & tag info ptr, in windows pointer may - uint64_t uid; STimeWindow window; diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt index 0984aeb585..0e51962f49 100644 --- a/src/query/CMakeLists.txt +++ b/src/query/CMakeLists.txt @@ -11,5 +11,5 @@ INCLUDE_DIRECTORIES(inc) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(query ${SRC}) - TARGET_LINK_LIBRARIES(query tutil m rt) + TARGET_LINK_LIBRARIES(query tsdb tutil m rt) ENDIF () \ No newline at end of file diff --git a/src/query/inc/qextbuffer.h b/src/query/inc/qextbuffer.h index 32df93d1e5..598b809d92 100644 --- a/src/query/inc/qextbuffer.h +++ b/src/query/inc/qextbuffer.h @@ -124,9 +124,8 @@ typedef struct tTagSchema { typedef struct tSidSet { int32_t numOfSids; int32_t numOfSubSet; - STableIdInfo **pSids; + STableIdInfo **pTableIdList; int32_t * starterPos; // position of each subgroup, generated according to - SColumnModel *pColumnModel; SColumnOrderInfo orderIdx; } tSidSet; diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 6ff20affb0..8956fb52b1 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -25,6 +25,7 @@ #include "taosdef.h" #include "tref.h" #include "tsqlfunction.h" +#include "tarray.h" typedef struct SData { int32_t num; @@ -39,7 +40,7 @@ enum { struct SColumnFilterElem; typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2); -typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order); +typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); typedef struct SSqlGroupbyExpr { int16_t tableIndex; @@ -142,7 +143,7 @@ typedef struct SQuery { int32_t pos; int64_t pointsOffset; // the number of points offset to save read data SData** sdata; - + int32_t capacity; SSingleColumnFilterInfo* pFilterInfo; } SQuery; @@ -152,7 +153,6 @@ typedef struct SQueryCostSummary { typedef struct SQueryRuntimeEnv { SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SQuery* pQuery; -// void* pTabObj; SData** pInterpoBuf; SQLFunctionCtx* pCtx; int16_t numOfRowsPerPage; @@ -174,16 +174,17 @@ typedef struct SQInfo { TSKEY startTime; int64_t elapsedTime; SResultRec rec; - int pointsReturned; - int pointsInterpo; - int code; // error code to returned to client + int32_t pointsReturned; + int32_t pointsInterpo; + int32_t code; // error code to returned to client + int32_t killed; // denotes if current query is killed sem_t dataReady; - SHashObj* pTableList; // table list + SArray* pTableIdList; // table list SQueryRuntimeEnv runtimeEnv; int32_t subgroupIdx; int32_t offset; /* offset in group result set of subgroup */ - tSidSet* pSidSet; - +// tSidSet* pSidSet; + T_REF_DECLARE() /* * the query is executed position on which meter of the whole list. @@ -210,7 +211,7 @@ int32_t qCreateQueryInfo(SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); * query on single table * @param pReadMsg */ -void qTableQuery(void* pReadMsg); +void qTableQuery(SQInfo* pQInfo); /** * query on super table @@ -218,4 +219,13 @@ void qTableQuery(void* pReadMsg); */ void qSuperTableQuery(void* pReadMsg); +/** + * wait for the query completed, and retrieve final results to client + * @param pQInfo + */ +int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize); + + +//int32_t qBuildQueryResult(SQInfo* pQInfo, void* pBuf); + #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 8672fb9c5e..0a5abccdd5 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -12,18 +12,19 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include #include "os.h" +#include "taosmsg.h" #include "hash.h" #include "hashfunc.h" -#include "taosmsg.h" #include "tlog.h" #include "tlosertree.h" #include "tscompression.h" #include "tstatus.h" #include "ttime.h" +#include "qast.h" + #include "qresultBuf.h" #include "queryExecutor.h" #include "queryUtil.h" @@ -52,9 +53,9 @@ /* get the qinfo struct address from the query struct address */ #define GET_COLUMN_BYTES(query, colidx) \ - ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].data.bytes) + ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.bytes) #define GET_COLUMN_TYPE(query, colidx) \ - ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].data.type) + ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.type) typedef struct SPointInterpoSupporter { int32_t numOfCols; @@ -1505,9 +1506,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel pCtx->inputType = pSchema->type; pCtx->inputBytes = pSchema->bytes; } else { - assert(0); - // pCtx->inputType = GET_COLUMN_TYPE(pQuery, i); - // pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i); + pCtx->inputType = GET_COLUMN_TYPE(pQuery, i); + pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i); } pCtx->ptsOutputBuf = NULL; @@ -1558,9 +1558,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel } setCtxTagColumnInfo(pQuery, pRuntimeEnv->pCtx); - - // for loading block data in memory - // assert(vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock == pMeterObj->pointsPerFileBlock); return TSDB_CODE_SUCCESS; _error_clean: @@ -1893,8 +1890,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { pQuery->checkBufferInLoop = hasMultioutput ? 1 : 0; } - - assert(0); + // pQuery->pointsOffset = pQuery->pointsToRead; } @@ -2243,10 +2239,10 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { num = 128; } else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table - size_t s = taosHashGetSize(pQInfo->pTableList); + size_t s = taosArrayGetSize(pQInfo->pTableIdList); num = MAX(s, INITIAL_RESULT_ROWS_VALUE); } else { // for super table query, one page for each subset - num = pQInfo->pSidSet->numOfSubSet; +// num = pQInfo->pSidSet->numOfSubSet; } assert(num > 0); @@ -2290,16 +2286,11 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); - if (pQInfo->pTableList != NULL) { - taosHashCleanup(pQInfo->pTableList); - pQInfo->pTableList = NULL; - } - // tSidSetDestroy(&pQInfo->pSidSet); if (pQInfo->pTableDataInfo != NULL) { - size_t num = taosHashGetSize(pQInfo->pTableList); - for (int32_t j = 0; j < num; ++j) { +// size_t num = taosHashGetSize(pQInfo->pTableIdList); + for (int32_t j = 0; j < 0; ++j) { destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols); } } @@ -2337,11 +2328,11 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { pQuery->lastKey = pQuery->window.skey; // create runtime environment - SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel; +// SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel; // get one queried meter assert(0); - // SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[0]->sid); + // SMeterObj *pMeter = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[0]->sid); pRuntimeEnv->pTSBuf = param; pRuntimeEnv->cur.vnodeIndex = -1; @@ -2388,7 +2379,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { SArray *sa = taosArrayInit(1, POINTER_BYTES); // for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) { - // SMeterObj *p1 = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[i]->sid); + // SMeterObj *p1 = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[i]->sid); // taosArrayPush(sa, &p1); // } @@ -2418,7 +2409,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { */ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { if (pQInfo != NULL) { - assert(taosHashGetSize(pQInfo->pTableList) >= 1); +// assert(taosHashGetSize(pQInfo->pTableIdList) >= 1); } #if 0 @@ -2429,7 +2420,7 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { } else { int32_t num = 0; for (int32_t i = 0; i < pQInfo->numOfMeters; ++i) { - SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[i]->sid); + SMeterObj *pMeter = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[i]->sid); atomic_fetch_sub_32(&(pMeter->numOfQueries), 1); if (pMeter->numOfQueries > 0) { @@ -2592,15 +2583,72 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl return pDataBlock; } +int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { + int firstPos, lastPos, midPos = -1; + int numOfPoints; + TSKEY *keyList; + + if (num <= 0) return -1; + + keyList = (TSKEY *)pValue; + firstPos = 0; + lastPos = num - 1; + + if (order == 0) { + // find the first position which is smaller than the key + while (1) { + if (key >= keyList[lastPos]) return lastPos; + if (key == keyList[firstPos]) return firstPos; + if (key < keyList[firstPos]) return firstPos - 1; + + numOfPoints = lastPos - firstPos + 1; + midPos = (numOfPoints >> 1) + firstPos; + + if (key < keyList[midPos]) { + lastPos = midPos - 1; + } else if (key > keyList[midPos]) { + firstPos = midPos + 1; + } else { + break; + } + } + + } else { + // find the first position which is bigger than the key + while (1) { + if (key <= keyList[firstPos]) return firstPos; + if (key == keyList[lastPos]) return lastPos; + + if (key > keyList[lastPos]) { + lastPos = lastPos + 1; + if (lastPos >= num) + return -1; + else + return lastPos; + } + + numOfPoints = lastPos - firstPos + 1; + midPos = (numOfPoints >> 1) + firstPos; + + if (key < keyList[midPos]) { + lastPos = midPos - 1; + } else if (key > keyList[midPos]) { + firstPos = midPos + 1; + } else { + break; + } + } + } + + return midPos; +} + static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { -#if 0 SQuery *pQuery = pRuntimeEnv->pQuery; - assert(0); -// __block_search_fn_t searchFn = vnodeSearchKeyFunc[pRuntimeEnv->pTabObj->searchAlgorithm]; - + int64_t cnt = 0; - dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", GET_QINFO_ADDR(pQuery), - pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order); + dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", + GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order); tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle; @@ -2619,17 +2667,15 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo; if (QUERY_IS_ASC_QUERY(pQuery)) { -// doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey, -// pQueryHandle->window.ekey, &skey1, &ekey1, &w); + doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1, &ekey1, &w); pWindowResInfo->startTime = w.skey; pWindowResInfo->prevSKey = w.skey; } else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp TSKEY winStart = blockInfo.window.ekey - pQuery->intervalTime; -// doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQueryHandle->window.ekey, -// blockInfo.window.ekey, &skey1, &ekey1, &w); + doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, &w); -// pWindowResInfo->startTime = pQueryHandle->window.skey; + pWindowResInfo->startTime = pQuery->window.skey; pWindowResInfo->prevSKey = w.skey; } } @@ -2638,20 +2684,17 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SDataStatis *pStatis = NULL; SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); -// int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, searchFn, &numOfRes, -// &pRuntimeEnv->windowResInfo, pDataBlock); + int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes, + &pRuntimeEnv->windowResInfo, pDataBlock); // dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, rows:%d, checked:%d", // GET_QINFO_ADDR(pQuery), blockInfo.window.skey, blockInfo.window.ekey, pQueryHandle->cur.fileId, pQueryHandle->cur.slot, // pQuery->pos, blockInfo.size, forwardStep); // save last access position -// cnt += forwardStep; - -// if (queryPaused(pQuery, &blockInfo, forwardStep)) { -// if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { -// break; -// } + cnt += forwardStep; + if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { + break; } } @@ -2673,8 +2716,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } return cnt; -#endif - return 0; } static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTableQInfo->lastKey = pQuery->lastKey; } @@ -2935,29 +2976,29 @@ int32_t mergeMetersResultToOneGroups(SQInfo *pQInfo) { int64_t st = taosGetTimestampMs(); int32_t ret = TSDB_CODE_SUCCESS; - while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) { - int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx]; - int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1]; - - assert(0); - // ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end); - if (ret < 0) { // not enough disk space to save the data into disk - return -1; - } - - pQInfo->subgroupIdx += 1; - - // this group generates at least one result, return results - if (ret > 0) { - break; - } - - assert(pQInfo->numOfGroupResultPages == 0); - dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1); - } - - dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery), - pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st); +// while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) { +// int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx]; +// int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1]; +// +// assert(0); +// // ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end); +// if (ret < 0) { // not enough disk space to save the data into disk +// return -1; +// } +// +// pQInfo->subgroupIdx += 1; +// +// // this group generates at least one result, return results +// if (ret > 0) { +// break; +// } +// +// assert(pQInfo->numOfGroupResultPages == 0); +// dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1); +// } +// +// dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery), +// pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st); return TSDB_CODE_SUCCESS; } @@ -2972,10 +3013,10 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { } // set current query completed - if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) { +// if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) { // pQInfo->tableIndex = pQInfo->pSidSet->numOfTables; - return; - } +// return; +// } } SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv; @@ -3282,7 +3323,7 @@ void disableFunctForSuppleScan(SQInfo *pQInfo, int32_t order) { } if (isIntervalQuery(pQuery)) { - size_t numOfTables = taosHashGetSize(pQInfo->pTableList); + size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); for (int32_t i = 0; i < numOfTables; ++i) { STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; @@ -3320,7 +3361,6 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - // int32_t rows = pRuntimeEnv->pTabObj->pointsPerFileBlock; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; @@ -3339,8 +3379,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } - assert(0); - // memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * rows); + memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->capacity); } initCtxOutputBuf(pRuntimeEnv); @@ -3884,7 +3923,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) { totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo); } else { - totalSubset = pQInfo->pSidSet->numOfSubSet; +// totalSubset = pQInfo->pSidSet->numOfSubSet; } return totalSubset; @@ -4275,7 +4314,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) { pQuery->window.ekey, pQuery->order.order); sem_post(&pQInfo->dataReady); - // pQInfo->over = 1; + pQInfo->killed = 1; return TSDB_CODE_SUCCESS; } @@ -4298,10 +4337,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) { taosArrayPush(cols, &pQuery->colList[i]); } - SArray* sa = taosArrayInit(1, sizeof(int16_t)); - taosArrayPush(sa, &pQInfo->pSidSet->pSids[0]->sid); - - pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(&cond, sa, cols); + pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(&cond, pQInfo->pTableIdList, cols); SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; pRuntimeEnv->pQuery = pQuery; @@ -4588,7 +4624,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { #if 0 SQuery* pQuery = pRuntimeEnv->pQuery; -// tSidSet *pSids = pSupporter->pSidSet; +// tSidSet *pTableIdList = pSupporter->pSidSet; int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode; @@ -4597,12 +4633,12 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); - while (pSupporter->subgroupIdx < pSids->numOfSubSet) { - int32_t start = pSids->starterPos[pSupporter->subgroupIdx]; - int32_t end = pSids->starterPos[pSupporter->subgroupIdx + 1] - 1; + while (pSupporter->subgroupIdx < pTableIdList->numOfSubSet) { + int32_t start = pTableIdList->starterPos[pSupporter->subgroupIdx]; + int32_t end = pTableIdList->starterPos[pSupporter->subgroupIdx + 1] - 1; if (isFirstLastRowQuery(pQuery)) { - dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pSids->numOfSubSet, + dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet, pSupporter->subgroupIdx); TSKEY key = -1; @@ -4635,7 +4671,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { int64_t num = doCheckMetersInGroup(pQInfo, index, start); assert(num >= 0); } else { - dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pSids->numOfSubSet, + dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet, pSupporter->subgroupIdx); for (int32_t k = start; k <= end; ++k) { @@ -4680,7 +4716,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { } } - if (pSupporter->meterIdx >= pSids->numOfTables) { + if (pSupporter->meterIdx >= pTableIdList->numOfTables) { return; } @@ -4833,7 +4869,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { dTrace( "QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d," "next skey:%" PRId64 ", offset:%" PRId64, - pQInfo, vid, pSids->numOfTables, pSupporter->meterIdx, pSids->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead, + pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset); #endif } @@ -4878,7 +4914,7 @@ static void doOrderedScan(SQInfo *pQInfo) { static void setupMeterQueryInfoForSupplementQuery(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - int32_t num = taosHashGetSize(pQInfo->pTableList); + int32_t num = taosHashGetSize(pQInfo->pTableIdList); for (int32_t i = 0; i < num; ++i) { // STableQueryInfo *pTableQueryInfo = pSupporter->pMeterDataInfo[i].pTableQInfo; // changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey); @@ -5017,7 +5053,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { * select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a]; * select count(*) from table_name group by status_column; */ -static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) { +static void tableFixedOutputProcessor(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -5044,7 +5080,7 @@ static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) { pQInfo->rec.pointsRead = pQuery->rec.pointsRead; } -static void vnodeSingleTableMultiOutputProcessor(SQInfo *pQInfo) { +static void tableMultiOutputProcessor(SQInfo *pQInfo) { #if 0 SQuery * pQuery = &pQInfo->query; SMeterObj *pMeterObj = pQInfo->pObj; @@ -5148,7 +5184,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { } /* handle time interval query on single table */ -static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) { +static void tableIntervalProcessor(SQInfo *pQInfo) { // STable *pMeterObj = pQInfo->pObj; SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv); @@ -5212,31 +5248,18 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) { // pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned); } -void qTableQuery(void *pReadMsg) { - // SQInfo *pQInfo = (SQInfo *)pReadMsg->ahandle; - -#if 0 - if (pQInfo == NULL) { - dTrace("%p freed abort query", pQInfo); +void qTableQuery(SQInfo* pQInfo) { + assert(pQInfo != NULL); + + if (pQInfo->killed) { + dTrace("QInfo:%p it is already killed, abort", pQInfo); return; } -// if (pQInfo->killed) { -// dTrace("QInfo:%p it is already killed, abort", pQInfo); -// vnodeDecRefCount(pQInfo); -// -// return; -// } - -// assert(pQInfo->refCount >= 1); - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = &pRuntimeEnv->pQuery; - -// assert(pRuntimeEnv->pMeterObj == pMeterObj); + SQuery *pQuery = pRuntimeEnv->pQuery; -// dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pMeterObj->vnode, pMeterObj->sid, -// pMeterObj->meterId, pMeterObj->numOfQueries, pQInfo); +// dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pQInfo); if (vnodeHasRemainResults(pQInfo)) { /* @@ -5254,15 +5277,9 @@ void qTableQuery(void *pReadMsg) { pQInfo->pointsInterpo += numOfInterpo; pQInfo->rec.pointsRead += pQuery->rec.pointsRead; -// dTrace( -// "QInfo:%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d " -// "totalReturn:%d", -// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo, -// pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); - +// dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d", +// pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); sem_post(&pQInfo->dataReady); -// vnodeDecRefCount(pQInfo); - return; } @@ -5287,26 +5304,20 @@ void qTableQuery(void *pReadMsg) { // pQInfo->pointsInterpo, pQInfo->pointsReturned); sem_post(&pQInfo->dataReady); -// vnodeDecRefCount(pQInfo); - return; } } } - assert(0); -// pQInfo->over = 1; // dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid, // pMeterObj->meterId, pQInfo->pointsRead); // vnodePrintQueryStatistics(pSupporter); sem_post(&pQInfo->dataReady); - -// vnodeDecRefCount(pQInfo); return; } - /* number of points returned during this query */ + // number of points returned during this query pQuery->rec.pointsRead = 0; int64_t st = taosGetTimestampUs(); @@ -5314,14 +5325,15 @@ void qTableQuery(void *pReadMsg) { // group by normal column, sliding window query, interval query are handled by interval query processor if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) // assert(pQuery->checkBufferInLoop == 0 && pQuery->pointsOffset == pQuery->pointsToRead); - vnodeSingleTableIntervalProcessor(pQInfo); + tableIntervalProcessor(pQInfo); } else { if (isFixedOutputQuery(pQuery)) { assert(pQuery->checkBufferInLoop == 0); - vnodeSingleTableFixedOutputProcessor(pQInfo); + + tableFixedOutputProcessor(pQInfo); } else { // diff/add/multiply/subtract/division assert(pQuery->checkBufferInLoop == 1); - vnodeSingleTableMultiOutputProcessor(pQInfo); + tableMultiOutputProcessor(pQInfo); } } @@ -5339,7 +5351,6 @@ void qTableQuery(void *pReadMsg) { sem_post(&pQInfo->dataReady); // vnodeDecRefCount(pQInfo); -#endif } void qSuperTableQuery(void *pReadMsg) { @@ -5449,17 +5460,12 @@ static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryTableMsg) { return 0; } -int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg) { - pQueryTableMsg->vnode = htons(pQueryTableMsg->vnode); +static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray** pTableIdList) { + pQueryTableMsg->vgId = htons(pQueryTableMsg->vgId); pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables); - -#ifdef TSKEY32 - pQueryTableMsg->skey = htonl(pQueryTableMsg->skey); - pQueryTableMsg->ekey = htonl(pQueryTableMsg->ekey); -#else + pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey); pQueryTableMsg->window.ekey = htobe64(pQueryTableMsg->window.ekey); -#endif pQueryTableMsg->order = htons(pQueryTableMsg->order); pQueryTableMsg->orderColId = htons(pQueryTableMsg->orderColId); @@ -5477,17 +5483,17 @@ int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg) { pQueryTableMsg->limit = htobe64(pQueryTableMsg->limit); pQueryTableMsg->offset = htobe64(pQueryTableMsg->offset); + pQueryTableMsg->tsOffset = htonl(pQueryTableMsg->tsOffset); pQueryTableMsg->tsLen = htonl(pQueryTableMsg->tsLen); pQueryTableMsg->tsNumOfBlocks = htonl(pQueryTableMsg->tsNumOfBlocks); pQueryTableMsg->tsOrder = htonl(pQueryTableMsg->tsOrder); - + // query msg safety check if (validateQueryMeterMsg(pQueryTableMsg) != 0) { return TSDB_CODE_INVALID_QUERY_MSG; } - // STableIdInfo **pSids = NULL; char *pMsg = (char *)(pQueryTableMsg->colList) + sizeof(SColumnInfo) * pQueryTableMsg->numOfCols; for (int32_t col = 0; col < pQueryTableMsg->numOfCols; ++col) { @@ -5584,25 +5590,28 @@ int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg) { pQueryTableMsg->colNameList = (int64_t)pMsg; pMsg += pQueryTableMsg->colNameLen; } - - STableIdInfo **pSids = (STableIdInfo **)calloc(pQueryTableMsg->numOfTables, sizeof(STableIdInfo *)); - pQueryTableMsg->pSidExtInfo = (uint64_t)pSids; - - pSids[0] = (STableIdInfo *)pMsg; - pSids[0]->sid = htonl(pSids[0]->sid); - pSids[0]->uid = htobe64(pSids[0]->uid); - pSids[0]->key = htobe64(pSids[0]->key); - + + *pTableIdList = taosArrayInit(pQueryTableMsg->numOfTables, sizeof(STableIdInfo)); + + STableIdInfo* pTableIdInfo = (STableIdInfo *)pMsg; + pTableIdInfo->sid = htonl(pTableIdInfo->sid); + pTableIdInfo->uid = htobe64(pTableIdInfo->uid); + pTableIdInfo->key = htobe64(pTableIdInfo->key); + + taosArrayPush(*pTableIdList, pTableIdInfo); + pMsg += sizeof(STableIdInfo); + for (int32_t j = 1; j < pQueryTableMsg->numOfTables; ++j) { - pSids[j] = (STableIdInfo *)((char *)pSids[j - 1] + sizeof(STableIdInfo) + pQueryTableMsg->tagLength); - pSids[j]->sid = htonl(pSids[j]->sid); - pSids[j]->uid = htobe64(pSids[j]->uid); - pSids[j]->key = htobe64(pSids[j]->key); + pTableIdInfo = (STableIdInfo *)pMsg; + + pTableIdInfo->sid = htonl(pTableIdInfo->sid); + pTableIdInfo->uid = htobe64(pTableIdInfo->uid); + pTableIdInfo->key = htobe64(pTableIdInfo->key); + + taosArrayPush(*pTableIdList, pTableIdInfo); + pMsg += sizeof(STableIdInfo); } - pMsg = (char *)pSids[pQueryTableMsg->numOfTables - 1]; - pMsg += sizeof(STableIdInfo) + pQueryTableMsg->tagLength; - if (pQueryTableMsg->numOfGroupCols > 0 || pQueryTableMsg->numOfTagsCols > 0) { // group by tag columns pQueryTableMsg->pTagSchema = (uint64_t)pMsg; SSchema *pTagSchema = (SSchema *)pQueryTableMsg->pTagSchema; @@ -5698,9 +5707,10 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs return TSDB_CODE_SUCCESS; } -int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr) { +static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr) { *pSqlFuncExpr = NULL; - + int32_t code = TSDB_CODE_SUCCESS; + SSqlFunctionExpr *pExprs = (SSqlFunctionExpr *)calloc(1, sizeof(SSqlFunctionExpr) * pQueryMsg->numOfOutputCols); if (pExprs == NULL) { tfree(pQueryMsg->pSqlFuncExprs); @@ -5732,24 +5742,24 @@ int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr bytes = pTagSchema[pColumnIndexExInfo->colIdx].bytes; } else { // parse the arithmetic expression - // if (pExprs[i].pBase.functionId == TSDB_FUNC_ARITHM) { - // *code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg); - // - // if (*code != TSDB_CODE_SUCCESS) { - // tfree(pExprs); - // return NULL; - // } - // - // type = TSDB_DATA_TYPE_DOUBLE; - // bytes = tDataTypeDesc[type].nSize; - // } else { // parse the normal column - // int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase); - // assert(j < pQueryMsg->numOfCols); - // - // SColumnInfo* pCol = &pQueryMsg->colList[j]; - // type = pCol->type; - // bytes = pCol->bytes; - // } + if (pExprs[i].pBase.functionId == TSDB_FUNC_ARITHM) { + code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg); + + if (code != TSDB_CODE_SUCCESS) { + tfree(pExprs); + return code; + } + + type = TSDB_DATA_TYPE_DOUBLE; + bytes = tDataTypeDesc[type].nSize; + } else { // parse the normal column + int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase); + assert(j < pQueryMsg->numOfCols); + + SColumnInfo* pCol = &pQueryMsg->colList[j]; + type = pCol->type; + bytes = pCol->bytes; + } } int32_t param = pExprs[i].pBase.arg[0].argValue.i64; @@ -5792,7 +5802,7 @@ int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr return TSDB_CODE_SUCCESS; } -SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t *code) { +static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t *code) { if (pQueryMsg->numOfGroupCols == 0) { return NULL; } @@ -5815,7 +5825,7 @@ SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t *co return pGroupbyExpr; } -int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) { +static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfCols; ++i) { if (pQuery->colList[i].info.numOfFilters > 0) { pQuery->numOfFilterCols++; @@ -5899,7 +5909,7 @@ int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) { return TSDB_CODE_SUCCESS; } -static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs) { +static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, SArray* pTableIdList) { SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { goto _clean_memory; @@ -5934,6 +5944,21 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou if (pQuery->colList == NULL) { goto _clean_memory; } + + for (int16_t i = 0; i < numOfCols; ++i) { + pQuery->colList[i].info = pQueryMsg->colList[i]; +// SColumnInfo *pColInfo = &pQuery->colList[i].data; +// pColInfo->filters = NULL; +// if (colList[i].numOfFilters > 0) { +// pColInfo->filters = calloc(1, colList[i].numOfFilters * sizeof(SColumnFilterInfo)); +// +// for (int32_t j = 0; j < colList[i].numOfFilters; ++j) { +// tscColumnFilterInfoCopy(&pColInfo->filters[j], &colList[i].filters[j]); +// } +// } else { +// pQuery->colList[i].data.filters = NULL; +// } + } // calculate the result row size for (int16_t col = 0; col < numOfOutputCols; ++col) { @@ -5952,13 +5977,13 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou goto _clean_memory; } + // set the output buffer capacity + pQuery->capacity = 4096; for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { assert(pExprs[col].interResBytes >= pExprs[col].resBytes); // allocate additional memory for interResults that are usually larger then final results - // size_t size = (pQInfo->query.pointsToRead + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + - // sizeof(SData); - size_t size = 1000; + size_t size = (pQuery->capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData); pQuery->sdata[col] = (SData *)calloc(1, size); if (pQuery->sdata[col] == NULL) { goto _clean_memory; @@ -5977,10 +6002,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou // to make sure third party won't overwrite this structure pQInfo->signature = (uint64_t)pQInfo; + pQInfo->pTableIdList = pTableIdList; + pQuery->pos = -1; - - // dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, - // pQInfo); + // dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo); return pQInfo; @@ -6005,28 +6030,107 @@ _clean_memory: return NULL; } -int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, - SQInfo **pQInfo) { - SQuery *pQuery; +bool isQInfoValid(void *param) { + SQInfo *pQInfo = (SQInfo *)param; + if (pQInfo == NULL) { + return false; + } + + /* + * pQInfo->signature may be changed by another thread, so we assign value of signature + * into local variable, then compare by using local variable + */ + uint64_t sig = pQInfo->signature; + return (sig == (uint64_t)pQInfo); +} + +void vnodeFreeQInfo(SQInfo* pQInfo, bool decQueryRef) { + if (!isQInfoValid(pQInfo)) { + return; + } + + pQInfo->killed = 1; + dTrace("QInfo:%p start to free SQInfo", pQInfo); + + if (decQueryRef) { + vnodeDecMeterRefcnt(pQInfo); + } + + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + + for (int col = 0; col < pQuery->numOfOutputCols; ++col) { + tfree(pQuery->sdata[col]); + } + +// for (int col = 0; col < pQuery->numOfCols; ++col) { +// vnodeFreeColumnInfo(&pQuery->colList[col].data); +// } +// +// if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) { +// tfree(pQuery->tsData); +// } + + sem_destroy(&(pQInfo->dataReady)); + vnodeQueryFreeQInfoEx(pQInfo); + + for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { + SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i]; + if (pColFilter->numOfFilters > 0) { + tfree(pColFilter->pFilters); + } + } + + tfree(pQuery->pFilterInfo); + tfree(pQuery->colList); + tfree(pQuery->sdata); + + if (pQuery->pSelectExpr != NULL) { + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + SSqlBinaryExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo; + + if (pBinExprInfo->numOfCols > 0) { + tfree(pBinExprInfo->pReqColumns); + tSQLBinaryExprDestroy(&pBinExprInfo->pBinExpr, NULL); + } + } + + tfree(pQuery->pSelectExpr); + } + + if (pQuery->defaultVal != NULL) { + tfree(pQuery->defaultVal); + } + + tfree(pQuery->pGroupbyExpr); + tfree(pQuery); + +// dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId); + + //destroy signature, in order to avoid the query process pass the object safety check + memset(pQInfo, 0, sizeof(SQInfo)); + tfree(pQInfo); +} + +static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, + SArray* pTableIdList, SQInfo **pQInfo) { int32_t code = TSDB_CODE_SUCCESS; - (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs); + (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList); if (pQInfo == NULL) { code = TSDB_CODE_SERV_OUT_OF_MEMORY; goto _error; } - pQuery = (*pQInfo)->runtimeEnv.pQuery; + SQuery* pQuery = (*pQInfo)->runtimeEnv.pQuery; dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo); - STableIdInfo **pSids = (STableIdInfo **)pQueryMsg->pSidExtInfo; - if (pSids != NULL && pSids[0]->key > 0) { - pQuery->window.skey = pSids[0]->key; - } else { - pQuery->window.skey = pQueryMsg->window.skey; - } - +// STableIdInfo **pTableIdList = (STableIdInfo **)pQueryMsg->pSidExtInfo; +// if (pTableIdList != NULL && pTableIdList[0]->key > 0) { +// pQuery->window.skey = pTableIdList[0]->key; +// } else { + pQuery->window.skey = pQueryMsg->window.skey; pQuery->window.ekey = pQueryMsg->window.ekey; + pQuery->lastKey = pQuery->window.skey; if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) { @@ -6038,10 +6142,6 @@ int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SS vnodeParametersSafetyCheck(pQuery); - (*pQInfo)->pTableList = taosHashInit(pQueryMsg->numOfTables, taosIntHash_32, false); - // taosHashPut(pSupporter->pMetersHashTable, (const char *)&pMetersObj[0]->sid, sizeof(pMeterObj[0].sid), - // (char *)&pMetersObj[0], POINTER_BYTES); - STSBuf *pTSBuf = NULL; if (pQueryMsg->tsLen > 0) { // open new file to save the result char *tsBlock = (char *)pQueryMsg + pQueryMsg->tsOffset; @@ -6062,13 +6162,11 @@ int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SS // dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo, // pQInfo->refCount); - - // taosScheduleTask(queryQhandle, &schedMsg); return code; _error: // table query ref will be decrease during error handling - // vnodeFreeQInfo(pQInfo, false); + vnodeFreeQInfo(*pQInfo, false); return code; } @@ -6076,7 +6174,8 @@ int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) { assert(pQueryTableMsg != NULL); int32_t code = TSDB_CODE_SUCCESS; - if ((code = convertQueryTableMsg(pQueryTableMsg)) != TSDB_CODE_SUCCESS) { + SArray* pTableIdList = NULL; + if ((code = convertQueryTableMsg(pQueryTableMsg, &pTableIdList)) != TSDB_CODE_SUCCESS) { return code; } @@ -6087,7 +6186,7 @@ int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) { } // todo check vnode status - if (pQueryTableMsg->pSidExtInfo == 0) { + if (pTableIdList == NULL || taosArrayGetSize(pTableIdList) == 0) { dError("qmsg:%p, SQueryTableMsg wrong format", pQueryTableMsg); code = TSDB_CODE_INVALID_QUERY_MSG; goto _query_over; @@ -6106,10 +6205,14 @@ int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) { if (QUERY_IS_STABLE_QUERY(pQueryTableMsg->queryType)) { // pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code); } else { - code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pQInfo); + code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, pQInfo); } _query_over: + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pTableIdList); + } + // if failed to add ref for all meters in this query, abort current query // if (code != TSDB_CODE_SUCCESS) { // vnodeDecQueryRefCount(pQueryTableMsg, pMeterObjList, incNumber); @@ -6126,4 +6229,34 @@ _query_over: // // atomic_fetch_add_32(&vnodeSelectReqNum, 1); return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} + +int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize) { + if (pQInfo == NULL || !isQInfoValid(pQInfo)) { + return TSDB_CODE_INVALID_QHANDLE; + } + + if (pQInfo->killed) { + dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); + if (pQInfo->code == TSDB_CODE_SUCCESS) { + return TSDB_CODE_QUERY_CANCELLED; + } else { // in case of not TSDB_CODE_SUCCESS, return the code to client + return abs(pQInfo->code); + } + } + + sem_wait(&pQInfo->dataReady); + + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; + +// *numOfRows = pQInfo->rec.pointsRead; +// *rowsize = pQuery->rowSize; + *numOfRows = 1; + +// dTrace("QInfo:%p, retrieve data info completed, precision:%d, rowsize:%d, rows:%d, code:%d", pQInfo, *timePrec, +// *rowsize, *numOfRows, pQInfo->code); + + if (pQInfo->code < 0) { // less than 0 means there are error existed. + return -pQInfo->code; + } +} diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 36593e719a..a62299c45f 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -16,3 +16,51 @@ #include "os.h" #include "tsdb.h" +tsdb_query_handle_t *tsdbQueryByTableId(STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) { + +} + +bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { + return false; +} + +SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { + +} + +int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis) { + +} + +SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) { + +} + +int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow* window, tsdbpos_t position, int16_t order) { + +} + +int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos) { + +} + +tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle) { + return NULL; +} + +SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond) { + +} + +tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) { + +} + +STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) { + +} + +STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) { + +} + -- GitLab