diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c
index 04f3a711fdae8c312b5fe4b143d0abcbb98364ae..f234e2cf4639061d5fb63081d147419aef4487c2 100644
--- a/src/query/src/queryExecutor.c
+++ b/src/query/src/queryExecutor.c
@@ -96,18 +96,20 @@ enum {
 static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* group);
 static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
-static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo);
-static int32_t flushFromResultBuf(SQInfo *pQInfo);
-static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
-static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
+static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo);
+static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
+static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
 static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY *tsCol, int32_t size,
                           int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t scanFlag);
 static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
 static void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols);
+static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
+static bool hasMainOutput(SQuery *pQuery);
+static void createTableDataInfo(SQInfo* pQInfo);
+
 static int32_t setAdditionalInfo(SQInfo *pQInfo, STable* pTable, STableQueryInfo *pTableQueryInfo);
-static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
-static bool hasMainOutput(SQuery *pQuery);
+static int32_t flushFromResultBuf(SQInfo *pQInfo);
 
 bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *pPointInterpSupporter) {
 #if 0
@@ -2662,22 +2664,19 @@ static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTable
  * set tag value in SQLFunctionCtx
  * e.g.,tag information into input buffer
  */
-static void doSetTagValueInParam(void* tsdb, STableId id, int32_t tagColIdx, tVariant *param) {
-  assert(tagColIdx >= 0);
-
+static void doSetTagValueInParam(void* tsdb, STableId id, int32_t tagColId, tVariant *param) {
   tVariantDestroy(param);
 
   char* val = NULL;
   int16_t bytes = 0;
-  int16_t type = 0;
+  int16_t type = 0;
 
-  tsdbGetTableTagVal(tsdb, id, tagColIdx, &type, &bytes, &val);
+  tsdbGetTableTagVal(tsdb, id, tagColId, &type, &bytes, &val);
   tVariantCreateFromBinary(param, val, bytes, type);
 }
 
-void vnodeSetTagValueInParam(STableGroupInfo *groupList, SQueryRuntimeEnv *pRuntimeEnv, STableId id, void* tsdb) {
+void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void* tsdb) {
   SQuery * pQuery = pRuntimeEnv->pQuery;
-//  SColumnModel *pTagSchema = NULL;//pSidSet->pColumnModel;
 
   SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase;
   if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
@@ -2686,20 +2685,23 @@ void vnodeSetTagValueInParam(STableGroupInfo *groupList, SQueryRuntimeEnv *pRunt
   } else {
     // set tag value, by which the results are aggregated.
     for (int32_t idx = 0; idx < pQuery->numOfOutputCols; ++idx) {
-      SColIndex *pColEx = &pQuery->pSelectExpr[idx].pBase.colInfo;
+      SColIndex *pCol = &pQuery->pSelectExpr[idx].pBase.colInfo;
       // ts_comp column required the tag value for join filter
-      if (!TSDB_COL_IS_TAG(pColEx->flag)) {
+      if (!TSDB_COL_IS_TAG(pCol->flag)) {
         continue;
       }
-
-      doSetTagValueInParam(tsdb, id, pColEx->colIndex, &pRuntimeEnv->pCtx[idx].tag);
+
+
+      // todo use tag column index to optimize performance
+      doSetTagValueInParam(tsdb, id, pCol->colId, &pRuntimeEnv->pCtx[idx].tag);
     }
 
     // set the join tag for first column
     if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
         pRuntimeEnv->pTSBuf != NULL) {
       assert(pFuncMsg->numOfParams == 1);
+      assert(0);  // to do fix me
 
       //      doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag);
     }
   }
@@ -2902,17 +2904,13 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
 }
 
 int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
-  SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
-  SQuery * pQuery = pRuntimeEnv->pQuery;
-
-//  int64_t st = taosGetTimestampMs();
+  int64_t st = taosGetTimestampMs();
   int32_t ret = TSDB_CODE_SUCCESS;
 
-//  while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) {
-//    int32_t start = 0;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
-//    int32_t end = pQInfo->groupInfo.numOfTables - 1;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
-
-  SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
+  int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
+
+  while (pQInfo->subgroupIdx < numOfGroups) {
+    SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx);
     ret = mergeIntoGroupResultImpl(pQInfo, group);
     if (ret < 0) {  // not enough disk space to save the data into disk
       return -1;
@@ -2921,16 +2919,16 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
     pQInfo->subgroupIdx += 1;
 
     // this group generates at least one result, return results
-//    if (ret > 0) {
-//      break;
-//    }
+    if (ret > 0) {
+      break;
+    }
 
     assert(pQInfo->numOfGroupResultPages == 0);
-    dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1);
-//  }
+    dTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->subgroupIdx - 1);
+  }
 
-//  dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms",
-//         GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st);
+  dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms",
+         pQInfo, pQInfo->subgroupIdx - 1, numOfGroups, taosGetTimestampMs() - st);
 
   return TSDB_CODE_SUCCESS;
 }
@@ -3025,11 +3023,11 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* pGroup) {
   int32_t numOfTables = 0;
   for (int32_t i = 0; i < size; ++i) {
     SPair* p = taosArrayGet(pGroup, i);
-    STableQueryInfo* pInfo = p->sec;
+    STableDataInfo* pInfo = p->sec;
 
-    SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pInfo->tid);
-    if (list.size > 0 && pInfo->windowResInfo.size > 0) {
-//      pTableList[numOfTables] = &pTableDataInfo[i];
+    SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pInfo->pTableQInfo->tid);
+    if (list.size > 0 && pInfo->pTableQInfo->windowResInfo.size > 0) {
+      pTableList[numOfTables] = pInfo;
       numOfTables += 1;
     }
   }
@@ -3151,8 +3149,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) {
   // the base value for group result, since the maximum number of table for each vnode will not exceed 100,000.
   int32_t pageId = -1;
 
-  assert(0);
-  int32_t remain = 0;//pQuery->sdata[0]->num;
+  int32_t remain = pQuery->sdata[0]->num;
   int32_t offset = 0;
 
   while (remain > 0) {
@@ -3188,7 +3185,7 @@ void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pRes
     pCtx[k].startOffset = 0;
     pCtx[k].resultInfo = &pResultInfo[k];
 
-    pQuery->sdata[k] = 0;
+    pQuery->sdata[k]->num = 0;
   }
 }
 
@@ -3516,7 +3513,6 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
   setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
 
   // store the start query position
-//  void *pos = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle);
   SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
 
   int64_t skey = pQuery->lastKey;
@@ -3755,7 +3751,7 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, STable* pTable, STableQueryInfo *pTabl
   SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
   assert(pTableQueryInfo->lastKey > 0);
 
-  vnodeSetTagValueInParam(&pQInfo->groupInfo, pRuntimeEnv, pTable->tableId, pQInfo->tsdb);
+  setTagVal(pRuntimeEnv, pTable->tableId, pQInfo->tsdb);
 
   // both the master and supplement scan needs to set the correct ts comp start position
   if (pRuntimeEnv->pTSBuf != NULL) {
@@ -4316,7 +4312,7 @@ static UNUSED_FUNC bool doCheckWithPrevQueryRange(SQuery *pQuery, TSKEY nextKey)
   return true;
 }
 
-static UNUSED_FUNC void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
+static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
   SQuery *pQuery = pRuntimeEnv->pQuery;
 
   for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
@@ -4389,41 +4385,46 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
   return et - st;
 }
 
-static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *dataInCache, int32_t index,
-                                        int32_t start) {
-  //  STableIdInfo **pMeterSidExtInfo = pQInfo->pMeterSidExtInfo;
-//  SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
-//  SQuery * pQuery = pRuntimeEnv->pQuery;
-#if 0
+static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
+  SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
+  SQuery *pQuery = pRuntimeEnv->pQuery;
+  setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
+  SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
+  SPair* p = taosArrayGet(group, index);
 
-  SMeterObj *pMeterObj = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[index]->sid);
-  if (pMeterObj == NULL) {
-    dError("QInfo:%p do not find required meter id: %d, all meterObjs id is:", pQInfo, pMeterSidExtInfo[index]->sid);
-    return false;
-  }
+  STable* pTable = p->first;
+  STableDataInfo* pInfo = p->sec;
 
-  vnodeSetTagValueInParam(pSupporter->pSidSet, pRuntimeEnv, pMeterSidExtInfo[index]);
+  setTagVal(pRuntimeEnv, pTable->tableId, pQInfo->tsdb);
 
-  dTrace("QInfo:%p query on (%d): vid:%d sid:%d meterId:%s, qrange:%" PRId64 "-%" PRId64, pQInfo, index - start,
-         pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey);
+  dTrace("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index,
+         pTable->tableId.uid, pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey);
 
-  pQInfo->pObj = pMeterObj;
-  pQuery->lastKey = pQuery->skey;
-  pRuntimeEnv->pMeterObj = pMeterObj;
-
-  vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj);
-  vnodeUpdateFilterColumnIndex(pQuery);
-
-  vnodeCheckIfDataExists(pRuntimeEnv, pMeterObj, dataInDisk, dataInCache);
+  STsdbQueryCond cond = {
+      .twindow = {pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey},
+      .order = pQuery->order.order,
+      .colList = pQuery->colList,
+  };
 
-  // data in file or cache is not qualified for the query. abort
-  if (!(dataInCache || dataInDisk)) {
-    dTrace("QInfo:%p vid:%d sid:%d meterId:%s, qrange:%" PRId64 "-%" PRId64 ", nores, %p", pQInfo, pMeterObj->vnode,
-           pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey, pQuery);
-    return false;
+  SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0]));
+  for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
+    taosArrayPush(cols, &pQuery->colList[i]);
   }
 
+  SArray* g1 = taosArrayInit(1, POINTER_BYTES);
+  STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1};
+
+  SArray* tx = taosArrayInit(1, sizeof(SPair));
+  taosArrayPush(tx, p);
+
+  taosArrayPush(g1, &tx);
+  // include only current table
+  pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, cols);
+
+//  vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj);
+//  vnodeUpdateFilterColumnIndex(pQuery);
+
   if (pRuntimeEnv->pTSBuf != NULL) {
     if (pRuntimeEnv->cur.vnodeIndex == -1) {
       int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key;
@@ -4438,9 +4439,7 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *
     }
   }
 
-#endif
-
-//  initCtxOutputBuf(pRuntimeEnv);
+  initCtxOutputBuf(pRuntimeEnv);
 
   return true;
 }
@@ -4448,9 +4447,7 @@ static UNUSED_FUNC int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, i
   SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
   SQuery * pQuery = pRuntimeEnv->pQuery;
 
-  bool dataInDisk = true;
-  bool dataInCache = true;
-  if (!multimeterMultioutputHelper(pQInfo, &dataInDisk, &dataInCache, index, start)) {
+  if (!multiTableMultioutputHelper(pQInfo, index)) {
     return 0;
   }
 
@@ -4494,61 +4491,47 @@ static UNUSED_FUNC int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, i
  *
  * @param pQInfo
  */
-static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
+static void sequentialTableProcess(SQInfo *pQInfo) {
   SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
   SQuery* pQuery = pRuntimeEnv->pQuery;
   setQueryStatus(pQuery, QUERY_COMPLETED);
 
-#if 0
-//  STableGroupInfo *pTableIdList = pSupporter->pSidSet;
-
-  int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode;
+  size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
 
   if (isPointInterpoQuery(pQuery)) {
     resetCtxOutputBuf(pRuntimeEnv);
-    assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
-
-    while (pSupporter->subgroupIdx < pTableIdList->numOfSubSet) {
-      int32_t start = pTableIdList->starterPos[pSupporter->subgroupIdx];
-      int32_t end = pTableIdList->starterPos[pSupporter->subgroupIdx + 1] - 1;
-
+
+#if 0
+    while (pQInfo->subgroupIdx < numOfGroups) {
+
+      SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx);
+      size_t numOfTable = taosArrayGetSize(group);
+
       if (isFirstLastRowQuery(pQuery)) {
         dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
-               pSupporter->subgroupIdx);
+               pQInfo->subgroupIdx);
 
         TSKEY key = -1;
         int32_t index = -1;
 
         // choose the last key for one group
-        pSupporter->meterIdx = start;
+        pQInfo->tableIndex = 0;
 
-        for (int32_t k = start; k <= end; ++k, pSupporter->meterIdx++) {
+        for (int32_t k = 0; k < numOfTable; ++k, pQInfo->tableIndex++) {
          if (isQueryKilled(pQInfo)) {
-            setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
            return;
          }
-
-          // get the last key of meters that belongs to this group
-          SMeterObj *pMeterObj = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[k]->sid);
-          if (pMeterObj != NULL) {
-            if (key < pMeterObj->lastKey) {
-              key = pMeterObj->lastKey;
-              index = k;
-            }
-          }
        }
 
-        pQuery->skey = key;
-        pQuery->ekey = key;
-        pSupporter->rawSKey = key;
-        pSupporter->rawEKey = key;
+        pQuery->window.skey = key;
+        pQuery->window.ekey = key;
 
-        int64_t num = doCheckMetersInGroup(pQInfo, index, start);
-        assert(num >= 0);
+//        int64_t num = doCheckMetersInGroup(pQInfo, index, start);
+//        assert(num >= 0);
      } else {
        dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
-               pSupporter->subgroupIdx);
+               pQInfo->subgroupIdx);
 
        for (int32_t k = start; k <= end; ++k) {
          if (isQueryKilled(pQInfo)) {
@@ -4573,129 +4556,110 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
          break;
        }
      }
+#endif
+
  } else {
-    /*
-     * 1. super table projection query, 2. group-by on normal columns query, 3. ts-comp query
-     */
-    assert(pSupporter->meterIdx >= 0);
+    createTableDataInfo(pQInfo);
 
    /*
+     * 1. super table projection query, 2. group-by on normal columns query, 3. ts-comp query
     * if the subgroup index is larger than 0, results generated by group by tbname,k is existed.
     * we need to return it to client in the first place.
    */
-    if (pSupporter->subgroupIdx > 0) {
+    if (pQInfo->subgroupIdx > 0) {
      copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
-      pQInfo->size += pQuery->size;
+      pQuery->rec.total += pQuery->rec.rows;
 
-      if (pQuery->size > 0) {
+      if (pQuery->rec.rows > 0) {
        return;
      }
    }
 
-    if (pSupporter->meterIdx >= pTableIdList->numOfTables) {
+    // all data have returned already
+    if (pQInfo->tableIndex >= pQInfo->groupInfo.numOfTables) {
      return;
    }
 
    resetCtxOutputBuf(pRuntimeEnv);
    resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
 
-    while (pSupporter->meterIdx < pSupporter->numOfTables) {
-      int32_t k = pSupporter->meterIdx;
+    SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
+    assert(taosArrayGetSize(group) == pQInfo->groupInfo.numOfTables && 1 == taosArrayGetSize(pQInfo->groupInfo.pGroupList));
+
+    while (pQInfo->tableIndex < pQInfo->groupInfo.numOfTables) {
+      int32_t k = pQInfo->tableIndex;
      if (isQueryKilled(pQInfo)) {
-        setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
        return;
      }
 
-      TSKEY skey = pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key;
+      SPair *p = taosArrayGet(group, k);
+      STableDataInfo* pInfo = p->sec;
+
+      TSKEY skey = pInfo->pTableQInfo->lastKey;
      if (skey > 0) {
-        pQuery->skey = skey;
+        pQuery->window.skey = skey;
      }
 
-      bool dataInDisk = true;
-      bool dataInCache = true;
-      if (!multimeterMultioutputHelper(pQInfo, &dataInDisk, &dataInCache, k, 0)) {
-        pQuery->skey = pSupporter->rawSKey;
-        pQuery->ekey = pSupporter->rawEKey;
-
-        pSupporter->meterIdx++;
+      if (!multiTableMultioutputHelper(pQInfo, k)) {
+        pQInfo->tableIndex++;
        continue;
      }
-
-#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
-      for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) {
-        resetMMapWindow(&pRuntimeEnv->pVnodeFiles[i]);
-      }
-#endif
-      SPointInterpoSupporter pointInterpSupporter = {0};
-      assert(0);
-//      if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter, NULL) == false) {
-//        pQuery->skey = pSupporter->rawSKey;
-//        pQuery->ekey = pSupporter->rawEKey;
-//
-//        pSupporter->meterIdx++;
-//        continue;
-//      }
+//      SPointInterpoSupporter pointInterpSupporter = {0};
 
      // TODO handle the limit problem
      if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) {
-        forwardQueryStartPosition(pRuntimeEnv);
+//        forwardQueryStartPosition(pRuntimeEnv);
 
-        if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
-          pQuery->skey = pSupporter->rawSKey;
-          pQuery->ekey = pSupporter->rawEKey;
-
-          pSupporter->meterIdx++;
+        if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
+          pQInfo->tableIndex++;
          continue;
        }
      }
 
      scanAllDataBlocks(pRuntimeEnv);
 
-      pQuery->size = getNumOfResult(pRuntimeEnv);
+      pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
      doSkipResults(pRuntimeEnv);
 
      // the limitation of output result is reached, set the query completed
      if (doRevisedResultsByLimit(pQInfo)) {
-        pSupporter->meterIdx = pSupporter->pSidSet->numOfTables;
+        pQInfo->tableIndex = pQInfo->groupInfo.numOfTables;
        break;
      }
 
      // enable execution for next table, when handling the projection query
      enableExecutionForNextTable(pRuntimeEnv);
 
-      if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
+      if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
        /*
         * query range is identical in terms of all meters involved in query,
         * so we need to restore them at the *beginning* of query on each meter,
         * not the consecutive query on meter on which is aborted due to buffer limitation
         * to ensure that, we can reset the query range once query on a meter is completed.
        */
-        pQuery->skey = pSupporter->rawSKey;
-        pQuery->ekey = pSupporter->rawEKey;
-        pSupporter->meterIdx++;
-
-        pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey;
+        pQInfo->tableIndex++;
+        pInfo->pTableQInfo->lastKey = pQuery->lastKey;
 
        // if the buffer is full or group by each table, we need to jump out of the loop
-        if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) ||
-            isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) {
+        if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL) /*||
+            isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)*/) {
          break;
        }
 
      } else {  // forward query range
-        pQuery->skey = pQuery->lastKey;
+        pQuery->window.skey = pQuery->lastKey;
 
        // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
-        if (pQuery->size == 0) {
-          assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
+        if (pQuery->rec.rows == 0) {
+          assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
          continue;
        } else {
-          pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey;
-          // buffer is full, wait for the next round to retrieve data from current meter
-          assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
-          break;
+//          pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey;
+//          // buffer is full, wait for the next round to retrieve data from current meter
+//          assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
+//          break;
        }
      }
    }
@@ -4734,20 +4698,16 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
      }
    }
 
-    pQInfo->pTableQuerySupporter->subgroupIdx = 0;
-    pQuery->size = 0;
+    pQInfo->subgroupIdx = 0;
+    pQuery->rec.rows = 0;
 
    copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult);
  }
 
-  pQInfo->size += pQuery->size;
-  pQuery->pointsOffset = pQuery->pointsToRead;
-  dTrace(
-      "QInfo %p vid:%d, numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d,"
-      "next skey:%" PRId64 ", offset:%" PRId64,
-      pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->size, pQInfo->size,
-      pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset);
-#endif
+  pQuery->rec.total += pQuery->rec.rows;
+  dTrace( "QInfo %p, numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, total:%d totalReturn:%d,"
+      " offset:%" PRId64, pQInfo, pQInfo->groupInfo.numOfTables, pQInfo->tableIndex, numOfGroups,
+      pQuery->rec.rows, pQuery->rec.total, pQuery->limit.offset);
 }
 
 static void createTableDataInfo(SQInfo* pQInfo) {
@@ -4755,6 +4715,7 @@ static void createTableDataInfo(SQInfo* pQInfo) {
   // todo make sure the table are added the reference count to gauranteed that all involved tables are valid
   size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
 
+  int32_t index = 0;
   for (int32_t i = 0; i < numOfGroups; ++i) {  // load all meter meta info
     SArray *group = *(SArray**) taosArrayGet(pQInfo->groupInfo.pGroupList, i);
 
@@ -4762,12 +4723,19 @@ static void createTableDataInfo(SQInfo* pQInfo) {
     size_t s = taosArrayGetSize(group);
     for(int32_t j = 0; j < s; ++j) {
       SPair* p = (SPair*) taosArrayGet(group, j);
+
+      // STableDataInfo has been created for each table
+      if (p->sec != NULL) {  // todo refactor
+        return;
+      }
+
       STableDataInfo* pInfo = calloc(1, sizeof(STableDataInfo));
 
       setTableDataInfo(pInfo, index, i);
       pInfo->pTableQInfo = createTableQueryInfo(&pQInfo->runtimeEnv, ((STable*)(p->first))->tableId.tid, pQuery->window);
 
       p->sec = pInfo;
+      index += 1;
     }
   }
 
@@ -5182,7 +5150,7 @@ static void stableQueryImpl(SQInfo* pQInfo) {
     assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) ||
            isGroupbyNormalCol(pQuery->pGroupbyExpr));
 
-    vnodeSTableSeqProcessor(pQInfo);
+    sequentialTableProcess(pQInfo);
   }
 
   // record the total elapsed time
diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c
index 8bbac11dc10dbaa33152a1f3e625523a51a0adf3..716f888153d8b942d44d47c4fce3fcb5568b28cb 100644
--- a/src/tsdb/src/tsdbMeta.c
+++ b/src/tsdb/src/tsdbMeta.c
@@ -225,18 +225,27 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
   }
 }
 
-int32_t tsdbGetTableTagVal(tsdb_repo_t* repo, STableId id, int32_t col, int16_t* type, int16_t* bytes, char** val) {
+int32_t tsdbGetTableTagVal(tsdb_repo_t* repo, STableId id, int32_t colId, int16_t* type, int16_t* bytes, char** val) {
   STsdbMeta* pMeta = tsdbGetMeta(repo);
   STable* pTable = tsdbGetTableByUid(pMeta, id.uid);
 
   STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable);
-  STColumn* pCol = schemaColAt(pSchema, col);
+
+  STColumn* pCol = NULL;
+  for(int32_t col = 0; col < schemaNCols(pSchema); ++col) {
+    STColumn* p = schemaColAt(pSchema, col);
+    if (p->colId == colId) {
+      pCol = p;
+    }
+  }
+
+  assert(pCol != NULL);
 
   SDataRow row = (SDataRow)pTable->tagVal;
   char* d = dataRowAt(row, TD_DATA_ROW_HEAD_SIZE);
 
   *val = d;
-  *type = pCol->type;
+  *type = pCol->type;
   *bytes = pCol->bytes;
 
   return 0;
diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c
index 59225a211ce157bdabc228c068b4b4b2d59af4ae..600ed2ba8d20a5d76f46745c2a35e07ce7f2f533 100644
--- a/src/tsdb/src/tsdbRead.c
+++ b/src/tsdb/src/tsdbRead.c
@@ -98,31 +98,23 @@ typedef struct SBlockOrderSupporter {
 typedef struct STsdbQueryHandle {
   STsdbRepo*    pTsdb;
   SQueryFilePos cur;    // current position
-  SQueryFilePos start;  // the start position, used for secondary/third iteration
   SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
   SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
 
-  int16_t     numOfRowsPerPage;
-  uint16_t    flag;  // denotes reversed scan of data or not
   int16_t     order;
   STimeWindow window;  // the primary query time window that applies to all queries
-  int32_t     blockBufferSize;
   SCompBlock* pBlock;
   int32_t     numOfBlocks;
   SField**    pFields;
   SArray*     pColumns;  // column list, SColumnInfoData array list
   bool        locateStart;
   int32_t     realNumOfRows;
-  bool        loadDataAfterSeek;  // load data after seek.
 
   SArray*     pTableCheckInfo;
   int32_t     activeIndex;
-
-  bool    checkFiles;   // check file stage
-  int32_t tableIndex;
-  bool    isFirstSlot;
-  void*   qinfo;        // query info handle, for debug purpose
-
+  bool        checkFiles;  // check file stage
+  void*       qinfo;       // query info handle, for debug purpose
+
   STableBlockInfo* pDataBlockInfo;
 
   SFileGroup* pFileGroup;
@@ -152,8 +144,6 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S
   pQueryHandle->pTsdb = tsdb;
   pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx)),
 
-  pQueryHandle->loadDataAfterSeek = false;
-  pQueryHandle->isFirstSlot = true;
   pQueryHandle->cur.fid = -1;
 
   size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);