diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 69c5a6fc805230719044f4806f1801175635e567..f228e83357dd9d937eefe042d62620feb071b180 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -132,7 +132,10 @@ SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdLis void tsdbResetReadHandle(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); -int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t type, SSDataBlock* pResBlock); + +int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols, void** pReader); +int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds); +int32_t tsdbLastrowReaderClose(void* pReader); // tq diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 15bacab5fe66a5ee8c07027fd2163e69b90b7a1c..e3eb015c759f45dca83f9e1a4dcada8e11c64d1d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -18,38 +18,116 @@ #include "tcommon.h" #include "tsdb.h" -// todo parse the stsrow and set the results -static void keepOneRow(const STSRow* pRow, SSDataBlock* pBlock) { - int32_t rowIndex = pBlock->info.rows; +typedef struct SLastrowReader { + SVnode* pVnode; + STSchema* pSchema; + uint64_t uid; +// int32_t* pSlotIds; + char** transferBuf; // todo remove it soon + int32_t numOfCols; + int32_t type; + int32_t tableIndex; // currently returned result tables + SArray* pTableList; // table id list +} SLastrowReader; + +static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t *slotIds) { + int32_t numOfRows = pBlock->info.rows; size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + + SColVal colVal = {0}; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - // todo extract the value of specified column id from STSRow - const char* p = NULL; - colDataAppend(pColInfoData, rowIndex, p, false); + if (slotIds[i] == -1) { + colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false); + } else { + tTSRowGetVal(pRow, pReader->pSchema, slotIds[i], &colVal); + + if (IS_VAR_DATA_TYPE(colVal.type)) { + if (colVal.isNull) { + colDataAppendNULL(pColInfoData, numOfRows); + } else { + varDataSetLen(pReader->transferBuf[i], colVal.value.nData); + memcpy(varDataVal(pReader->transferBuf[i]), colVal.value.pData, colVal.value.nData); + colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[i], false); + } + } else { + colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull); + } + } } pBlock->info.rows += 1; } -int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t type, SSDataBlock* pResBlock) { - if (pVnode == NULL || pTableIdList == NULL || pResBlock == NULL) { + +int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols, void** pReader) { + SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader)); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + p->type = type; + p->pVnode = pVnode; + p->numOfCols = numOfCols; + p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES); + + STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0); + p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1); + p->pTableList = pTableIdList; +#if 0 + for(int32_t i = 0; i < p->numOfCols; ++i) { + for(int32_t j = 0; j < p->pSchema->numOfCols; ++j) { + if (colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) { + p->pSlotIds[i] = -1; + break; + } + + if (colId[i] == p->pSchema->columns[j].colId) { + p->pSlotIds[i] = j; + break; + } + } + + if (IS_VAR_DATA_TYPE(colId[i])) { + p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[p->pSlotIds[i]].bytes); + } + } +#endif + *pReader = p; + return TSDB_CODE_SUCCESS; +} + +int32_t tsdbLastrowReaderClose(void* pReader) { + SLastrowReader* p = pReader; + + for(int32_t i = 0; i < p->numOfCols; ++i) { + taosMemoryFreeClear(p->transferBuf[i]); + } + + taosMemoryFree(p->transferBuf); + taosMemoryFree(pReader); + return TSDB_CODE_SUCCESS; +} + +int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds) { + if (pReader == NULL || pResBlock == NULL) { return TSDB_CODE_INVALID_PARA; } - SVnode* pv = pVnode; + SLastrowReader* pr = pReader; + STSRow* pRow = NULL; - size_t numOfTables = taosArrayGetSize(pTableIdList); + size_t numOfTables = taosArrayGetSize(pr->pTableList); // retrieve the only one last row of all tables in the uid list. - if (type == LASTROW_RETRIEVE_TYPE_SINGLE) { + if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) { int64_t lastKey = INT64_MIN; bool internalResult = false; for (int32_t i = 0; i < numOfTables; ++i) { - STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, i); - - int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, pKeyInfo->uid, pv->pTsdb, &pRow); + STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); + + int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -65,16 +143,16 @@ int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t ty pResBlock->info.rows -= 1; } - keepOneRow(pRow, pResBlock); + saveOneRow(pRow, pResBlock, pr, slotIds); internalResult = true; lastKey = pRow->ts; } } - } else if (type == LASTROW_RETRIEVE_TYPE_ALL) { - for (int32_t i = 0; i < numOfTables; ++i) { - STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, i); + } else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) { + for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { + STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); - int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, pKeyInfo->uid, pv->pTsdb, &pRow); + int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -84,7 +162,12 @@ int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t ty continue; } - keepOneRow(pRow, pResBlock); + saveOneRow(pRow, pResBlock, pr, slotIds); + + pr->tableIndex += 1; + if (pResBlock->info.rows >= pResBlock->info.capacity) { + return TSDB_CODE_SUCCESS; + } } } else { return TSDB_CODE_INVALID_PARA; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b947c560c661149216bd1b548f2bc01f2e340f0c..d1299b65f9ff738c9611ce17a1c5f96cf57b2ccf 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -142,8 +142,8 @@ typedef struct SExecTaskInfo { struct { char *tablename; char *dbname; - int32_t sversion; int32_t tversion; + SSchemaWrapper*sw; } schemaVer; STableListInfo tableqinfoList; // this is a table list @@ -296,6 +296,9 @@ typedef struct SLastrowScanInfo { SSDataBlock *pRes; SArray *pTableList; SReadHandle readHandle; + void *pLastrowReader; + SArray *pColMatchInfo; + int32_t *pSlotIds; } SLastrowScanInfo; typedef enum EStreamScanMode { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6de364e63a3c2bbe6e5044a53a7f916b1d2d8da1..9cf6be5eef12c13f03f56ad4a057b4a80873d3b5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -187,7 +187,7 @@ int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tab ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - *sversion = pTaskInfo->schemaVer.sversion; + *sversion = pTaskInfo->schemaVer.sw->version; *tversion = pTaskInfo->schemaVer.tversion; if (pTaskInfo->schemaVer.dbname) { strcpy(dbName, pTaskInfo->schemaVer.dbname); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0d84d67d7e452a1df06528af5187db11dc6be7c6..570dea474ae9c34c64c769af052e149c50e9884d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +#include +#include #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -2803,7 +2805,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan int32_t type = pOperator->operatorType; if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || - type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN) { + type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN) { *order = TSDB_ORDER_ASC; *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; @@ -3886,15 +3888,15 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI pTaskInfo->schemaVer.tablename = strdup(mr.me.name); if (mr.me.type == TSDB_SUPER_TABLE) { - pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schemaRow.version; + pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version; } else if (mr.me.type == TSDB_CHILD_TABLE) { tb_uid_t suid = mr.me.ctbEntry.suid; metaGetTableEntryByUid(&mr, suid); - pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schemaRow.version; + pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version; } else { - pTaskInfo->schemaVer.sversion = mr.me.ntbEntry.schemaRow.version; + pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); } metaReaderClear(&mr); @@ -4177,9 +4179,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo // return NULL; // } + int32_t code = extractTableSchemaVersion(pHandle, pScanNode->uid, pTaskInfo); + pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); if (pScanNode->tableType == TSDB_SUPER_TABLE) { - int32_t code = vnodeGetAllTableList(pHandle->meta, pScanNode->uid, pTableListInfo->pTableList); + code = vnodeGetAllTableList(pHandle->meta, pScanNode->uid, pTableListInfo->pTableList); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = terrno; return NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f2b1725fb38641067330c77547f5b4a110fb7596..bb3f4e403d50009bd76f7ab97740101509c73001 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,13 +13,12 @@ * along with this program. If not, see . */ -#include "filter.h" +#include "executorimpl.h" #include "function.h" #include "functionMgt.h" #include "os.h" #include "querynodes.h" #include "systable.h" -#include "tglobal.h" #include "tname.h" #include "ttime.h" @@ -33,8 +32,6 @@ #include "ttypes.h" #include "vnode.h" -#include "executorInt.h" - #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) @@ -2537,7 +2534,8 @@ static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { // check if it is a group by tbname if (size == taosArrayGetSize(pInfo->pTableList)) { - tsdbRetrieveLastRow(pInfo->readHandle.vnode, pInfo->pTableList, LASTROW_RETRIEVE_TYPE_ALL, pInfo->pRes); + blockDataCleanup(pInfo->pRes); + tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds); return (pInfo->pRes->info.rows == 0)? NULL:pInfo->pRes; } else { //todo fetch the result for each group @@ -2550,9 +2548,10 @@ static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { static void destroyLastrowScanOperator(void* param, int32_t numOfOutput) { SLastrowScanInfo* pInfo = (SLastrowScanInfo*) param; blockDataDestroy(pInfo->pRes); + tsdbLastrowReaderClose(pInfo->pLastrowReader); } -SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, +SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList, SExecTaskInfo* pTaskInfo) { SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -2562,7 +2561,34 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, pInfo->pTableList = pTableList; pInfo->readHandle = *readHandle; - pInfo->pRes = createResDataBlock(pTableScanNode->node.pOutputDataBlockDesc); + pInfo->pRes = createResDataBlock(pScanNode->node.pOutputDataBlockDesc); + + int32_t numOfCols = 0; + pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfCols, COL_MATCH_FROM_COL_ID); + int32_t* pCols = taosMemoryMalloc(numOfCols * sizeof(int32_t)); + for(int32_t i = 0; i < numOfCols; ++i) { + SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i); + pCols[i] = pColMatch->colId; + } + + pInfo->pSlotIds = taosMemoryMalloc(numOfCols * sizeof(pInfo->pSlotIds[0])); + for(int32_t i = 0; i < numOfCols; ++i) { + SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i); + for(int32_t j = 0; j < pTaskInfo->schemaVer.sw->nCols; ++j) { + if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + pInfo->pSlotIds[pColMatch->targetSlotId] = -1; + break; + } + + if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId) { + pInfo->pSlotIds[pColMatch->targetSlotId] = j; + break; + } + } + } + + tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_ALL, pTableList, pCols, numOfCols, &pInfo->pLastrowReader); + taosMemoryFree(pCols); pOperator->name = "LastrowScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN; @@ -2570,8 +2596,10 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; + pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); initResultSizeInfo(pOperator, 1024); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 0820e884cea1f81d6576d609dc91bba65447e6c7..bba4086960286b5d80e784438b8b593b220c460f 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -106,6 +106,8 @@ bool irateFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); int32_t irateFunction(SqlFunctionCtx *pCtx); int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t lastrowFunction(SqlFunctionCtx* pCtx); + bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t firstFunctionMerge(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index d41bc89a5fe61d65e9890c24eeac8a257182d931..4bf9d8fcc125d4ab78590bc9e7b605929c1d5adf 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1873,11 +1873,11 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "last_row", .type = FUNCTION_TYPE_LAST_ROW, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, - .translateFunc = translateLastRow, - .getEnvFunc = getMinmaxFuncEnv, - .initFunc = minmaxFunctionSetup, - .processFunc = maxFunction, - .finalizeFunc = functionFinalize + .translateFunc = translateFirstLast, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = lastrowFunction, + .finalizeFunc = firstLastFinalize }, { .name = "first", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index c4d3a26ab4244c4d6dab96aa3fb0d82cac878153..93a117aec9d0332166dd9a3f385bd44c026e7e85 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -5503,3 +5503,43 @@ int32_t interpFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } + +int32_t lastrowFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElems = 0; + + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pInputCol = pInput->pData[0]; + + int32_t type = pInputCol->info.type; + int32_t bytes = pInputCol->info.bytes; + pInfo->bytes = bytes; + + for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { + if (pInputCol->hasNull && colDataIsNull_s(pInputCol, i)) { + continue; + } + + numOfElems++; + + char* data = colDataGetData(pInputCol, i); + TSKEY cts = getRowPTs(pInput->pPTS, i); + if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { + if (IS_VAR_DATA_TYPE(type)) { + bytes = varDataTLen(data); + pInfo->bytes = bytes; + } + + memcpy(pInfo->buf, data, bytes); + *(TSKEY*)(pInfo->buf + bytes) = cts; + + pInfo->hasResult = true; + pResInfo->numOfRes = 1; + } + } + + SET_VAL(pResInfo, numOfElems, 1); + return TSDB_CODE_SUCCESS; +}