diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index e2191c73c27adf8d8ac0d25a20184b0f1548d6f0..c13acbba6a96b78aa66a522720078bca68e023b0 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4674,8 +4674,8 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* } // No tables included. No results generated. Query results are empty. - if (pTableMetaInfo->pTableMeta == NULL) { - tscTrace("%p no table in metricmeta, no output result", pSql); + if (pTableMetaInfo->vgroupList->numOfVgroups == 0) { + tscTrace("%p no table in super table, no output result", pSql); pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; } diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index cd64f8e854f1ae3054e1f58fd73c4bfabd3a2d35..3e15257fcb866a1508507b5951561742f24018d8 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -145,6 +145,7 @@ typedef struct STableGroupList { // qualified table object list in group typedef struct STsdbQueryCond { STimeWindow twindow; int32_t order; // desc/asc order to iterate the data block + int32_t numOfCols; SColumnInfoData *colList; } STsdbQueryCond; @@ -189,8 +190,7 @@ typedef void *TsdbPosT; * @param pTableList table sid list * @return */ -TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, - SArray *pColumnInfo); +TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo); /** * move to next block @@ -241,20 +241,6 @@ SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pIdList); */ int32_t tsdbResetQuery(TsdbQueryHandleT *pQueryHandle, STimeWindow *window, TsdbPosT position, int16_t order); -/** - * return the access position of current query handle - * @param pQueryHandle - * @return - */ -int32_t tsdbDataBlockSeek(TsdbQueryHandleT *pQueryHandle, TsdbPosT pos); - -/** - * todo remove this function later - * @param pQueryHandle - * @return - */ -TsdbPosT tsdbDataBlockTell(TsdbQueryHandleT *pQueryHandle); - /** * todo remove this function later * @param pQueryHandle @@ -292,7 +278,7 @@ SArray *tsdbGetTableList(TsdbQueryHandleT *pQueryHandle); * @param pTagCond. tag query condition * */ -int32_t tsdbQueryTags(TsdbRepoT *tsdb, int64_t uid, const char *pTagCond, size_t len, STableGroupInfo *pGroupList, +int32_t tsdbQueryByTagsCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagCond, size_t len, STableGroupInfo *pGroupList, SColIndex *pColIndex, int32_t numOfCols); int32_t tsdbGetOneTableGroup(TsdbRepoT *tsdb, int64_t uid, STableGroupInfo *pGroupInfo); diff --git a/src/query/inc/qast.h b/src/query/inc/qast.h index bd5e61c321acd90a14a74053d101859c22cf285b..f3484509f8113f22050b959bb5c8226375d2bd3c 100644 --- a/src/query/inc/qast.h +++ b/src/query/inc/qast.h @@ -30,8 +30,6 @@ extern "C" { struct tExprNode; struct SSchema; -struct tSkipList; -struct tSkipListNode; enum { TSQL_NODE_EXPR = 0x1, diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 244f15e1ddc64a4889cd08466424960814bb44d8..c07897abf6150b6d99ed3d70cd5daad6ac89b30e 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -170,11 +170,11 @@ typedef struct SQInfo { int32_t pointsInterpo; int32_t code; // error code to returned to client sem_t dataReady; - STableGroupInfo groupInfo; // table id list void* tsdb; + STableGroupInfo groupInfo; // table id list SQueryRuntimeEnv runtimeEnv; - int32_t subgroupIdx; + int32_t groupIndex; int32_t offset; /* offset in group result set of subgroup */ T_REF_DECLARE() diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index fcb073ef4d36542924be1d32ecce264e9276d920..6ded52d11d01c8515d2f06111532c2f954f32ce2 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -48,6 +48,7 @@ #define GET_QINFO_ADDR(x) ((void*)((char *)(x)-offsetof(SQInfo, runtimeEnv))) #define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step)) +#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC)) /* get the qinfo struct address from the query struct address */ #define GET_COLUMN_BYTES(query, colidx) \ @@ -82,15 +83,24 @@ typedef enum { QUERY_OVER = 0x8u, } vnodeQueryStatus; -static void setQueryStatus(SQuery *pQuery, int8_t status); -bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } - enum { TS_JOIN_TS_EQUAL = 0, TS_JOIN_TS_NOT_EQUALS = 1, TS_JOIN_TAG_NOT_EQUALS = 2, }; +typedef struct { + int32_t status; // query status + TSKEY lastKey; // the lastKey value before query executed + STimeWindow w; // whole query time window + STimeWindow current; // current query window + int32_t windowIndex; // index of active time window result for interval query + STSCursor cur; +} SQueryStatusInfo; + +static void setQueryStatus(SQuery *pQuery, int8_t status); +bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } + static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* group); static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); @@ -2224,111 +2234,11 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi pQuery->pSelectExpr[columnIndex].resBytes * realRowId; } -int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { - if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || - (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { - qTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, - pQuery->window.ekey, pQuery->order.order); - - sem_post(&pQInfo->dataReady); - return TSDB_CODE_SUCCESS; - } - - pQuery->status = 0; - pQuery->rec = (SResultRec){0}; - - changeExecuteScanOrder(pQuery, true); - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - - /* - * since we employ the output control mechanism in main loop. - * so, disable it during data block scan procedure. - */ - setScanLimitationByResultBuffer(pQuery); - - // save raw query range for applying to each subgroup - pQuery->lastKey = pQuery->window.skey; - - // create runtime environment - // SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel; - - // get one queried meter - assert(0); - // SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[0]->sid); - - pRuntimeEnv->pTSBuf = param; - pRuntimeEnv->cur.vnodeIndex = -1; - - // set the ts-comp file traverse order - if (param != NULL) { - int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; - tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); - } - - assert(0); - // int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSDB_ORDER_ASC, true); - // if (ret != TSDB_CODE_SUCCESS) { - // return ret; - // } - - // createTableGroup(pQInfo->pSidSet); - - int32_t size = getInitialPageNum(pQInfo); - int32_t ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, size, pQuery->rowSize); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } - - if (pQuery->intervalTime == 0) { - int16_t type = TSDB_DATA_TYPE_NULL; - - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; - type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); - } else { - type = TSDB_DATA_TYPE_INT; // group id - } - - initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); - } - - pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true); - - STsdbQueryCond cond = { - .twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}, - .order = pQuery->order.order, - .colList = pQuery->colList, - }; - - // for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) { - // SMeterObj *p1 = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid); - // taosArrayPush(sa, &p1); - // } - - SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - taosArrayPush(cols, &pQuery->colList[i]); - } - - pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, &pQInfo->groupInfo, cols); - - // metric query do not invoke interpolation, it will be done at the second-stage merge - if (!isPointInterpoQuery(pQuery)) { - pQuery->interpoType = TSDB_INTERPO_NONE; - } - - TSKEY revisedStime = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit, - pQuery->precision); - taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0); - pRuntimeEnv->stableQuery = true; - - return TSDB_CODE_SUCCESS; -} - /** * decrease the refcount for each table involved in this query * @param pQInfo */ -void vnodeDecMeterRefcnt(SQInfo *pQInfo) { +UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) { if (pQInfo != NULL) { // assert(taosHashGetSize(pQInfo->groupInfo) >= 1); } @@ -2362,7 +2272,7 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { #endif } -void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) { +UNUSED_FUNC void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { @@ -2907,14 +2817,14 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); - while (pQInfo->subgroupIdx < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx); + while (pQInfo->groupIndex < numOfGroups) { + SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); ret = mergeIntoGroupResultImpl(pQInfo, group); if (ret < 0) { // not enough disk space to save the data into disk return -1; } - pQInfo->subgroupIdx += 1; + pQInfo->groupIndex += 1; // this group generates at least one result, return results if (ret > 0) { @@ -2922,11 +2832,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { } assert(pQInfo->numOfGroupResultPages == 0); - qTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->subgroupIdx - 1); + qTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1); } qTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", - pQInfo, pQInfo->subgroupIdx - 1, numOfGroups, taosGetTimestampMs() - st); + pQInfo, pQInfo->groupIndex - 1, numOfGroups, taosGetTimestampMs() - st); return TSDB_CODE_SUCCESS; } @@ -2941,7 +2851,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { } // set current query completed - // if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) { + // if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == pQInfo->pSidSet->numOfSubSet) { // pQInfo->tableIndex = pQInfo->pSidSet->numOfTables; // return; // } @@ -2950,7 +2860,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv; SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - int32_t id = getGroupResultId(pQInfo->subgroupIdx - 1); + int32_t id = getGroupResultId(pQInfo->groupIndex - 1); SIDList list = getDataBufPagesIdList(pResultBuf, pQInfo->offset + id); int32_t total = 0; @@ -3156,7 +3066,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) { r = capacity; } - int32_t id = getGroupResultId(pQInfo->subgroupIdx) + pQInfo->numOfGroupResultPages; + int32_t id = getGroupResultId(pQInfo->groupIndex) + pQInfo->numOfGroupResultPages; tFilePage *buf = getNewDataBuf(pResultBuf, id, &pageId); // pagewise copy to dest buffer @@ -3205,8 +3115,8 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo * for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; - if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_DESC) || - ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_ASC)) { + if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || + ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { buf->resultInfo[j].complete = false; } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { buf->resultInfo[j].complete = true; @@ -3215,32 +3125,28 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo * } } -void disableFunctForTableSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { +void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - + int32_t order = pQuery->order.order; + // group by normal columns and interval query on normal table - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u; - } - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); } else { // for simple result of table query, for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { - int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; + int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; + SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j]; - if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_DESC) || - ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_ASC)) { + if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || + ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { pCtx->resultInfo->complete = false; } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { pCtx->resultInfo->complete = true; } } } - - pQuery->order.order = pQuery->order.order ^ 1u; } void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { @@ -3266,14 +3172,11 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { pQuery->order.order = (pQuery->order.order) ^ 1u; } -void enableFuncForForwardScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { +void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u; + SWITCH_ORDER(pRuntimeEnv->pCtx[i].order);// = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; } - - pQuery->order.order = (pQuery->order.order) ^ 1u; } void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo) { @@ -3387,70 +3290,6 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { } } -typedef struct SQueryStatus { - int8_t overStatus; - TSKEY lastKey; - STSCursor cur; -} SQueryStatus; - -// todo refactor -static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - pStatus->overStatus = pQuery->status; - pStatus->lastKey = pQuery->lastKey; - - pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor - - if (pRuntimeEnv->pTSBuf) { - pRuntimeEnv->pTSBuf->cur.order ^= 1u; - tsBufNextPos(pRuntimeEnv->pTSBuf); - } - - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - - SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); - pQuery->lastKey = pQuery->window.skey; -} - -static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { - SQuery *pQuery = pRuntimeEnv->pQuery; - SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); - - pQuery->lastKey = pStatus->lastKey; - pQuery->status = pStatus->overStatus; - - tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur); -} - -static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) { - SQuery * pQuery = pRuntimeEnv->pQuery; - SQueryStatus qStatus = {0}; - - if (!needReverseScan(pQuery)) { - return; - } - - qTrace("QInfo:%p start to supp scan", GET_QINFO_ADDR(pQuery)); - SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); - - // close necessary function execution during supplementary scan - disableFunctForTableSuppleScan(pRuntimeEnv, pQuery->order.order); - queryStatusSave(pRuntimeEnv, &qStatus); - - STimeWindow w = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; - - // reverse scan from current position - TsdbPosT current = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); - tsdbResetQuery(pRuntimeEnv->pQueryHandle, &w, current, pQuery->order.order); - - doScanAllDataBlocks(pRuntimeEnv); - - queryStatusRestore(pRuntimeEnv, &qStatus); - enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order); - SET_MASTER_SCAN_FLAG(pRuntimeEnv); -} - void setQueryStatus(SQuery *pQuery, int8_t status) { if (status == QUERY_NOT_COMPLETED) { pQuery->status = status; @@ -3506,82 +3345,140 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { return toContinue; } +static SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv* pRuntimeEnv) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + SQueryStatusInfo info = { + .status = pQuery->status, + .windowIndex = pRuntimeEnv->windowResInfo.curIndex, + .lastKey = pQuery->lastKey, + .w = pQuery->window, + }; + + return info; +} + +static void setEnvBeforeReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SQueryStatusInfo* pStatus) { + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQuery* pQuery = pRuntimeEnv->pQuery; + + // the step should be placed before order changed + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + + pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor + if (pRuntimeEnv->pTSBuf) { + SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order); + tsBufNextPos(pRuntimeEnv->pTSBuf); + } + + // reverse order time range + pQuery->window.skey = pQuery->lastKey - step; + pQuery->window.ekey = pStatus->lastKey; // the start timestamp of current query + + SWITCH_ORDER(pQuery->order.order); + SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); + + STsdbQueryCond cond = { + .twindow = pQuery->window, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, + }; + + // clean unused handle + if (pRuntimeEnv->pSecQueryHandle != NULL) { + tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); + } + + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); + + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); + switchCtxOrder(pRuntimeEnv); + disableFuncInReverseScan(pRuntimeEnv); +} + +static void clearEnvAfterReverseScan(SQueryRuntimeEnv* pRuntimeEnv, TSKEY lastKey, SQueryStatusInfo* pStatus) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + SWITCH_ORDER(pQuery->order.order); + switchCtxOrder(pRuntimeEnv); + + tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur); + if (pRuntimeEnv->pTSBuf) { + pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order; + } + + SET_MASTER_SCAN_FLAG(pRuntimeEnv); + + // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query + // during reverse scan + pQuery->lastKey = lastKey; + pQuery->status = pStatus->status; + pQuery->window = pStatus->w; +} + void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); // store the start query position SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); - - int64_t skey = pQuery->lastKey; - int32_t status = pQuery->status; - int32_t activeSlot = pRuntimeEnv->windowResInfo.curIndex; + SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); while (1) { doScanAllDataBlocks(pRuntimeEnv); + + if (pRuntimeEnv->scanFlag == MASTER_SCAN) { + qstatus.status = pQuery->status; + } if (!needScanDataBlocksAgain(pRuntimeEnv)) { - - // restore the status + // restore the status code and jump out of loop if (pRuntimeEnv->scanFlag == REPEAT_SCAN) { - pQuery->status = status; + pQuery->status = qstatus.status; } break; } - // set the correct start position, and load the corresponding block in buffer for next round scan all data blocks. -// /*int32_t ret =*/ tsdbDataBlockSeek(pRuntimeEnv->pQueryHandle, pos); - STsdbQueryCond cond = { - .twindow = {pQuery->window.skey, pQuery->lastKey}, - .order = pQuery->order.order, - .colList = pQuery->colList, + .twindow = {.skey = qstatus.lastKey, .ekey = pQuery->lastKey - step}, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, }; - SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - taosArrayPush(cols, &pQuery->colList[i]); - } - if (pRuntimeEnv->pSecQueryHandle != NULL) { - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo, cols); + tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } - taosArrayDestroy(cols); - - status = pQuery->status; - pRuntimeEnv->windowResInfo.curIndex = activeSlot; + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); + pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); pRuntimeEnv->scanFlag = REPEAT_SCAN; // check if query is killed or not - if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { + if (isQueryKilled(pQInfo)) { return; } } + + if (!needReverseScan(pQuery)) { + return; + } + + TSKEY lastKey = pQuery->lastKey; + setEnvBeforeReverseScan(pRuntimeEnv, &qstatus); - // no need to set the end key - TSKEY lkey = pQuery->lastKey; - TSKEY ekey = pQuery->window.ekey; - - pQuery->window.skey = skey; - pQuery->window.ekey = pQuery->lastKey - step; - /*tsdbpos_t current =*/ tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); - - doSingleMeterSupplementScan(pRuntimeEnv); - - // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan - pQuery->lastKey = lkey; - pQuery->window.ekey = ekey; - -// STimeWindow win = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; -// tsdbResetQuery(pRuntimeEnv->pQueryHandle, &win, current, pQuery->order.order); -// tsdbNextDataBlock(pRuntimeEnv->pQueryHandle); + // reverse scan from current position + qTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv)); + doScanAllDataBlocks(pRuntimeEnv); + + clearEnvAfterReverseScan(pRuntimeEnv, lastKey, &qstatus); } void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { @@ -3875,17 +3772,17 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde int32_t totalSubset = getNumOfSubset(pQInfo); if (orderType == TSDB_ORDER_ASC) { - startIdx = pQInfo->subgroupIdx; + startIdx = pQInfo->groupIndex; step = 1; } else { // desc order copy all data - startIdx = totalSubset - pQInfo->subgroupIdx - 1; + startIdx = totalSubset - pQInfo->groupIndex - 1; step = -1; } for (int32_t i = startIdx; (i < totalSubset) && (i >= 0); i += step) { if (result[i].numOfRows == 0) { pQInfo->offset = 0; - pQInfo->subgroupIdx += 1; + pQInfo->groupIndex += 1; continue; } @@ -3903,7 +3800,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde pQInfo->offset += numOfRowsToCopy; } else { pQInfo->offset = 0; - pQInfo->subgroupIdx += 1; + pQInfo->groupIndex += 1; } for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { @@ -4174,18 +4071,13 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) pQuery->lastKey = pQuery->window.skey; STsdbQueryCond cond = { - .twindow = pQuery->window, - .order = pQuery->order.order, - .colList = pQuery->colList, + .twindow = pQuery->window, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, }; - - SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - taosArrayPush(cols, &pQuery->colList[i]); - } - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo, cols); - taosArrayDestroy(cols); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo); pQInfo->tsdb = tsdb; pRuntimeEnv->pQuery = pQuery; @@ -4403,25 +4295,19 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { .twindow = {pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey}, .order = pQuery->order.order, .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, }; - SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - taosArrayPush(cols, &pQuery->colList[i]); - } SArray* g1 = taosArrayInit(1, POINTER_BYTES); - STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1}; - SArray* tx = taosArrayInit(1, sizeof(SPair)); - taosArrayPush(tx, p); + taosArrayPush(tx, p); taosArrayPush(g1, &tx); + STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1}; + // include only current table - pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, cols); - -// vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj); -// vnodeUpdateFilterColumnIndex(pQuery); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp); if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->cur.vnodeIndex == -1) { @@ -4501,14 +4387,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) { assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); #if 0 - while (pQInfo->subgroupIdx < numOfGroups) { + while (pQInfo->groupIndex < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx); + SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); size_t numOfTable = taosArrayGetSize(group); if (isFirstLastRowQuery(pQuery)) { qTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet, - pQInfo->subgroupIdx); + pQInfo->groupIndex); TSKEY key = -1; int32_t index = -1; @@ -4529,7 +4415,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { // assert(num >= 0); } else { qTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet, - pQInfo->subgroupIdx); + pQInfo->groupIndex); for (int32_t k = start; k <= end; ++k) { if (isQueryKilled(pQInfo)) { @@ -4547,7 +4433,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } - pSupporter->subgroupIdx++; + pSupporter->groupIndex++; // output buffer is full, return to client if (pQuery->size >= pQuery->pointsToRead) { @@ -4564,7 +4450,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { * if the subgroup index is larger than 0, results generated by group by tbname,k is existed. * we need to return it to client in the first place. */ - if (pQInfo->subgroupIdx > 0) { + if (pQInfo->groupIndex > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); pQuery->rec.total += pQuery->rec.rows; @@ -4585,13 +4471,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { assert(taosArrayGetSize(group) == pQInfo->groupInfo.numOfTables && 1 == taosArrayGetSize(pQInfo->groupInfo.pGroupList)); while (pQInfo->tableIndex < pQInfo->groupInfo.numOfTables) { - int32_t k = pQInfo->tableIndex; - if (isQueryKilled(pQInfo)) { return; } - SPair *p = taosArrayGet(group, k); + SPair *p = taosArrayGet(group, pQInfo->tableIndex); STableDataInfo* pInfo = p->sec; TSKEY skey = pInfo->pTableQInfo->lastKey; @@ -4599,7 +4483,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pQuery->window.skey = skey; } - if (!multiTableMultioutputHelper(pQInfo, k)) { + if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) { pQInfo->tableIndex++; continue; } @@ -4696,7 +4580,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } - pQInfo->subgroupIdx = 0; + pQInfo->groupIndex = 0; pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult); } @@ -4773,7 +4657,7 @@ static void doRestoreContext(SQInfo* pQInfo) { pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1; } - enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order); + switchCtxOrder(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv); } @@ -4806,9 +4690,9 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - if (pQInfo->subgroupIdx > 0) { + if (pQInfo->groupIndex > 0) { /* - * if the subgroupIdx > 0, the query process must be completed yet, we only need to + * if the groupIndex > 0, the query process must be completed yet, we only need to * copy the data into output buffer */ if (isIntervalQuery(pQuery)) { @@ -4870,7 +4754,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { } if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) { -// assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); +// assert(pSupporter->groupIndex == 0 && pSupporter->numOfGroupResultPages == 0); if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) { copyResToQueryResultBuf(pQInfo, pQuery); @@ -5008,11 +4892,11 @@ static void tableIntervalProcess(SQInfo *pQInfo) { tableIntervalProcessImpl(pRuntimeEnv); if (isIntervalQuery(pQuery)) { - pQInfo->subgroupIdx = 0; // always start from 0 + pQInfo->groupIndex = 0; // always start from 0 pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); - clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); + clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); } // the offset is handled at prepare stage if no interpolation involved @@ -5044,10 +4928,10 @@ static void tableIntervalProcess(SQInfo *pQInfo) { // all data scanned, the group by normal column can return if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result - pQInfo->subgroupIdx = 0; + pQInfo->groupIndex = 0; pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); - clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); + clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); } pQInfo->pointsInterpo += numOfInterpo; @@ -5083,13 +4967,13 @@ static void tableQueryImpl(SQInfo* pQInfo) { // todo limit the output for interval query? pQuery->rec.rows = 0; - pQInfo->subgroupIdx = 0; // always start from 0 + pQInfo->groupIndex = 0; // always start from 0 if (pRuntimeEnv->windowResInfo.size > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); pQuery->rec.rows += pQuery->rec.rows; - clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); + clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); if (pQuery->rec.rows > 0) { qTrace("QInfo:%p %d rows returned from group results, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); @@ -5895,13 +5779,6 @@ static void freeQInfo(SQInfo *pQInfo) { sem_destroy(&(pQInfo->dataReady)); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); -// if (pQInfo->pTableDataInfo != NULL) { - // size_t num = taosHashGetSize(pQInfo->groupInfo); -// for (int32_t j = 0; j < 0; ++j) { -// destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols); -// } -// } - for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i]; if (pColFilter->numOfFilters > 0) { @@ -5933,6 +5810,12 @@ static void freeQInfo(SQInfo *pQInfo) { tfree(pQuery->pGroupbyExpr); tfree(pQuery); + int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + for(int32_t i = 0; i < numOfGroups; ++i) { + SArray* p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); + taosArrayDestroy(p); + } + taosArrayDestroy(pQInfo->groupInfo.pGroupList); qTrace("QInfo:%p QInfo is freed", pQInfo); @@ -6036,7 +5919,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) } bool isSTableQuery = false; - STableGroupInfo* groupInfo = calloc(1, sizeof(STableGroupInfo)); + STableGroupInfo groupInfo = {0}; if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) { isSTableQuery = true; @@ -6044,8 +5927,8 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) STableId* id = taosArrayGet(pTableIdList, 0); id->uid = -1; //todo fix me - /*int32_t ret =*/ tsdbQueryTags(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols); - if (groupInfo->numOfTables == 0) { // no qualified tables no need to do query + /*int32_t ret =*/ tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols); + if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query code = TSDB_CODE_SUCCESS; goto _query_over; } @@ -6053,12 +5936,12 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) assert(taosArrayGetSize(pTableIdList) == 1); STableId* id = taosArrayGet(pTableIdList, 0); - if ((code = tsdbGetOneTableGroup(tsdb, id->uid, groupInfo)) != TSDB_CODE_SUCCESS) { + if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { goto _query_over; } } - (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, groupInfo); + (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo); if ((*pQInfo) == NULL) { code = TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -6066,24 +5949,9 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) code = initQInfo(pQueryMsg, tsdb, *pQInfo, isSTableQuery); _query_over: - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pTableIdList); - } + taosArrayDestroy(pTableIdList); // if failed to add ref for all meters in this query, abort current query - // if (code != TSDB_CODE_SUCCESS) { - // vnodeDecQueryRefCount(pQueryMsg, pMeterObjList, incNumber); - // } - // - // tfree(pQueryMsg->pSqlFuncExprs); - // tfree(pMeterObjList); - // ret = vnodeSendQueryRspMsg(pObj, code, pObj->qhandle); - // - // tfree(pQueryMsg->pSidExtInfo); - // for(int32_t i = 0; i < pQueryMsg->numOfCols; ++i) { - // vnodeFreeColumnInfo(&pQueryMsg->colList[i]); - // } - // // atomic_fetch_add_32(&vnodeSelectReqNum, 1); return TSDB_CODE_SUCCESS; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 43bb69e96a521ff160c278131c8e4d83c4679ac3..4ab6895863626307f8ad0337b2bf10460af1a30e 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -134,7 +134,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { pCompBlockLoadInfo->fileListIndex = -1; } -TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, SArray* pColumnInfo) { +TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList) { // todo 1. filter not exist table // todo 2. add the reference count for each table that is involved in query @@ -148,7 +148,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable pQueryHandle->cur.fid = -1; size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); - assert(sizeOfGroup >= 1); + assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo)); @@ -182,16 +182,15 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable pQueryHandle->activeIndex = 0; // allocate buffer in order to load data blocks from file - int32_t numOfCols = taosArrayGetSize(pColumnInfo); + int32_t numOfCols = pCond->numOfCols; size_t bufferCapacity = 4096; pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pCol = taosArrayGet(pColumnInfo, i); + for (int32_t i = 0; i < pCond->numOfCols; ++i) { SColumnInfoData pDest = {{0}, 0}; - pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCol->info.bytes); - pDest.info = pCol->info; + pDest.info = pCond->colList[i].info; + pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCond->colList[i].info.bytes); taosArrayPush(pQueryHandle->pColumns, &pDest); } @@ -430,9 +429,6 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo taosArrayDestroy(sa); tfree(data); - // TSKEY* d = (TSKEY*)pCheckInfo->pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData; - // assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast); - return blockLoaded; } @@ -587,7 +583,7 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf } } - // int32_t start = MIN(cur->pos, endPos); + int32_t start = MIN(cur->pos, endPos); // move the data block in the front to data block if needed int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); @@ -600,9 +596,9 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf if (pCol->info.colId == colId) { // SDataCol* pDataCol = &pCols->cols[i]; - pCol->pData = pQueryHandle->rhelper.pDataCols[0]->cols[i].pData; - // memmove(pCol->pData, pDataCol->pData + pCol->info.bytes * start, - // pQueryHandle->realNumOfRows * pCol->info.bytes); +// pCol->pData = pQueryHandle->rhelper.pDataCols[0]->cols[i].pData + pCol->info.bytes * start; + memmove(pCol->pData, pQueryHandle->rhelper.pDataCols[0]->cols[i].pData + pCol->info.bytes * start, + pQueryHandle->realNumOfRows * pCol->info.bytes); break; } } @@ -941,7 +937,6 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pqHandle) { return getDataBlocksInFiles(pQueryHandle); } - } static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey, @@ -1053,7 +1048,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { if (pTable->mem != NULL) { // create mem table iterator if it is not created yet assert(pCheckInfo->iter != NULL); - rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 2, &skey, &ekey, pHandle); + rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 4000, &skey, &ekey, pHandle); // update the last key value pCheckInfo->lastKey = ekey + step; @@ -1117,10 +1112,6 @@ int32_t tsdbResetQuery(TsdbQueryHandleT* pQueryHandle, STimeWindow* window, Tsdb return 0; } -int32_t tsdbDataBlockSeek(TsdbQueryHandleT* pQueryHandle, TsdbPosT pos) { return 0; } - -TsdbPosT tsdbDataBlockTell(TsdbQueryHandleT* pQueryHandle) { return NULL; } - SArray* tsdbRetrieveDataRow(TsdbQueryHandleT* pQueryHandle, SArray* pIdList, SQueryRowCond* pCond) { return NULL; } TsdbQueryHandleT* tsdbQueryFromTagConds(STsdbQueryCond* pCond, int16_t stableId, const char* pTagFilterStr) { @@ -1266,12 +1257,14 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { SColIndex* pColIndex = &pTableGroupSupp->pCols[i]; int32_t colIndex = pColIndex->colIndex; + assert(colIndex >= 0 && colIndex < schemaNCols(pTableGroupSupp->pTagSchema)); + char * f1 = NULL; char * f2 = NULL; int32_t type = 0; int32_t bytes = 0; - if (colIndex == -1) { // table name, todo fix me + if (colIndex == -1) { // todo fix me, table name // f1 = s1->tags; // f2 = s2->tags; type = TSDB_DATA_TYPE_BINARY; @@ -1438,7 +1431,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) return TSDB_CODE_SUCCESS; } -int32_t tsdbQueryTags(TsdbRepoT* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupInfo, +int32_t tsdbQueryByTagsCond(TsdbRepoT* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupInfo, SColIndex* pColIndex, int32_t numOfCols) { STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); @@ -1520,11 +1513,11 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { taosArrayDestroy(pQueryHandle->pTableCheckInfo); tfree(pQueryHandle->compIndex); - // size_t cols = taosArrayGetSize(pQueryHandle->pColumns); - // for (int32_t i = 0; i < cols; ++i) { - // SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - // // tfree(pColInfo->pData); - // } + size_t cols = taosArrayGetSize(pQueryHandle->pColumns); + for (int32_t i = 0; i < cols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + tfree(pColInfo->pData); + } taosArrayDestroy(pQueryHandle->pColumns);