提交 6a50b2b4 编写于 作者: H Hongze Cheng

Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact

...@@ -46,6 +46,7 @@ target_sources( ...@@ -46,6 +46,7 @@ target_sources(
"src/tsdb/tsdbReaderWriter.c" "src/tsdb/tsdbReaderWriter.c"
"src/tsdb/tsdbUtil.c" "src/tsdb/tsdbUtil.c"
"src/tsdb/tsdbSnapshot.c" "src/tsdb/tsdbSnapshot.c"
"src/tsdb/tsdbCacheRead.c"
# tq # tq
"src/tq/tq.c" "src/tq/tq.c"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
// todo parse the stsrow and set the results
static void keepOneRow(const STSRow* pRow, SSDataBlock* pBlock) {
int32_t rowIndex = pBlock->info.rows;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
// todo extract the value of specified column id from STSRow
const char* p = NULL;
colDataAppend(pColInfoData, rowIndex, p, false);
}
pBlock->info.rows += 1;
}
int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t type, SSDataBlock* pResBlock) {
if (pVnode == NULL || pTableIdList == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SVnode* pv = pVnode;
STSRow* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pTableIdList);
// retrieve the only one last row of all tables in the uid list.
if (type == LASTROW_RETRIEVE_TYPE_SINGLE) {
int64_t lastKey = INT64_MIN;
bool internalResult = false;
for (int32_t i = 0; i < numOfTables; ++i) {
tb_uid_t* uid = taosArrayGet(pTableIdList, i);
int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, &pRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pRow == NULL) {
continue;
}
if (pRow->ts > lastKey) {
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not.
if (internalResult) {
pResBlock->info.rows -= 1;
}
keepOneRow(pRow, pResBlock);
internalResult = true;
lastKey = pRow->ts;
}
}
} else if (type == LASTROW_RETRIEVE_TYPE_ALL) {
for (int32_t i = 0; i < numOfTables; ++i) {
tb_uid_t* uid = taosArrayGet(pTableIdList, i);
int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, &pRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// no data in the table of Uid
if (pRow == NULL) {
continue;
}
keepOneRow(pRow, pResBlock);
}
} else {
return TSDB_CODE_INVALID_PARA;
}
return TSDB_CODE_SUCCESS;
}
...@@ -285,6 +285,12 @@ typedef struct STagScanInfo { ...@@ -285,6 +285,12 @@ typedef struct STagScanInfo {
SNode* pFilterNode; // filter info, SNode* pFilterNode; // filter info,
} STagScanInfo; } STagScanInfo;
typedef struct SLastrowScanInfo {
SSDataBlock *pRes;
STableListInfo *pTableList;
SReadHandle readHandle;
} SLastrowScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1, STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES, STREAM_SCAN_FROM_RES,
......
...@@ -2343,3 +2343,68 @@ _error: ...@@ -2343,3 +2343,68 @@ _error:
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
return NULL; return NULL;
} }
static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SLastrowScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
}
// check if it is a group by tbname
if (size == taosHashGetSize(pInfo->pTableList->map)) {
// fetch last row for each table
} else {
//todo fetch the result for each group
}
return pInfo->pRes->info.rows == 0? NULL:pInfo->pRes;
}
static void destroyLastrowScanOperator(void* param, int32_t numOfOutput) {
SLastrowScanInfo* pInfo = (SLastrowScanInfo*) param;
blockDataDestroy(pInfo->pRes);
}
SOperatorInfo* createLastrowScanOperator(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
STableListInfo* pTableList, SExecTaskInfo* pTaskInfo) {
SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->pTableList = pTableList;
pInfo->readHandle = *readHandle;
// pInfo->pRes = createResDataBlock();
pOperator->name = "LastrowScanOperator";
// pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 1024);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator,
NULL, NULL, NULL);
pOperator->cost.openCost = 0;
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
return NULL;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册