diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index bb27cc73900c9c8752ccadf65329545e4ae626dc..d82d9a6045d2b00d82ca48e9b00a1e87e271b245 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -262,7 +262,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle); */ bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT *pQueryHandle); -SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SMemRef* pMemRef, int16_t type); +SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SDataBlockInfo* blockInfo); /** * Get current data block information diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index cf3345c0be6da5ddf9acde2473c30393e9669d59..2721e9e0932d7c0608a36f28b7791791479ee72a 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -345,29 +345,28 @@ typedef struct SQueryParam { typedef struct STableScanInfo { SQueryRuntimeEnv *pRuntimeEnv; - void *pQueryHandle; - int32_t numOfBlocks; - int32_t numOfSkipped; - int32_t numOfBlockStatis; - int64_t numOfRows; - - int32_t order; // scan order - int32_t times; // repeat counts - int32_t current; - - int32_t reverseTimes; // 0 by default - - SSDataBlock block; - - SQLFunctionCtx *pCtx; // next operator query context + void *pQueryHandle; + int32_t numOfBlocks; + int32_t numOfSkipped; + int32_t numOfBlockStatis; + int64_t numOfRows; + + int32_t order; // scan order + int32_t times; // repeat counts + int32_t current; + int32_t reverseTimes; // 0 by default + + SQLFunctionCtx *pCtx; // next operator query context SResultRowInfo *pResultRowInfo; int32_t *rowCellInfoOffset; SExprInfo *pExpr; - + SSDataBlock block; + bool loadExternalRows; // load external rows (prev & next rows) + bool externalLoaded; // external rows loaded int32_t numOfOutput; int64_t elapsedTime; - int32_t tableIndex; + int32_t tableIndex; } STableScanInfo; typedef struct STagScanInfo { diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 44108e895cff1e36446eff847c853387914cf8cf..2fb377cb5c3af838101acba7dc75305a0a4da97c 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -4087,38 +4087,32 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { if (pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP) { *(TSKEY *) pCtx->pOutput = pCtx->startTs; } else { - if (pCtx->start.key == INT64_MIN) { - assert(pCtx->end.key == INT64_MIN); - return; - } - if (type == TSDB_FILL_NULL) { setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); } else if (type == TSDB_FILL_SET_VALUE) { tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); } else if (type == TSDB_FILL_PREV) { - if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) { - SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, pCtx->start.val); - } else { - assignVal(pCtx->pOutput, pCtx->start.ptr, pCtx->outputBytes, pCtx->inputType); - } + assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType); } else if (type == TSDB_FILL_NEXT) { - if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) { - SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, pCtx->end.val); - } else { - assignVal(pCtx->pOutput, pCtx->end.ptr, pCtx->outputBytes, pCtx->inputType); - } + char* d = GET_INPUT_DATA(pCtx, 1); + assignVal(pCtx->pOutput, d, pCtx->outputBytes, pCtx->inputType); } else if (type == TSDB_FILL_LINEAR) { - SPoint point1 = {.key = pCtx->start.key, .val = &pCtx->start.val}; - SPoint point2 = {.key = pCtx->end.key, .val = &pCtx->end.val}; - SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput}; + char* start = GET_INPUT_DATA(pCtx, 0); + char* end = GET_INPUT_DATA(pCtx, 1); + + TSKEY skey = GET_TS_DATA(pCtx, 0); + TSKEY ekey = GET_TS_DATA(pCtx, 1); + + SPoint point1 = {.key = skey, .val = start}; + SPoint point2 = {.key = ekey, .val = end }; + SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput}; int32_t srcType = pCtx->inputType; if (IS_NUMERIC_TYPE(srcType)) { // TODO should find the not null data? - if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) { + if (isNull(start, srcType) || isNull(end, srcType)) { setNull(pCtx->pOutput, srcType, pCtx->inputBytes); } else { - taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE); + taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, srcType); } } else { setNull(pCtx->pOutput, srcType, pCtx->inputBytes); @@ -4127,8 +4121,8 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { } SET_VAL(pCtx, 1, 1); - } + static void interp_function(SQLFunctionCtx *pCtx) { // at this point, the value is existed, return directly if (pCtx->size > 0) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e5c2eca4f0ead8d4e7a9e0c84f9f8c511dc319a2..3439e48d60633361f7193c257cc0da736f652a75 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -172,7 +172,7 @@ static STableIdInfo createTableIdInfo(SQuery* pQuery); static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); -static SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); +static SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, bool loadExternalRows); static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); @@ -1842,7 +1842,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2); } - if (pQuery->fillType != TSDB_FILL_NONE) { + if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { SOperatorInfo* pInfo = pRuntimeEnv->proot; pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput); } @@ -2430,8 +2430,8 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key, bool asc return TS_JOIN_TS_EQUAL; } -void filterDataBlock_rv(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo *pFilterInfo, - int32_t numOfFilterCols, SSDataBlock* pBlock, STSBuf* pTsBuf, bool ascQuery) { +void filterDataBlock_rv(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, + SSDataBlock* pBlock, STSBuf* pTsBuf, bool ascQuery) { int32_t numOfRows = pBlock->info.rows; int8_t *p = calloc(numOfRows, sizeof(int8_t)); @@ -4153,17 +4153,26 @@ static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); static void setTableQueryHandle(SQueryRuntimeEnv* pRuntimeEnv, int32_t tableIndex) { SQuery* pQuery = pRuntimeEnv->pQuery; - SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0); - // handle the first table - STableQueryInfo* pCheckInfo = taosArrayGetP(group, tableIndex); + int32_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); + + STableQueryInfo* pCheckInfo = NULL; + if (numOfGroup == 1) { + SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0); + pCheckInfo = taosArrayGetP(group, tableIndex); + } else { + assert(numOfGroup == pRuntimeEnv->tableqinfoGroupInfo.numOfTables); + SArray *group = GET_TABLEGROUP(pRuntimeEnv, tableIndex); + pCheckInfo = taosArrayGetP(group, 0); + } + // handle the first table STsdbQueryCond cond = { .twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey}, .order = pQuery->order.order, .colList = pQuery->colList, .numOfCols = pQuery->numOfCols, - .loadExternalRows = false, + .loadExternalRows = isPointInterpoQuery(pQuery), }; SArray *g1 = taosArrayInit(1, POINTER_BYTES); @@ -4198,7 +4207,6 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) } STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); - cond.loadExternalRows = isPointInterpoQuery(pQuery); if (!isSTableQuery && (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1) @@ -4307,7 +4315,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pRuntimeEnv->resultInfo.capacity = 4096; pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput); } else if (isTsCompQuery(pQuery) || isPointInterpoQuery(pQuery)) { - pRuntimeEnv->pTableScanner = createSeqTableBlockScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); + pRuntimeEnv->pTableScanner = createSeqTableBlockScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, isPointInterpoQuery(pQuery)); } else if (needReverseScan(pQuery)) { pRuntimeEnv->pTableScanner = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); } else { @@ -4427,8 +4435,8 @@ static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) { } static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { - SSDataBlock *pBlock = &pTableScanInfo->block; - SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery; + SSDataBlock* pBlock = &pTableScanInfo->block; + SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery; STableGroupInfo* pTableGroupInfo = &pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo; while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { @@ -4438,7 +4446,8 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); if (pTableGroupInfo->numOfTables > 1 || (pQuery->current == NULL && pTableGroupInfo->numOfTables == 1)) { - STableQueryInfo **pTableQueryInfo = (STableQueryInfo **)taosHashGet( pTableGroupInfo->map, &pBlock->info.tid, sizeof(pBlock->info.tid)); + STableQueryInfo** pTableQueryInfo = + (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.tid, sizeof(pBlock->info.tid)); if (pTableQueryInfo == NULL) { break; } @@ -4459,9 +4468,27 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { continue; } + if (pTableScanInfo->loadExternalRows) { + pTableScanInfo->externalLoaded = true; + } + return pBlock; } + if (pTableScanInfo->loadExternalRows && (!pTableScanInfo->externalLoaded)) { + pBlock->pDataBlock = tsdbGetExternalRow(pTableScanInfo->pQueryHandle, &pBlock->info); + pTableScanInfo->externalLoaded = true; + + if (pBlock->pDataBlock != NULL) { + STableQueryInfo** pTableQueryInfo = + (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.tid, sizeof(pBlock->info.tid)); + assert(*pTableQueryInfo != NULL); + pQuery->current = *pTableQueryInfo; + } + + return (pBlock->pDataBlock != NULL)? pBlock:NULL; + } + return NULL; } @@ -4537,7 +4564,7 @@ static SSDataBlock* doTableScan(void* param) { return NULL; } -static SSDataBlock* doSeqTableBlockScan(void* param) { +static SSDataBlock* doSeqTableBlocksScan(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*)param; STableScanInfo *pTableScanInfo = pOperator->info; @@ -4558,6 +4585,7 @@ static SSDataBlock* doSeqTableBlockScan(void* param) { setTableQueryHandle(pRuntimeEnv, pTableScanInfo->tableIndex); pTableScanInfo->pQueryHandle = pRuntimeEnv->pQueryHandle; + pTableScanInfo->externalLoaded = false; } } @@ -4584,7 +4612,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* return pOperator; } -SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { +SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, bool loadExternalRows) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); pInfo->pQueryHandle = pTsdbQueryHandle; @@ -4594,6 +4622,7 @@ SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRun pInfo->current = 0; pInfo->pRuntimeEnv = pRuntimeEnv; + pInfo->loadExternalRows = loadExternalRows; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableBlockSeqScan"; @@ -4601,7 +4630,7 @@ SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRun pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols; - pOperator->exec = doSeqTableBlockScan; + pOperator->exec = doSeqTableBlocksScan; return pOperator; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 3d70487986c33fa525f0e370f407829a9ad58665..c68fb7bce04b14e9937ada81fc0622ce1413913a 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -484,6 +484,9 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon tsdbCleanupQueryHandle(pQueryHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; } + + pQueryHandle->prev = doFreeColumnInfoData(pQueryHandle->prev); + pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next); } TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) { @@ -2445,10 +2448,37 @@ out_of_memory: return terrno; } -SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SMemRef* pMemRef, int16_t type) { +SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SDataBlockInfo* blockInfo) { STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle; - assert(type == TSDB_PREV_ROW || type == TSDB_NEXT_ROW); - return (type == TSDB_PREV_ROW)? pQueryHandle->prev:pQueryHandle->next; + int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); + + STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, 0); + blockInfo->tid = pCheckInfo->tableId.tid; + blockInfo->uid = pCheckInfo->tableId.uid; + blockInfo->numOfCols = numOfCols; + + if (pQueryHandle->prev == NULL || pQueryHandle->next == NULL) { + blockInfo->rows = 0; + return NULL; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, i); + SColumnInfoData* first = taosArrayGet(pQueryHandle->prev, i); + + memcpy(pColInfoData->pData, first->pData, pColInfoData->info.bytes); + + SColumnInfoData* sec = taosArrayGet(pQueryHandle->next, i); + memcpy(pColInfoData->pData + pColInfoData->info.bytes, sec->pData, pColInfoData->info.bytes); + + if (i == 0 && pColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) { + blockInfo->window.skey = *(TSKEY*)pColInfoData->pData; + blockInfo->window.ekey = *(TSKEY*)(pColInfoData->pData + TSDB_KEYSIZE); + } + } + + blockInfo->rows = 2; + return pQueryHandle->pColumns; } /*