diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 204cd7bfc7d9bda8bd9e5f5799296e6faa2a2cd1..25c6ef18a988a552aee3aacc5e76d441d400b14f 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -46,6 +46,7 @@ target_sources( "src/tsdb/tsdbReaderWriter.c" "src/tsdb/tsdbUtil.c" "src/tsdb/tsdbSnapshot.c" + "src/tsdb/tsdbCacheRead.c" # tq "src/tq/tq.c" diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c new file mode 100644 index 0000000000000000000000000000000000000000..74ecf806fd1d5d0544da8da96072fce89dea5461 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "taoserror.h" +#include "tarray.h" +#include "tcommon.h" +#include "tsdb.h" + +#define LASTROW_RETRIEVE_TYPE_ALL 0x1 +#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2 + +// todo parse the stsrow and set the results +static void keepOneRow(const STSRow* pRow, SSDataBlock* pBlock) { + int32_t rowIndex = pBlock->info.rows; + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + // todo extract the value of specified column id from STSRow + const char* p = NULL; + colDataAppend(pColInfoData, rowIndex, p, false); + } + + pBlock->info.rows += 1; +} + +int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t type, SSDataBlock* pResBlock) { + if (pVnode == NULL || pTableIdList == NULL || pResBlock == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + SVnode* pv = pVnode; + STSRow* pRow = NULL; + size_t numOfTables = taosArrayGetSize(pTableIdList); + + // retrieve the only one last row of all tables in the uid list. + if (type == LASTROW_RETRIEVE_TYPE_SINGLE) { + int64_t lastKey = INT64_MIN; + bool internalResult = false; + for (int32_t i = 0; i < numOfTables; ++i) { + tb_uid_t* uid = taosArrayGet(pTableIdList, i); + + int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, &pRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (pRow == NULL) { + continue; + } + + if (pRow->ts > lastKey) { + // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already + // appended or not. + if (internalResult) { + pResBlock->info.rows -= 1; + } + + keepOneRow(pRow, pResBlock); + internalResult = true; + lastKey = pRow->ts; + } + } + } else if (type == LASTROW_RETRIEVE_TYPE_ALL) { + for (int32_t i = 0; i < numOfTables; ++i) { + tb_uid_t* uid = taosArrayGet(pTableIdList, i); + + int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, &pRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // no data in the table of Uid + if (pRow == NULL) { + continue; + } + + keepOneRow(pRow, pResBlock); + } + } else { + return TSDB_CODE_INVALID_PARA; + } + + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4332b23477b6736cfc6fa50f009606d963143e45..392966bb78ece6159054f7d8576b0776419bcb7c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -285,6 +285,12 @@ typedef struct STagScanInfo { SNode* pFilterNode; // filter info, } STagScanInfo; +typedef struct SLastrowScanInfo { + SSDataBlock *pRes; + STableListInfo *pTableList; + SReadHandle readHandle; +} SLastrowScanInfo; + typedef enum EStreamScanMode { STREAM_SCAN_FROM_READERHANDLE = 1, STREAM_SCAN_FROM_RES, diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fc7e2b3a6a21f33fd82caab2ab9f021be7e486a0..095282f188675c04b1238fccc60408edd3681eef 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2343,3 +2343,68 @@ _error: taosMemoryFree(pOperator); return NULL; } + +static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SLastrowScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList); + if (size == 0) { + setTaskStatus(pTaskInfo, TASK_COMPLETED); + return NULL; + } + + // check if it is a group by tbname + if (size == taosHashGetSize(pInfo->pTableList->map)) { + // fetch last row for each table + } else { + //todo fetch the result for each group + + } + + return pInfo->pRes->info.rows == 0? NULL:pInfo->pRes; +} + +static void destroyLastrowScanOperator(void* param, int32_t numOfOutput) { + SLastrowScanInfo* pInfo = (SLastrowScanInfo*) param; + blockDataDestroy(pInfo->pRes); +} + +SOperatorInfo* createLastrowScanOperator(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, + STableListInfo* pTableList, SExecTaskInfo* pTaskInfo) { + + SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + pInfo->pTableList = pTableList; + pInfo->readHandle = *readHandle; +// pInfo->pRes = createResDataBlock(); + + pOperator->name = "LastrowScanOperator"; +// pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + + initResultSizeInfo(pOperator, 1024); + + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, + NULL, NULL, NULL); + pOperator->cost.openCost = 0; + return pOperator; + +_error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; +}