From 912d3baab4b477867c029fab584a53f9f29c60e2 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sat, 18 Apr 2020 18:17:06 +0800 Subject: [PATCH] [td-98] refactor code, change the query function signature. --- src/inc/tsdb.h | 19 +-- src/query/src/queryExecutor.c | 255 +++++++++++++++------------------- src/tsdb/src/tsdbRead.c | 23 ++- 3 files changed, 125 insertions(+), 172 deletions(-) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index cc1e4daf12..8f4e1db590 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -144,6 +144,7 @@ typedef struct STableGroupList { // qualified table object list in group typedef struct STsdbQueryCond { STimeWindow twindow; int32_t order; // desc/asc order to iterate the data block + int32_t numOfCols; SColumnInfoData *colList; } STsdbQueryCond; @@ -188,7 +189,7 @@ typedef void *tsdbpos_t; * @param pTableList table sid list * @return */ -tsdb_query_handle_t *tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, SArray *pColumnInfo); +tsdb_query_handle_t *tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo); /** * move to next block @@ -239,20 +240,6 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList */ int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow *window, tsdbpos_t position, int16_t order); -/** - * return the access position of current query handle - * @param pQueryHandle - * @return - */ -int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos); - -/** - * todo remove this function later - * @param pQueryHandle - * @return - */ -tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle); - /** * todo remove this function later * @param pQueryHandle @@ -290,7 +277,7 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle); * @param pTagCond. tag query condition * */ -int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupList, +int32_t tsdbQueryByTagsCond(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupList, SColIndex* pColIndex, int32_t numOfCols); int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, STableGroupInfo* pGroupInfo); diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index f234e2cf46..02a3916b9e 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -2226,105 +2226,96 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi pQuery->pSelectExpr[columnIndex].resBytes * realRowId; } -int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { - if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || - (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { - dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, - pQuery->window.ekey, pQuery->order.order); - - sem_post(&pQInfo->dataReady); - return TSDB_CODE_SUCCESS; - } - - pQuery->status = 0; - pQuery->rec = (SResultRec){0}; - - changeExecuteScanOrder(pQuery, true); - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - - /* - * since we employ the output control mechanism in main loop. - * so, disable it during data block scan procedure. - */ - setScanLimitationByResultBuffer(pQuery); - - // save raw query range for applying to each subgroup - pQuery->lastKey = pQuery->window.skey; - - // create runtime environment - // SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel; - - // get one queried meter - assert(0); - // SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[0]->sid); - - pRuntimeEnv->pTSBuf = param; - pRuntimeEnv->cur.vnodeIndex = -1; - - // set the ts-comp file traverse order - if (param != NULL) { - int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; - tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); - } - - assert(0); - // int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSDB_ORDER_ASC, true); - // if (ret != TSDB_CODE_SUCCESS) { - // return ret; - // } - - // createTableGroup(pQInfo->pSidSet); - - int32_t size = getInitialPageNum(pQInfo); - int32_t ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, size, pQuery->rowSize); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } - - if (pQuery->intervalTime == 0) { - int16_t type = TSDB_DATA_TYPE_NULL; - - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; - type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); - } else { - type = TSDB_DATA_TYPE_INT; // group id - } - - initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); - } - - pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true); - - STsdbQueryCond cond = { - .twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}, - .order = pQuery->order.order, - .colList = pQuery->colList, - }; - - // for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) { - // SMeterObj *p1 = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid); - // taosArrayPush(sa, &p1); - // } - - SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - taosArrayPush(cols, &pQuery->colList[i]); - } - - pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, &pQInfo->groupInfo, cols); - - // metric query do not invoke interpolation, it will be done at the second-stage merge - if (!isPointInterpoQuery(pQuery)) { - pQuery->interpoType = TSDB_INTERPO_NONE; - } - - TSKEY revisedStime = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit, - pQuery->precision); - taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0); - pRuntimeEnv->stableQuery = true; - - return TSDB_CODE_SUCCESS; -} +//int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { +// if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || +// (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { +// dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, +// pQuery->window.ekey, pQuery->order.order); +// +// sem_post(&pQInfo->dataReady); +// return TSDB_CODE_SUCCESS; +// } +// +// pQuery->status = 0; +// pQuery->rec = (SResultRec){0}; +// +// changeExecuteScanOrder(pQuery, true); +// SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; +// +// /* +// * since we employ the output control mechanism in main loop. +// * so, disable it during data block scan procedure. +// */ +// setScanLimitationByResultBuffer(pQuery); +// +// // save raw query range for applying to each subgroup +// pQuery->lastKey = pQuery->window.skey; +// +// // create runtime environment +// // SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel; +// +// // get one queried meter +// assert(0); +// // SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[0]->sid); +// +// pRuntimeEnv->pTSBuf = param; +// pRuntimeEnv->cur.vnodeIndex = -1; +// +// // set the ts-comp file traverse order +// if (param != NULL) { +// int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; +// tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); +// } +// +// assert(0); +// // int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSDB_ORDER_ASC, true); +// // if (ret != TSDB_CODE_SUCCESS) { +// // return ret; +// // } +// +// // createTableGroup(pQInfo->pSidSet); +// +// int32_t size = getInitialPageNum(pQInfo); +// int32_t ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, size, pQuery->rowSize); +// if (ret != TSDB_CODE_SUCCESS) { +// return ret; +// } +// +// if (pQuery->intervalTime == 0) { +// int16_t type = TSDB_DATA_TYPE_NULL; +// +// if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; +// type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); +// } else { +// type = TSDB_DATA_TYPE_INT; // group id +// } +// +// initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); +// } +// +// pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true); +// +// STsdbQueryCond cond = { +// .twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}, +// .order = pQuery->order.order, +// .colList = pQuery->colList, +// +// }; +// +// pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, &pQInfo->groupInfo); +// +// // metric query do not invoke interpolation, it will be done at the second-stage merge +// if (!isPointInterpoQuery(pQuery)) { +// pQuery->interpoType = TSDB_INTERPO_NONE; +// } +// +// TSKEY revisedStime = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit, +// pQuery->precision); +// taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0); +// pRuntimeEnv->stableQuery = true; +// +// return TSDB_CODE_SUCCESS; +//} /** * decrease the refcount for each table involved in this query @@ -3440,11 +3431,11 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) { disableFunctForTableSuppleScan(pRuntimeEnv, pQuery->order.order); queryStatusSave(pRuntimeEnv, &qStatus); - STimeWindow w = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; +// STimeWindow w = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; // reverse scan from current position - tsdbpos_t current = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); - tsdbResetQuery(pRuntimeEnv->pQueryHandle, &w, current, pQuery->order.order); +// tsdbpos_t current = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); +// tsdbResetQuery(pRuntimeEnv->pQueryHandle, &w, current, pQuery->order.order); doScanAllDataBlocks(pRuntimeEnv); @@ -3535,26 +3526,17 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { break; } - // set the correct start position, and load the corresponding block in buffer for next round scan all data blocks. -// /*int32_t ret =*/ tsdbDataBlockSeek(pRuntimeEnv->pQueryHandle, pos); - STsdbQueryCond cond = { - .twindow = {pQuery->window.skey, pQuery->lastKey}, - .order = pQuery->order.order, - .colList = pQuery->colList, + .twindow = {pQuery->window.skey, pQuery->lastKey}, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, }; - SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - taosArrayPush(cols, &pQuery->colList[i]); - } - if (pRuntimeEnv->pSecQueryHandle != NULL) { - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo, cols); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); } - taosArrayDestroy(cols); - status = pQuery->status; pRuntimeEnv->windowResInfo.curIndex = activeSlot; @@ -3573,7 +3555,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->window.skey = skey; pQuery->window.ekey = pQuery->lastKey - step; - /*tsdbpos_t current =*/ tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); +// /*tsdbpos_t current =*/ tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); doSingleMeterSupplementScan(pRuntimeEnv); @@ -4176,18 +4158,13 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) pQuery->lastKey = pQuery->window.skey; STsdbQueryCond cond = { - .twindow = pQuery->window, - .order = pQuery->order.order, - .colList = pQuery->colList, + .twindow = pQuery->window, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, }; - - SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - taosArrayPush(cols, &pQuery->colList[i]); - } - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo, cols); - taosArrayDestroy(cols); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo); pQInfo->tsdb = tsdb; pRuntimeEnv->pQuery = pQuery; @@ -4405,25 +4382,19 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { .twindow = {pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey}, .order = pQuery->order.order, .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, }; - SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - taosArrayPush(cols, &pQuery->colList[i]); - } SArray* g1 = taosArrayInit(1, POINTER_BYTES); - STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1}; - SArray* tx = taosArrayInit(1, sizeof(SPair)); - taosArrayPush(tx, p); + taosArrayPush(tx, p); taosArrayPush(g1, &tx); + STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1}; + // include only current table - pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, cols); - -// vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj); -// vnodeUpdateFilterColumnIndex(pQuery); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp); if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->cur.vnodeIndex == -1) { @@ -4587,13 +4558,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { assert(taosArrayGetSize(group) == pQInfo->groupInfo.numOfTables && 1 == taosArrayGetSize(pQInfo->groupInfo.pGroupList)); while (pQInfo->tableIndex < pQInfo->groupInfo.numOfTables) { - int32_t k = pQInfo->tableIndex; - if (isQueryKilled(pQInfo)) { return; } - SPair *p = taosArrayGet(group, k); + SPair *p = taosArrayGet(group, pQInfo->tableIndex); STableDataInfo* pInfo = p->sec; TSKEY skey = pInfo->pTableQInfo->lastKey; @@ -4601,7 +4570,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pQuery->window.skey = skey; } - if (!multiTableMultioutputHelper(pQInfo, k)) { + if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) { pQInfo->tableIndex++; continue; } @@ -6046,7 +6015,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) STableId* id = taosArrayGet(pTableIdList, 0); id->uid = -1; //todo fix me - /*int32_t ret =*/ tsdbQueryTags(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols); + /*int32_t ret =*/ tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols); if (groupInfo->numOfTables == 0) { // no qualified tables no need to do query code = TSDB_CODE_SUCCESS; goto _query_over; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 600ed2ba8d..b2177a4cbc 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -134,7 +134,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { pCompBlockLoadInfo->fileListIndex = -1; } -tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, SArray* pColumnInfo) { +tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList) { // todo 1. filter not exist table // todo 2. add the reference count for each table that is involved in query @@ -147,7 +147,7 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S pQueryHandle->cur.fid = -1; size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); - assert(sizeOfGroup >= 1); + assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo)); @@ -181,16 +181,15 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S pQueryHandle->activeIndex = 0; // allocate buffer in order to load data blocks from file - int32_t numOfCols = taosArrayGetSize(pColumnInfo); + int32_t numOfCols = pCond->numOfCols; size_t bufferCapacity = 4096; pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pCol = taosArrayGet(pColumnInfo, i); + for (int32_t i = 0; i < pCond->numOfCols; ++i) { SColumnInfoData pDest = {{0}, 0}; - pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCol->info.bytes); - pDest.info = pCol->info; + pDest.info = pCond->colList[i].info; + pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCond->colList[i].info.bytes); taosArrayPush(pQueryHandle->pColumns, &pDest); } @@ -1114,10 +1113,6 @@ int32_t tsdbResetQuery(tsdb_query_handle_t* pQueryHandle, STimeWindow* window, t return 0; } -int32_t tsdbDataBlockSeek(tsdb_query_handle_t* pQueryHandle, tsdbpos_t pos) { return 0; } - -tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t* pQueryHandle) { return NULL; } - SArray* tsdbRetrieveDataRow(tsdb_query_handle_t* pQueryHandle, SArray* pIdList, SQueryRowCond* pCond) { return NULL; } tsdb_query_handle_t* tsdbQueryFromTagConds(STsdbQueryCond* pCond, int16_t stableId, const char* pTagFilterStr) { @@ -1263,12 +1258,14 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { SColIndex* pColIndex = &pTableGroupSupp->pCols[i]; int32_t colIndex = pColIndex->colIndex; + assert(colIndex >= 0 && colIndex < schemaNCols(pTableGroupSupp->pTagSchema)); + char * f1 = NULL; char * f2 = NULL; int32_t type = 0; int32_t bytes = 0; - if (colIndex == -1) { // table name, todo fix me + if (colIndex == -1) { // todo fix me, table name // f1 = s1->tags; // f2 = s2->tags; type = TSDB_DATA_TYPE_BINARY; @@ -1435,7 +1432,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) return TSDB_CODE_SUCCESS; } -int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupInfo, +int32_t tsdbQueryByTagsCond(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupInfo, SColIndex* pColIndex, int32_t numOfCols) { STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); -- GitLab